1#![warn(missing_docs, unused)]
13
14mod client;
15pub mod interfaces;
16pub(crate) mod logging;
17pub mod messaging;
18pub mod multicast_groups;
19mod nduseropt;
20pub mod neighbors;
21mod netlink_packet;
22pub mod protocol_family;
23pub(crate) mod route_eventloop;
24pub(crate) mod route_tables;
25pub mod routes;
26mod rules;
27pub(crate) mod util;
28
29use std::num::NonZeroU64;
30
31use fidl_fuchsia_net_interfaces as fnet_interfaces;
32use fidl_fuchsia_net_ndp as fnet_ndp;
33use fidl_fuchsia_net_neighbor as fnet_neighbor;
34use fidl_fuchsia_net_root as fnet_root;
35use fidl_fuchsia_net_routes as fnet_routes;
36use fidl_fuchsia_net_routes_admin as fnet_routes_admin;
37use fidl_fuchsia_net_routes_ext as fnet_routes_ext;
38use fidl_fuchsia_net_sockets as fnet_sockets;
39use fuchsia_component::client::connect_to_protocol;
40use futures::StreamExt as _;
41use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
42use futures::channel::oneshot;
43use net_types::ip::{Ipv4, Ipv6};
44use netlink_packet_route::RouteNetlinkMessage;
45use netlink_packet_sock_diag::{SockDiagRequest, SockDiagResponse};
46use protocol_family::route::NetlinkRouteNotifiedGroup;
47
48use crate::client::{AsyncWorkItem, ClientIdGenerator, ClientTable, InternalClient};
49use crate::logging::{log_debug, log_warn};
50use crate::messaging::{NetlinkContext, UnvalidatedNetlinkMessage as _, ValidationError};
51pub use crate::netlink_packet::errno::Errno;
52use crate::protocol_family::route::{
53 NetlinkRoute, NetlinkRouteClient, NetlinkRouteRequestHandler, RouteAsyncWork,
54};
55use crate::protocol_family::sock_diag::{
56 NetlinkSockDiag, NetlinkSockDiagClient, NetlinkSockDiagRequestHandler, SockDiagEventLoop,
57};
58use crate::protocol_family::{NetlinkFamilyRequestHandler as _, ProtocolFamily};
59use crate::route_eventloop::RouteEventLoop;
60
61pub const NETLINK_LOG_TAG: &'static str = "netlink";
63
64#[derive(Debug, Clone, Copy)]
66pub enum SysctlInterfaceSelector {
67 All,
72 Default,
74 Id(NonZeroU64),
76}
77
78pub struct Netlink<C: NetlinkContext> {
80 id_generator: ClientIdGenerator,
82 route_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkRoute>>,
84 route_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkRoute>>,
86 sock_diag_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkSockDiag>>,
88 sock_diag_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkSockDiag>>,
90}
91
92impl<C: NetlinkContext> Netlink<C> {
93 pub fn new<H: interfaces::InterfacesHandler>(
98 interfaces_handler: H,
99 ) -> (Self, NetlinkWorkerParams<H, C>) {
100 let (route_client_sender, route_client_receiver) = mpsc::unbounded();
101 let (route_async_work_sink, async_work_receiver) = mpsc::unbounded();
102 let (sock_diag_client_sender, sock_diag_client_receiver) = mpsc::unbounded();
103 let (sock_diag_async_work_sink, sock_diag_async_work_receiver) = mpsc::unbounded();
104 (
105 Netlink {
106 id_generator: ClientIdGenerator::default(),
107 route_client_sender,
108 sock_diag_client_sender,
109 route_async_work_sink,
110 sock_diag_async_work_sink,
111 },
112 NetlinkWorkerParams {
113 interfaces_handler,
114 route_client_receiver,
115 route_async_work_receiver: async_work_receiver,
116 sock_diag_client_receiver,
117 sock_diag_async_work_receiver,
118 },
119 )
120 }
121
122 pub fn write_accept_ra_rt_table(
124 &self,
125 interface: SysctlInterfaceSelector,
126 value: i32,
127 ) -> Result<(), SysctlError> {
128 let (responder, receiver) = oneshot_sync::channel();
129 self.route_async_work_sink
130 .unbounded_send(AsyncWorkItem::Inner(RouteAsyncWork::SetAcceptRaRtTable {
131 interface,
132 value: value.into(),
133 responder,
134 }))
135 .map_err(|_| SysctlError::Disconnected)?;
136 receiver.receive().map_err(|_| SysctlError::Disconnected)?
137 }
138
139 pub fn read_accept_ra_rt_table(
141 &self,
142 interface: SysctlInterfaceSelector,
143 ) -> Result<i32, SysctlError> {
144 let (responder, receiver) = oneshot_sync::channel();
145 self.route_async_work_sink
146 .unbounded_send(AsyncWorkItem::Inner(RouteAsyncWork::GetAcceptRaRtTable {
147 interface,
148 responder,
149 }))
150 .map_err(|_| SysctlError::Disconnected)?;
151 Ok(receiver.receive().map_err(|_| SysctlError::Disconnected)??.into())
152 }
153
154 pub fn new_route_client(
161 &self,
162 sender: C::Sender<RouteNetlinkMessage>,
163 receiver: C::Receiver<RouteNetlinkMessage>,
164 ) -> Result<NetlinkRouteClient, NewClientError> {
165 let Netlink {
166 id_generator,
167 route_client_sender,
168 route_async_work_sink,
169 sock_diag_client_sender: _,
170 sock_diag_async_work_sink: _,
171 } = self;
172 let (external_client, internal_client) = client::new_client_pair::<NetlinkRoute, _>(
173 id_generator.new_id(),
174 sender,
175 route_async_work_sink.clone(),
176 );
177 route_client_sender
178 .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
179 .map_err(|e| {
180 debug_assert!(e.is_disconnected());
182 NewClientError::Disconnected
183 })?;
184 Ok(NetlinkRouteClient(external_client))
185 }
186
187 pub fn new_sock_diag_client(
194 &self,
195 sender: C::Sender<SockDiagResponse>,
196 receiver: C::Receiver<SockDiagRequest>,
197 ) -> Result<NetlinkSockDiagClient, NewClientError> {
198 let Netlink {
199 id_generator,
200 route_client_sender: _,
201 route_async_work_sink: _,
202 sock_diag_client_sender,
203 sock_diag_async_work_sink,
204 } = self;
205 let (external_client, internal_client) = client::new_client_pair::<NetlinkSockDiag, _>(
206 id_generator.new_id(),
207 sender,
208 sock_diag_async_work_sink.clone(),
209 );
210 sock_diag_client_sender
211 .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
212 .map_err(|e| {
213 debug_assert!(e.is_disconnected());
215 NewClientError::Disconnected
216 })?;
217 Ok(NetlinkSockDiagClient(external_client))
218 }
219}
220
221struct ClientWithReceiver<C: NetlinkContext, F: ProtocolFamily> {
223 client: InternalClient<F, C::Sender<F::Response>>,
224 receiver: C::Receiver<F::Request>,
225}
226
227#[derive(Debug)]
229pub enum NewClientError {
230 Disconnected,
233}
234
235#[derive(Debug)]
237pub enum SysctlError {
238 Disconnected,
240 NoInterface,
242 Unsupported,
244}
245
246pub struct NetlinkWorkerParams<H, C: NetlinkContext> {
248 interfaces_handler: H,
249 route_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkRoute>>,
251 route_async_work_receiver:
252 futures::channel::mpsc::UnboundedReceiver<AsyncWorkItem<NetlinkRoute>>,
253 #[allow(dead_code)]
255 sock_diag_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkSockDiag>>,
256 #[allow(dead_code)]
257 sock_diag_async_work_receiver:
258 futures::channel::mpsc::UnboundedReceiver<AsyncWorkItem<NetlinkSockDiag>>,
259}
260
261#[allow(missing_docs)]
263pub struct NetlinkWorkerDiscoverableProtocols {
264 pub root_interfaces: fnet_root::InterfacesProxy,
265 pub interfaces_state: fnet_interfaces::StateProxy,
266 pub v4_routes_state: fnet_routes::StateV4Proxy,
267 pub v6_routes_state: fnet_routes::StateV6Proxy,
268 pub v4_main_route_table: fnet_routes_admin::RouteTableV4Proxy,
269 pub v6_main_route_table: fnet_routes_admin::RouteTableV6Proxy,
270 pub v4_route_table_provider: fnet_routes_admin::RouteTableProviderV4Proxy,
271 pub v6_route_table_provider: fnet_routes_admin::RouteTableProviderV6Proxy,
272 pub v4_rule_table: fnet_routes_admin::RuleTableV4Proxy,
273 pub v6_rule_table: fnet_routes_admin::RuleTableV6Proxy,
274 pub ndp_option_watcher_provider: fnet_ndp::RouterAdvertisementOptionWatcherProviderProxy,
275 pub socket_diagnostics: fnet_sockets::DiagnosticsProxy,
276 pub socket_control: fnet_sockets::ControlProxy,
277 pub neighbors_view: fnet_neighbor::ViewProxy,
278 pub neighbors_controller: fnet_neighbor::ControllerProxy,
279}
280
281impl NetlinkWorkerDiscoverableProtocols {
282 fn from_environment() -> Self {
283 let root_interfaces = connect_to_protocol::<fnet_root::InterfacesMarker>()
284 .expect("connect to fuchsia.net.root.Interfaces");
285 let interfaces_state = connect_to_protocol::<fnet_interfaces::StateMarker>()
286 .expect("connect to fuchsia.net.interfaces.State");
287 let v4_routes_state =
288 connect_to_protocol::<<Ipv4 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
289 .expect("connect to fuchsia.net.routes.StateV4");
290 let v6_routes_state =
291 connect_to_protocol::<<Ipv6 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
292 .expect("connect to fuchsia.net.routes.StateV6");
293 let v4_main_route_table = connect_to_protocol::<
294 <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
295 >()
296 .expect("connect to fuchsia.net.routes.admin.RouteTableV4");
297 let v6_main_route_table = connect_to_protocol::<
298 <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
299 >()
300 .expect("connect to fuchsia.net.routes.admin.RouteTableV6");
301 let v4_route_table_provider = connect_to_protocol::<
302 <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
303 >()
304 .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV4");
305 let v6_route_table_provider = connect_to_protocol::<
306 <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
307 >()
308 .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV6");
309 let v4_rule_table = connect_to_protocol::<
310 <Ipv4 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
311 >()
312 .expect("connect to fuchsia.net.routes.admin.RuleTableV4");
313 let v6_rule_table = connect_to_protocol::<
314 <Ipv6 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
315 >()
316 .expect("connect to fuchsia.net.routes.admin.RuleTableV6");
317 let ndp_option_watcher_provider =
318 connect_to_protocol::<fnet_ndp::RouterAdvertisementOptionWatcherProviderMarker>()
319 .expect("connect to fuchsia.net.ndp.RouterAdvertisementOptionWatcherProvider");
320 let socket_diagnostics = connect_to_protocol::<fnet_sockets::DiagnosticsMarker>()
321 .expect("connect to fuchsia.net.sockets.Diagnostics");
322 let socket_control = connect_to_protocol::<fnet_sockets::ControlMarker>()
323 .expect("connect to fuchsia.net.sockets.Control");
324 let neighbors_view = connect_to_protocol::<fnet_neighbor::ViewMarker>()
325 .expect("connect to fuchsia.net.neighbor.View");
326 let neighbors_controller = connect_to_protocol::<fnet_neighbor::ControllerMarker>()
327 .expect("connect to fuchsia.net.neighbor.Controller");
328
329 Self {
330 root_interfaces,
331 interfaces_state,
332 v4_routes_state,
333 v6_routes_state,
334 v4_main_route_table,
335 v6_main_route_table,
336 v4_route_table_provider,
337 v6_route_table_provider,
338 v4_rule_table,
339 v6_rule_table,
340 ndp_option_watcher_provider,
341 socket_diagnostics,
342 socket_control,
343 neighbors_view,
344 neighbors_controller,
345 }
346 }
347}
348
349pub async fn run_netlink_worker<H: interfaces::InterfacesHandler, C: NetlinkContext>(
362 params: NetlinkWorkerParams<H, C>,
363 access_control: C::AccessControl<'_>,
364) {
365 run_netlink_worker_with_protocols(
366 params,
367 NetlinkWorkerDiscoverableProtocols::from_environment(),
368 None,
369 access_control,
370 )
371 .await;
372}
373
374pub async fn run_netlink_worker_with_protocols<
377 H: interfaces::InterfacesHandler,
378 C: NetlinkContext,
379>(
380 params: NetlinkWorkerParams<H, C>,
381 protocols: NetlinkWorkerDiscoverableProtocols,
382 on_route_initialized: Option<oneshot::Sender<()>>,
383 access_control: C::AccessControl<'_>,
384) {
385 let NetlinkWorkerParams {
386 interfaces_handler,
387 route_client_receiver,
388 route_async_work_receiver,
389 sock_diag_client_receiver,
390 sock_diag_async_work_receiver,
391 } = params;
392
393 let NetlinkWorkerDiscoverableProtocols {
394 root_interfaces,
395 interfaces_state,
396 v4_routes_state,
397 v6_routes_state,
398 v4_main_route_table,
399 v6_main_route_table,
400 v4_route_table_provider,
401 v6_route_table_provider,
402 v4_rule_table,
403 v6_rule_table,
404 ndp_option_watcher_provider,
405 socket_diagnostics,
406 socket_control,
407 neighbors_view,
408 neighbors_controller,
409 } = protocols;
410
411 let route_clients = ClientTable::default();
412 let (route_request_sink, route_request_stream) = mpsc::channel(1);
413
414 let route_event_loop = {
415 let route_clients = route_clients.clone();
416 async move {
417 let event_loop: RouteEventLoop<H, C::Sender<_>> = RouteEventLoop {
418 interfaces_proxy: root_interfaces,
419 interfaces_state_proxy: interfaces_state,
420 v4_routes_state,
421 v6_routes_state,
422 v4_main_route_table,
423 v6_main_route_table,
424 v4_route_table_provider,
425 v6_route_table_provider,
426 v4_rule_table,
427 v6_rule_table,
428 ndp_option_watcher_provider,
429 neighbors_view,
430 neighbors_controller,
431 route_clients,
432 request_stream: route_request_stream,
433 interfaces_handler,
434 async_work_receiver: route_async_work_receiver,
435 };
436
437 event_loop.run(on_route_initialized).await;
438 }
439 };
440
441 let route_client_receiver_loop = {
442 let access_control = access_control.clone();
443 async move {
444 connect_new_clients::<C, NetlinkRoute>(
446 route_clients,
447 route_client_receiver,
448 NetlinkRouteRequestHandler { unified_request_sink: route_request_sink },
449 access_control,
450 )
451 .await;
452 panic!("route_client_receiver stream unexpectedly finished");
453 }
454 };
455
456 let sock_diag_clients = ClientTable::default();
457 let (sock_diag_request_sink, sock_diag_request_stream) = mpsc::channel(1);
458
459 let sock_diag_event_loop = async move {
460 SockDiagEventLoop {
461 socket_diagnostics,
462 socket_control,
463 request_stream: sock_diag_request_stream,
464 async_work_receiver: sock_diag_async_work_receiver,
465 }
466 .run()
467 .await;
468 };
469
470 let sock_diag_client_receiver_loop = async move {
471 connect_new_clients::<C, NetlinkSockDiag>(
473 sock_diag_clients,
474 sock_diag_client_receiver,
475 NetlinkSockDiagRequestHandler { sock_diag_request_sink },
476 access_control,
477 )
478 .await;
479 panic!("sock_diag_client_receiver stream unexpectedly finished");
480 };
481
482 futures::future::join4(
483 route_event_loop,
484 route_client_receiver_loop,
485 sock_diag_event_loop,
486 sock_diag_client_receiver_loop,
487 )
488 .await;
489}
490
491async fn connect_new_clients<C: NetlinkContext, F: ProtocolFamily>(
496 client_table: ClientTable<F, C::Sender<F::Response>>,
497 client_receiver: UnboundedReceiver<ClientWithReceiver<C, F>>,
498 request_handler_impl: F::RequestHandler<C::Sender<F::Response>>,
499 access_control: C::AccessControl<'_>,
500) {
501 client_receiver
502 .for_each_concurrent(None, async |ClientWithReceiver { client, receiver }| {
504 client_table.add_client(client.clone());
505 let client = run_client_request_handler::<C, F>(
506 client,
507 receiver,
508 request_handler_impl.clone(),
509 access_control.clone(),
510 )
511 .await;
512 client_table.remove_client(client);
513 })
514 .await;
515}
516
517async fn run_client_request_handler<C: NetlinkContext, F: ProtocolFamily>(
522 client: InternalClient<F, C::Sender<F::Response>>,
523 receiver: C::Receiver<F::Request>,
524 handler: F::RequestHandler<C::Sender<F::Response>>,
525 access_control: C::AccessControl<'_>,
526) -> InternalClient<F, C::Sender<F::Response>> {
527 struct FoldState<C, H, P> {
530 client: C,
531 handler: H,
532 access_control: P,
533 }
534
535 let FoldState { client, handler: _, access_control: _ } = receiver
540 .fold(
541 FoldState { client, handler, access_control },
542 |FoldState { mut client, mut handler, access_control }, req| async {
543 match req.validate_creds_and_get_message(&access_control) {
544 Ok(req) => {
545 log_debug!("{} Received request: {:?}", client, req);
546 handler.handle_request(req, &mut client).await
547 }
548 Err(e) => {
549 match &e {
550 ValidationError::Parse(e) => {
551 log_warn!("{client} failed to parse netlink message: {e:?}");
552 }
553 p @ ValidationError::Permission { .. } => {
554 log_debug!("{client} permission check failed {p:?}")
555 }
556 }
557 if let Some(rsp) = e.into_error_message() {
558 client.send_unicast(rsp)
559 }
560 }
561 }
562 FoldState { client, handler, access_control }
563 },
564 )
565 .await;
566
567 client
568}
569
570#[cfg(test)]
571mod tests {
572 use super::*;
573 use fuchsia_async as fasync;
574 use futures::FutureExt as _;
575
576 use assert_matches::assert_matches;
577 use netlink_packet_core::{ErrorMessage, NetlinkPayload};
578 use std::num::NonZeroI32;
579 use std::pin::pin;
580
581 use crate::messaging::NetlinkMessageWithCreds;
582 use crate::messaging::testutil::{FakeCreds, SentMessage, TestNetlinkContext};
583 use crate::protocol_family::testutil::{
584 FakeNetlinkRequestHandler, FakeProtocolFamily, new_fake_netlink_message,
585 new_fake_netlink_message_with_creds,
586 };
587
588 #[fasync::run_singlethreaded(test)]
589 async fn test_run_client_request_handler() {
590 let (mut req_sender, req_receiver) = mpsc::channel(0);
591 let (mut client_sink, client, async_work_drain_task) =
592 crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
593 crate::client::testutil::CLIENT_ID_1,
594 std::iter::empty(),
595 );
596 let join_handle = fasync::Task::spawn(async_work_drain_task);
597
598 {
599 let mut client_task = pin!(
600 run_client_request_handler::<TestNetlinkContext, FakeProtocolFamily>(
601 client,
602 req_receiver,
603 FakeNetlinkRequestHandler,
604 Default::default()
605 )
606 .fuse()
607 );
608
609 assert_matches!((&mut client_task).now_or_never(), None);
610 assert_eq!(&client_sink.take_messages()[..], &[]);
611
612 req_sender
616 .try_send(new_fake_netlink_message_with_creds())
617 .expect("should send without error");
618 let mut could_send_fut =
619 pin!(futures::future::poll_fn(|ctx| req_sender.poll_ready(ctx)).fuse());
620 futures::select!(
621 res = could_send_fut => res.expect("should be able to send without error"),
622 _client = client_task => panic!("client task unexpectedly finished"),
623 );
624 assert_eq!(
625 &client_sink.take_messages()[..],
626 &[SentMessage::unicast(new_fake_netlink_message())]
627 );
628
629 req_sender.close_channel();
631 let _client = client_task.await;
632 assert_eq!(&client_sink.take_messages()[..], &[]);
633 }
634 join_handle.await;
635 }
636
637 #[fasync::run_singlethreaded(test)]
638 async fn test_connect_new_clients() {
639 let client_table = ClientTable::default();
640 let scope = fasync::Scope::new();
641 let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
642 let mut client_acceptor_fut = Box::pin(
643 connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
644 client_table.clone(),
645 client_receiver,
646 FakeNetlinkRequestHandler,
647 Default::default(),
648 )
649 .fuse(),
650 );
651
652 assert_eq!((&mut client_acceptor_fut).now_or_never(), None);
653
654 let (mut _client_sink1, client1, async_work_drain_task) =
656 crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
657 crate::client::testutil::CLIENT_ID_1,
658 std::iter::empty(),
659 );
660 let _join_handle = scope.spawn(async_work_drain_task);
661 let (mut req_sender1, req_receiver1) = mpsc::channel(0);
662 client_sender
663 .unbounded_send(ClientWithReceiver { client: client1, receiver: req_receiver1 })
664 .expect("should send without error");
665
666 let (mut client_sink2, client2, async_work_drain_task) =
668 crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
669 crate::client::testutil::CLIENT_ID_2,
670 std::iter::empty(),
671 );
672 let _join_handle = scope.spawn(async_work_drain_task);
673 let (mut req_sender2, req_receiver2) = mpsc::channel(0);
674 client_sender
675 .unbounded_send(ClientWithReceiver { client: client2, receiver: req_receiver2 })
676 .expect("should send without error");
677
678 req_sender2
683 .try_send(new_fake_netlink_message_with_creds())
684 .expect("should send without error");
685 let mut could_send_fut =
686 pin!(futures::future::poll_fn(|ctx| req_sender2.poll_ready(ctx)).fuse());
687 futures::select!(
688 res = could_send_fut => res.expect("should be able to send without error"),
689 () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
690 );
691 assert_eq!(
692 &client_table.client_ids()[..],
693 [client::testutil::CLIENT_ID_1, client::testutil::CLIENT_ID_2]
694 );
695 assert_eq!(
696 &client_sink2.take_messages()[..],
697 &[SentMessage::unicast(new_fake_netlink_message())]
698 );
699
700 req_sender1.close_channel();
702 req_sender2.close_channel();
703 assert_eq!((&mut client_acceptor_fut).now_or_never(), None);
704
705 client_sender.close_channel();
707 client_acceptor_fut.await;
708
709 assert_eq!(&client_table.client_ids()[..], []);
711
712 drop(client_table);
713 scope.join().await;
714 }
715
716 #[fasync::run_singlethreaded(test)]
717 async fn test_permissions() {
718 let client_table = ClientTable::default();
719 let scope = fasync::Scope::new();
720 let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
721 let mut client_acceptor_fut = Box::pin(
722 connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
723 client_table.clone(),
724 client_receiver,
725 FakeNetlinkRequestHandler,
726 Default::default(),
727 )
728 .fuse(),
729 );
730 assert_eq!((&mut client_acceptor_fut).now_or_never(), None);
731
732 let (mut client_sink, client, async_work_drain_task) =
733 crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
734 crate::client::testutil::CLIENT_ID_1,
735 std::iter::empty(),
736 );
737 let _join_handle = scope.spawn(async_work_drain_task);
738 let (mut req_sender, req_receiver) = mpsc::channel(0);
739 client_sender
740 .unbounded_send(ClientWithReceiver { client, receiver: req_receiver })
741 .expect("should send without error");
742
743 let message = NetlinkMessageWithCreds::new(
744 new_fake_netlink_message(),
745 FakeCreds::with_error(Errno::new(libc::EPERM).unwrap()),
746 );
747 req_sender.try_send(message).expect("should send without error");
748
749 let response = futures::select!(
750 res = client_sink.next_message().fuse() => res,
751 () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
752 );
753
754 assert_matches!(
755 response.message.payload,
756 NetlinkPayload::Error(ErrorMessage { code: Some(error_code), .. }) => {
757 assert_eq!(error_code , NonZeroI32::new(-libc::EPERM).unwrap());
758 }
759 );
760 }
761}