1#![warn(missing_docs, unused)]
13
14mod client;
15pub mod interfaces;
16pub(crate) mod logging;
17pub mod messaging;
18pub mod multicast_groups;
19mod nduseropt;
20pub mod neighbors;
21mod netlink_packet;
22pub mod protocol_family;
23pub(crate) mod route_eventloop;
24pub(crate) mod route_tables;
25pub mod routes;
26mod rules;
27pub(crate) mod util;
28
29use std::num::NonZeroU64;
30
31use fuchsia_component::client::connect_to_protocol;
32use futures::StreamExt as _;
33use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
34use futures::channel::oneshot;
35use net_types::ip::{Ipv4, Ipv6};
36use netlink_packet_route::RouteNetlinkMessage;
37use netlink_packet_sock_diag::{SockDiagRequest, SockDiagResponse};
38use protocol_family::route::NetlinkRouteNotifiedGroup;
39use {
40 fidl_fuchsia_net_interfaces as fnet_interfaces, fidl_fuchsia_net_ndp as fnet_ndp,
41 fidl_fuchsia_net_neighbor as fnet_neighbor, fidl_fuchsia_net_root as fnet_root,
42 fidl_fuchsia_net_routes as fnet_routes, fidl_fuchsia_net_routes_admin as fnet_routes_admin,
43 fidl_fuchsia_net_routes_ext as fnet_routes_ext, fidl_fuchsia_net_sockets as fnet_sockets,
44};
45
46use crate::client::{AsyncWorkItem, ClientIdGenerator, ClientTable, InternalClient};
47use crate::logging::{log_debug, log_warn};
48use crate::messaging::{NetlinkContext, UnvalidatedNetlinkMessage as _, ValidationError};
49pub use crate::netlink_packet::errno::Errno;
50use crate::protocol_family::route::{
51 NetlinkRoute, NetlinkRouteClient, NetlinkRouteRequestHandler, RouteAsyncWork,
52};
53use crate::protocol_family::sock_diag::{
54 NetlinkSockDiag, NetlinkSockDiagClient, NetlinkSockDiagRequestHandler, SockDiagEventLoop,
55};
56use crate::protocol_family::{NetlinkFamilyRequestHandler as _, ProtocolFamily};
57use crate::route_eventloop::RouteEventLoop;
58
59pub const NETLINK_LOG_TAG: &'static str = "netlink";
61
62#[derive(Debug, Clone, Copy)]
64pub enum SysctlInterfaceSelector {
65 All,
70 Default,
72 Id(NonZeroU64),
74}
75
76pub struct Netlink<C: NetlinkContext> {
78 id_generator: ClientIdGenerator,
80 route_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkRoute>>,
82 route_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkRoute>>,
84 sock_diag_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkSockDiag>>,
86 sock_diag_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkSockDiag>>,
88}
89
90impl<C: NetlinkContext> Netlink<C> {
91 pub fn new<H: interfaces::InterfacesHandler>(
96 interfaces_handler: H,
97 ) -> (Self, NetlinkWorkerParams<H, C>) {
98 let (route_client_sender, route_client_receiver) = mpsc::unbounded();
99 let (route_async_work_sink, async_work_receiver) = mpsc::unbounded();
100 let (sock_diag_client_sender, sock_diag_client_receiver) = mpsc::unbounded();
101 let (sock_diag_async_work_sink, sock_diag_async_work_receiver) = mpsc::unbounded();
102 (
103 Netlink {
104 id_generator: ClientIdGenerator::default(),
105 route_client_sender,
106 sock_diag_client_sender,
107 route_async_work_sink,
108 sock_diag_async_work_sink,
109 },
110 NetlinkWorkerParams {
111 interfaces_handler,
112 route_client_receiver,
113 route_async_work_receiver: async_work_receiver,
114 sock_diag_client_receiver,
115 sock_diag_async_work_receiver,
116 },
117 )
118 }
119
120 pub fn write_accept_ra_rt_table(
122 &self,
123 interface: SysctlInterfaceSelector,
124 value: i32,
125 ) -> Result<(), SysctlError> {
126 let (responder, receiver) = oneshot_sync::channel();
127 self.route_async_work_sink
128 .unbounded_send(AsyncWorkItem::Inner(RouteAsyncWork::SetAcceptRaRtTable {
129 interface,
130 value: value.into(),
131 responder,
132 }))
133 .map_err(|_| SysctlError::Disconnected)?;
134 receiver.receive().map_err(|_| SysctlError::Disconnected)?
135 }
136
137 pub fn read_accept_ra_rt_table(
139 &self,
140 interface: SysctlInterfaceSelector,
141 ) -> Result<i32, SysctlError> {
142 let (responder, receiver) = oneshot_sync::channel();
143 self.route_async_work_sink
144 .unbounded_send(AsyncWorkItem::Inner(RouteAsyncWork::GetAcceptRaRtTable {
145 interface,
146 responder,
147 }))
148 .map_err(|_| SysctlError::Disconnected)?;
149 Ok(receiver.receive().map_err(|_| SysctlError::Disconnected)??.into())
150 }
151
152 pub fn new_route_client(
159 &self,
160 sender: C::Sender<RouteNetlinkMessage>,
161 receiver: C::Receiver<RouteNetlinkMessage>,
162 ) -> Result<NetlinkRouteClient, NewClientError> {
163 let Netlink {
164 id_generator,
165 route_client_sender,
166 route_async_work_sink,
167 sock_diag_client_sender: _,
168 sock_diag_async_work_sink: _,
169 } = self;
170 let (external_client, internal_client) = client::new_client_pair::<NetlinkRoute, _>(
171 id_generator.new_id(),
172 sender,
173 route_async_work_sink.clone(),
174 );
175 route_client_sender
176 .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
177 .map_err(|e| {
178 debug_assert!(e.is_disconnected());
180 NewClientError::Disconnected
181 })?;
182 Ok(NetlinkRouteClient(external_client))
183 }
184
185 pub fn new_sock_diag_client(
192 &self,
193 sender: C::Sender<SockDiagResponse>,
194 receiver: C::Receiver<SockDiagRequest>,
195 ) -> Result<NetlinkSockDiagClient, NewClientError> {
196 let Netlink {
197 id_generator,
198 route_client_sender: _,
199 route_async_work_sink: _,
200 sock_diag_client_sender,
201 sock_diag_async_work_sink,
202 } = self;
203 let (external_client, internal_client) = client::new_client_pair::<NetlinkSockDiag, _>(
204 id_generator.new_id(),
205 sender,
206 sock_diag_async_work_sink.clone(),
207 );
208 sock_diag_client_sender
209 .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
210 .map_err(|e| {
211 debug_assert!(e.is_disconnected());
213 NewClientError::Disconnected
214 })?;
215 Ok(NetlinkSockDiagClient(external_client))
216 }
217}
218
219struct ClientWithReceiver<C: NetlinkContext, F: ProtocolFamily> {
221 client: InternalClient<F, C::Sender<F::Response>>,
222 receiver: C::Receiver<F::Request>,
223}
224
225#[derive(Debug)]
227pub enum NewClientError {
228 Disconnected,
231}
232
233#[derive(Debug)]
235pub enum SysctlError {
236 Disconnected,
238 NoInterface,
240 Unsupported,
242}
243
244pub struct NetlinkWorkerParams<H, C: NetlinkContext> {
246 interfaces_handler: H,
247 route_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkRoute>>,
249 route_async_work_receiver:
250 futures::channel::mpsc::UnboundedReceiver<AsyncWorkItem<NetlinkRoute>>,
251 #[allow(dead_code)]
253 sock_diag_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkSockDiag>>,
254 #[allow(dead_code)]
255 sock_diag_async_work_receiver:
256 futures::channel::mpsc::UnboundedReceiver<AsyncWorkItem<NetlinkSockDiag>>,
257}
258
259#[allow(missing_docs)]
261pub struct NetlinkWorkerDiscoverableProtocols {
262 pub root_interfaces: fnet_root::InterfacesProxy,
263 pub interfaces_state: fnet_interfaces::StateProxy,
264 pub v4_routes_state: fnet_routes::StateV4Proxy,
265 pub v6_routes_state: fnet_routes::StateV6Proxy,
266 pub v4_main_route_table: fnet_routes_admin::RouteTableV4Proxy,
267 pub v6_main_route_table: fnet_routes_admin::RouteTableV6Proxy,
268 pub v4_route_table_provider: fnet_routes_admin::RouteTableProviderV4Proxy,
269 pub v6_route_table_provider: fnet_routes_admin::RouteTableProviderV6Proxy,
270 pub v4_rule_table: fnet_routes_admin::RuleTableV4Proxy,
271 pub v6_rule_table: fnet_routes_admin::RuleTableV6Proxy,
272 pub ndp_option_watcher_provider: fnet_ndp::RouterAdvertisementOptionWatcherProviderProxy,
273 pub socket_diagnostics: fnet_sockets::DiagnosticsProxy,
274 pub socket_control: fnet_sockets::ControlProxy,
275 pub neighbors_view: fnet_neighbor::ViewProxy,
276}
277
278impl NetlinkWorkerDiscoverableProtocols {
279 fn from_environment() -> Self {
280 let root_interfaces = connect_to_protocol::<fnet_root::InterfacesMarker>()
281 .expect("connect to fuchsia.net.root.Interfaces");
282 let interfaces_state = connect_to_protocol::<fnet_interfaces::StateMarker>()
283 .expect("connect to fuchsia.net.interfaces.State");
284 let v4_routes_state =
285 connect_to_protocol::<<Ipv4 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
286 .expect("connect to fuchsia.net.routes.StateV4");
287 let v6_routes_state =
288 connect_to_protocol::<<Ipv6 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
289 .expect("connect to fuchsia.net.routes.StateV6");
290 let v4_main_route_table = connect_to_protocol::<
291 <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
292 >()
293 .expect("connect to fuchsia.net.routes.admin.RouteTableV4");
294 let v6_main_route_table = connect_to_protocol::<
295 <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
296 >()
297 .expect("connect to fuchsia.net.routes.admin.RouteTableV6");
298 let v4_route_table_provider = connect_to_protocol::<
299 <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
300 >()
301 .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV4");
302 let v6_route_table_provider = connect_to_protocol::<
303 <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
304 >()
305 .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV6");
306 let v4_rule_table = connect_to_protocol::<
307 <Ipv4 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
308 >()
309 .expect("connect to fuchsia.net.routes.admin.RuleTableV4");
310 let v6_rule_table = connect_to_protocol::<
311 <Ipv6 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
312 >()
313 .expect("connect to fuchsia.net.routes.admin.RuleTableV6");
314 let ndp_option_watcher_provider =
315 connect_to_protocol::<fnet_ndp::RouterAdvertisementOptionWatcherProviderMarker>()
316 .expect("connect to fuchsia.net.ndp.RouterAdvertisementOptionWatcherProvider");
317 let socket_diagnostics = connect_to_protocol::<fnet_sockets::DiagnosticsMarker>()
318 .expect("connect to fuchsia.net.sockets.Diagnostics");
319 let socket_control = connect_to_protocol::<fnet_sockets::ControlMarker>()
320 .expect("connect to fuchsia.net.sockets.Control");
321 let neighbors_view = connect_to_protocol::<fnet_neighbor::ViewMarker>()
322 .expect("connect to fuchsia.net.neighbor.View");
323
324 Self {
325 root_interfaces,
326 interfaces_state,
327 v4_routes_state,
328 v6_routes_state,
329 v4_main_route_table,
330 v6_main_route_table,
331 v4_route_table_provider,
332 v6_route_table_provider,
333 v4_rule_table,
334 v6_rule_table,
335 ndp_option_watcher_provider,
336 socket_diagnostics,
337 socket_control,
338 neighbors_view,
339 }
340 }
341}
342
343pub async fn run_netlink_worker<H: interfaces::InterfacesHandler, C: NetlinkContext>(
356 params: NetlinkWorkerParams<H, C>,
357 access_control: C::AccessControl<'_>,
358) {
359 run_netlink_worker_with_protocols(
360 params,
361 NetlinkWorkerDiscoverableProtocols::from_environment(),
362 None,
363 access_control,
364 )
365 .await;
366}
367
368pub async fn run_netlink_worker_with_protocols<
371 H: interfaces::InterfacesHandler,
372 C: NetlinkContext,
373>(
374 params: NetlinkWorkerParams<H, C>,
375 protocols: NetlinkWorkerDiscoverableProtocols,
376 on_route_initialized: Option<oneshot::Sender<()>>,
377 access_control: C::AccessControl<'_>,
378) {
379 let NetlinkWorkerParams {
380 interfaces_handler,
381 route_client_receiver,
382 route_async_work_receiver,
383 sock_diag_client_receiver,
384 sock_diag_async_work_receiver,
385 } = params;
386
387 let NetlinkWorkerDiscoverableProtocols {
388 root_interfaces,
389 interfaces_state,
390 v4_routes_state,
391 v6_routes_state,
392 v4_main_route_table,
393 v6_main_route_table,
394 v4_route_table_provider,
395 v6_route_table_provider,
396 v4_rule_table,
397 v6_rule_table,
398 ndp_option_watcher_provider,
399 socket_diagnostics,
400 socket_control,
401 neighbors_view,
402 } = protocols;
403
404 let route_clients = ClientTable::default();
405 let (route_request_sink, route_request_stream) = mpsc::channel(1);
406
407 let route_event_loop = {
408 let route_clients = route_clients.clone();
409 async move {
410 let event_loop: RouteEventLoop<H, C::Sender<_>> = RouteEventLoop {
411 interfaces_proxy: root_interfaces,
412 interfaces_state_proxy: interfaces_state,
413 v4_routes_state,
414 v6_routes_state,
415 v4_main_route_table,
416 v6_main_route_table,
417 v4_route_table_provider,
418 v6_route_table_provider,
419 v4_rule_table,
420 v6_rule_table,
421 ndp_option_watcher_provider,
422 neighbors_view,
423 route_clients,
424 request_stream: route_request_stream,
425 interfaces_handler,
426 async_work_receiver: route_async_work_receiver,
427 };
428
429 event_loop.run(on_route_initialized).await;
430 }
431 };
432
433 let route_client_receiver_loop = {
434 let access_control = access_control.clone();
435 async move {
436 connect_new_clients::<C, NetlinkRoute>(
438 route_clients,
439 route_client_receiver,
440 NetlinkRouteRequestHandler { unified_request_sink: route_request_sink },
441 access_control,
442 )
443 .await;
444 panic!("route_client_receiver stream unexpectedly finished");
445 }
446 };
447
448 let sock_diag_clients = ClientTable::default();
449 let (sock_diag_request_sink, sock_diag_request_stream) = mpsc::channel(1);
450
451 let sock_diag_event_loop = async move {
452 SockDiagEventLoop {
453 socket_diagnostics,
454 socket_control,
455 request_stream: sock_diag_request_stream,
456 async_work_receiver: sock_diag_async_work_receiver,
457 }
458 .run()
459 .await;
460 };
461
462 let sock_diag_client_receiver_loop = async move {
463 connect_new_clients::<C, NetlinkSockDiag>(
465 sock_diag_clients,
466 sock_diag_client_receiver,
467 NetlinkSockDiagRequestHandler { sock_diag_request_sink },
468 access_control,
469 )
470 .await;
471 panic!("sock_diag_client_receiver stream unexpectedly finished");
472 };
473
474 futures::future::join4(
475 route_event_loop,
476 route_client_receiver_loop,
477 sock_diag_event_loop,
478 sock_diag_client_receiver_loop,
479 )
480 .await;
481}
482
483async fn connect_new_clients<C: NetlinkContext, F: ProtocolFamily>(
488 client_table: ClientTable<F, C::Sender<F::Response>>,
489 client_receiver: UnboundedReceiver<ClientWithReceiver<C, F>>,
490 request_handler_impl: F::RequestHandler<C::Sender<F::Response>>,
491 access_control: C::AccessControl<'_>,
492) {
493 client_receiver
494 .for_each_concurrent(None, async |ClientWithReceiver { client, receiver }| {
496 client_table.add_client(client.clone());
497 let client = run_client_request_handler::<C, F>(
498 client,
499 receiver,
500 request_handler_impl.clone(),
501 access_control.clone(),
502 )
503 .await;
504 client_table.remove_client(client);
505 })
506 .await;
507}
508
509async fn run_client_request_handler<C: NetlinkContext, F: ProtocolFamily>(
514 client: InternalClient<F, C::Sender<F::Response>>,
515 receiver: C::Receiver<F::Request>,
516 handler: F::RequestHandler<C::Sender<F::Response>>,
517 access_control: C::AccessControl<'_>,
518) -> InternalClient<F, C::Sender<F::Response>> {
519 struct FoldState<C, H, P> {
522 client: C,
523 handler: H,
524 access_control: P,
525 }
526
527 let FoldState { client, handler: _, access_control: _ } = receiver
532 .fold(
533 FoldState { client, handler, access_control },
534 |FoldState { mut client, mut handler, access_control }, req| async {
535 match req.validate_creds_and_get_message(&access_control) {
536 Ok(req) => {
537 log_debug!("{} Received request: {:?}", client, req);
538 handler.handle_request(req, &mut client).await
539 }
540 Err(e) => {
541 match &e {
542 ValidationError::Parse(e) => {
543 log_warn!("{client} failed to parse netlink message: {e:?}");
544 }
545 p @ ValidationError::Permission { .. } => {
546 log_debug!("{client} permission check failed {p:?}")
547 }
548 }
549 if let Some(rsp) = e.into_error_message() {
550 client.send_unicast(rsp)
551 }
552 }
553 }
554 FoldState { client, handler, access_control }
555 },
556 )
557 .await;
558
559 client
560}
561
562#[cfg(test)]
563mod tests {
564 use super::*;
565 use fuchsia_async as fasync;
566 use futures::FutureExt as _;
567
568 use assert_matches::assert_matches;
569 use netlink_packet_core::{ErrorMessage, NetlinkPayload};
570 use std::num::NonZeroI32;
571 use std::pin::pin;
572
573 use crate::messaging::NetlinkMessageWithCreds;
574 use crate::messaging::testutil::{FakeCreds, SentMessage, TestNetlinkContext};
575 use crate::protocol_family::testutil::{
576 FakeNetlinkRequestHandler, FakeProtocolFamily, new_fake_netlink_message,
577 new_fake_netlink_message_with_creds,
578 };
579
580 #[fasync::run_singlethreaded(test)]
581 async fn test_run_client_request_handler() {
582 let (mut req_sender, req_receiver) = mpsc::channel(0);
583 let (mut client_sink, client, async_work_drain_task) =
584 crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
585 crate::client::testutil::CLIENT_ID_1,
586 std::iter::empty(),
587 );
588 let join_handle = fasync::Task::spawn(async_work_drain_task);
589
590 {
591 let mut client_task = pin!(
592 run_client_request_handler::<TestNetlinkContext, FakeProtocolFamily>(
593 client,
594 req_receiver,
595 FakeNetlinkRequestHandler,
596 Default::default()
597 )
598 .fuse()
599 );
600
601 assert_matches!((&mut client_task).now_or_never(), None);
602 assert_eq!(&client_sink.take_messages()[..], &[]);
603
604 req_sender
608 .try_send(new_fake_netlink_message_with_creds())
609 .expect("should send without error");
610 let mut could_send_fut =
611 pin!(futures::future::poll_fn(|ctx| req_sender.poll_ready(ctx)).fuse());
612 futures::select!(
613 res = could_send_fut => res.expect("should be able to send without error"),
614 _client = client_task => panic!("client task unexpectedly finished"),
615 );
616 assert_eq!(
617 &client_sink.take_messages()[..],
618 &[SentMessage::unicast(new_fake_netlink_message())]
619 );
620
621 req_sender.close_channel();
623 let _client = client_task.await;
624 assert_eq!(&client_sink.take_messages()[..], &[]);
625 }
626 join_handle.await;
627 }
628
629 #[fasync::run_singlethreaded(test)]
630 async fn test_connect_new_clients() {
631 let client_table = ClientTable::default();
632 let scope = fasync::Scope::new();
633 let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
634 let mut client_acceptor_fut = Box::pin(
635 connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
636 client_table.clone(),
637 client_receiver,
638 FakeNetlinkRequestHandler,
639 Default::default(),
640 )
641 .fuse(),
642 );
643
644 assert_eq!((&mut client_acceptor_fut).now_or_never(), None);
645
646 let (mut _client_sink1, client1, async_work_drain_task) =
648 crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
649 crate::client::testutil::CLIENT_ID_1,
650 std::iter::empty(),
651 );
652 let _join_handle = scope.spawn(async_work_drain_task);
653 let (mut req_sender1, req_receiver1) = mpsc::channel(0);
654 client_sender
655 .unbounded_send(ClientWithReceiver { client: client1, receiver: req_receiver1 })
656 .expect("should send without error");
657
658 let (mut client_sink2, client2, async_work_drain_task) =
660 crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
661 crate::client::testutil::CLIENT_ID_2,
662 std::iter::empty(),
663 );
664 let _join_handle = scope.spawn(async_work_drain_task);
665 let (mut req_sender2, req_receiver2) = mpsc::channel(0);
666 client_sender
667 .unbounded_send(ClientWithReceiver { client: client2, receiver: req_receiver2 })
668 .expect("should send without error");
669
670 req_sender2
675 .try_send(new_fake_netlink_message_with_creds())
676 .expect("should send without error");
677 let mut could_send_fut =
678 pin!(futures::future::poll_fn(|ctx| req_sender2.poll_ready(ctx)).fuse());
679 futures::select!(
680 res = could_send_fut => res.expect("should be able to send without error"),
681 () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
682 );
683 assert_eq!(
684 &client_table.client_ids()[..],
685 [client::testutil::CLIENT_ID_1, client::testutil::CLIENT_ID_2]
686 );
687 assert_eq!(
688 &client_sink2.take_messages()[..],
689 &[SentMessage::unicast(new_fake_netlink_message())]
690 );
691
692 req_sender1.close_channel();
694 req_sender2.close_channel();
695 assert_eq!((&mut client_acceptor_fut).now_or_never(), None);
696
697 client_sender.close_channel();
699 client_acceptor_fut.await;
700
701 assert_eq!(&client_table.client_ids()[..], []);
703
704 drop(client_table);
705 scope.join().await;
706 }
707
708 #[fasync::run_singlethreaded(test)]
709 async fn test_permissions() {
710 let client_table = ClientTable::default();
711 let scope = fasync::Scope::new();
712 let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
713 let mut client_acceptor_fut = Box::pin(
714 connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
715 client_table.clone(),
716 client_receiver,
717 FakeNetlinkRequestHandler,
718 Default::default(),
719 )
720 .fuse(),
721 );
722 assert_eq!((&mut client_acceptor_fut).now_or_never(), None);
723
724 let (mut client_sink, client, async_work_drain_task) =
725 crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
726 crate::client::testutil::CLIENT_ID_1,
727 std::iter::empty(),
728 );
729 let _join_handle = scope.spawn(async_work_drain_task);
730 let (mut req_sender, req_receiver) = mpsc::channel(0);
731 client_sender
732 .unbounded_send(ClientWithReceiver { client, receiver: req_receiver })
733 .expect("should send without error");
734
735 let message = NetlinkMessageWithCreds::new(
736 new_fake_netlink_message(),
737 FakeCreds::with_error(Errno::new(libc::EPERM).unwrap()),
738 );
739 req_sender.try_send(message).expect("should send without error");
740
741 let response = futures::select!(
742 res = client_sink.next_message().fuse() => res,
743 () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
744 );
745
746 assert_matches!(
747 response.message.payload,
748 NetlinkPayload::Error(ErrorMessage { code: Some(error_code), .. }) => {
749 assert_eq!(error_code , NonZeroI32::new(-libc::EPERM).unwrap());
750 }
751 );
752 }
753}