1#![warn(missing_docs, unused)]
13
14mod client;
15pub mod interfaces;
16pub(crate) mod logging;
17pub mod messaging;
18pub mod multicast_groups;
19mod nduseropt;
20pub mod neighbors;
21mod netlink_packet;
22pub mod protocol_family;
23pub(crate) mod route_eventloop;
24pub(crate) mod route_tables;
25pub mod routes;
26mod rules;
27pub(crate) mod util;
28
29use std::num::NonZeroU64;
30
31use fuchsia_component::client::connect_to_protocol;
32use futures::StreamExt as _;
33use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
34use futures::channel::oneshot;
35use net_types::ip::{Ipv4, Ipv6};
36use netlink_packet_route::RouteNetlinkMessage;
37use netlink_packet_sock_diag::{SockDiagRequest, SockDiagResponse};
38use protocol_family::route::NetlinkRouteNotifiedGroup;
39use {
40 fidl_fuchsia_net_interfaces as fnet_interfaces, fidl_fuchsia_net_ndp as fnet_ndp,
41 fidl_fuchsia_net_neighbor as fnet_neighbor, fidl_fuchsia_net_root as fnet_root,
42 fidl_fuchsia_net_routes as fnet_routes, fidl_fuchsia_net_routes_admin as fnet_routes_admin,
43 fidl_fuchsia_net_routes_ext as fnet_routes_ext, fidl_fuchsia_net_sockets as fnet_sockets,
44};
45
46use crate::client::{AsyncWorkItem, ClientIdGenerator, ClientTable, InternalClient};
47use crate::logging::{log_debug, log_warn};
48use crate::messaging::{NetlinkContext, UnvalidatedNetlinkMessage as _, ValidationError};
49pub use crate::netlink_packet::errno::Errno;
50use crate::protocol_family::route::{
51 NetlinkRoute, NetlinkRouteClient, NetlinkRouteRequestHandler, RouteAsyncWork,
52};
53use crate::protocol_family::sock_diag::{
54 NetlinkSockDiag, NetlinkSockDiagClient, NetlinkSockDiagRequestHandler, SockDiagEventLoop,
55};
56use crate::protocol_family::{NetlinkFamilyRequestHandler as _, ProtocolFamily};
57use crate::route_eventloop::RouteEventLoop;
58
/// Log tag string for this crate.
// NOTE(review): presumably attached to the log records emitted through
// `crate::logging`; the use site is not visible in this file.
pub const NETLINK_LOG_TAG: &str = "netlink";
61
/// Selects which interface(s) a sysctl read or write refers to.
///
/// Passed to [`Netlink::write_accept_ra_rt_table`] and
/// [`Netlink::read_accept_ra_rt_table`], which forward it unchanged to the
/// route worker inside a `RouteAsyncWork` item.
#[derive(Debug, Clone, Copy)]
pub enum SysctlInterfaceSelector {
    /// The "all interfaces" selector.
    // NOTE(review): presumably mirrors Linux's `conf/all` sysctl directory;
    // the exact semantics are decided by the consumer, which is not visible
    // in this file.
    All,
    /// The "default" selector.
    // NOTE(review): presumably mirrors Linux's `conf/default` (the value
    // inherited by interfaces created later) - confirm against the consumer.
    Default,
    /// A single interface, identified by its non-zero numeric id.
    Id(NonZeroU64),
}
75
76pub struct Netlink<C: NetlinkContext> {
78 id_generator: ClientIdGenerator,
80 route_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkRoute>>,
82 route_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkRoute>>,
84 sock_diag_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkSockDiag>>,
86 sock_diag_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkSockDiag>>,
88}
89
90impl<C: NetlinkContext> Netlink<C> {
91 pub fn new<H: interfaces::InterfacesHandler>(
96 interfaces_handler: H,
97 ) -> (Self, NetlinkWorkerParams<H, C>) {
98 let (route_client_sender, route_client_receiver) = mpsc::unbounded();
99 let (route_async_work_sink, async_work_receiver) = mpsc::unbounded();
100 let (sock_diag_client_sender, sock_diag_client_receiver) = mpsc::unbounded();
101 let (sock_diag_async_work_sink, sock_diag_async_work_receiver) = mpsc::unbounded();
102 (
103 Netlink {
104 id_generator: ClientIdGenerator::default(),
105 route_client_sender,
106 sock_diag_client_sender,
107 route_async_work_sink,
108 sock_diag_async_work_sink,
109 },
110 NetlinkWorkerParams {
111 interfaces_handler,
112 route_client_receiver,
113 route_async_work_receiver: async_work_receiver,
114 sock_diag_client_receiver,
115 sock_diag_async_work_receiver,
116 },
117 )
118 }
119
120 pub fn write_accept_ra_rt_table(
122 &self,
123 interface: SysctlInterfaceSelector,
124 value: i32,
125 ) -> Result<(), SysctlError> {
126 let (responder, receiver) = oneshot_sync::channel();
127 self.route_async_work_sink
128 .unbounded_send(AsyncWorkItem::Inner(RouteAsyncWork::SetAcceptRaRtTable {
129 interface,
130 value: value.into(),
131 responder,
132 }))
133 .map_err(|_| SysctlError::Disconnected)?;
134 receiver.receive().map_err(|_| SysctlError::Disconnected)?
135 }
136
137 pub fn read_accept_ra_rt_table(
139 &self,
140 interface: SysctlInterfaceSelector,
141 ) -> Result<i32, SysctlError> {
142 let (responder, receiver) = oneshot_sync::channel();
143 self.route_async_work_sink
144 .unbounded_send(AsyncWorkItem::Inner(RouteAsyncWork::GetAcceptRaRtTable {
145 interface,
146 responder,
147 }))
148 .map_err(|_| SysctlError::Disconnected)?;
149 Ok(receiver.receive().map_err(|_| SysctlError::Disconnected)??.into())
150 }
151
152 pub fn new_route_client(
159 &self,
160 sender: C::Sender<RouteNetlinkMessage>,
161 receiver: C::Receiver<RouteNetlinkMessage>,
162 ) -> Result<NetlinkRouteClient, NewClientError> {
163 let Netlink {
164 id_generator,
165 route_client_sender,
166 route_async_work_sink,
167 sock_diag_client_sender: _,
168 sock_diag_async_work_sink: _,
169 } = self;
170 let (external_client, internal_client) = client::new_client_pair::<NetlinkRoute, _>(
171 id_generator.new_id(),
172 sender,
173 route_async_work_sink.clone(),
174 );
175 route_client_sender
176 .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
177 .map_err(|e| {
178 debug_assert!(e.is_disconnected());
180 NewClientError::Disconnected
181 })?;
182 Ok(NetlinkRouteClient(external_client))
183 }
184
185 pub fn new_sock_diag_client(
192 &self,
193 sender: C::Sender<SockDiagResponse>,
194 receiver: C::Receiver<SockDiagRequest>,
195 ) -> Result<NetlinkSockDiagClient, NewClientError> {
196 let Netlink {
197 id_generator,
198 route_client_sender: _,
199 route_async_work_sink: _,
200 sock_diag_client_sender,
201 sock_diag_async_work_sink,
202 } = self;
203 let (external_client, internal_client) = client::new_client_pair::<NetlinkSockDiag, _>(
204 id_generator.new_id(),
205 sender,
206 sock_diag_async_work_sink.clone(),
207 );
208 sock_diag_client_sender
209 .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
210 .map_err(|e| {
211 debug_assert!(e.is_disconnected());
213 NewClientError::Disconnected
214 })?;
215 Ok(NetlinkSockDiagClient(external_client))
216 }
217}
218
/// A newly created client bundled with the receiver of its incoming requests.
///
/// This is the unit sent from `Netlink::new_route_client` /
/// `Netlink::new_sock_diag_client` over the client channel and consumed by
/// `connect_new_clients`.
struct ClientWithReceiver<C: NetlinkContext, F: ProtocolFamily> {
    /// The internal half of the client pair.
    client: InternalClient<F, C::Sender<F::Response>>,
    /// The stream of requests addressed to `client`.
    receiver: C::Receiver<F::Request>,
}
224
/// Errors returned when creating a new netlink client.
#[derive(Debug)]
pub enum NewClientError {
    /// The new client could not be sent to the worker because the client
    /// channel is disconnected.
    Disconnected,
}
232
/// Errors returned by the sysctl read/write methods on [`Netlink`].
#[derive(Debug)]
pub enum SysctlError {
    /// The request could not be queued for the worker, or the response
    /// channel failed before a result was received.
    Disconnected,
    /// No matching interface.
    // NOTE(review): this variant is not constructed in this file; presumably
    // produced by the route worker when the selected interface does not
    // exist - confirm.
    NoInterface,
    /// The operation is not supported.
    // NOTE(review): this variant is not constructed in this file; confirm
    // which selector/value combinations produce it.
    Unsupported,
}
243
244pub struct NetlinkWorkerParams<H, C: NetlinkContext> {
246 interfaces_handler: H,
247 route_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkRoute>>,
249 route_async_work_receiver:
250 futures::channel::mpsc::UnboundedReceiver<AsyncWorkItem<NetlinkRoute>>,
251 #[allow(dead_code)]
253 sock_diag_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkSockDiag>>,
254 #[allow(dead_code)]
255 sock_diag_async_work_receiver:
256 futures::channel::mpsc::UnboundedReceiver<AsyncWorkItem<NetlinkSockDiag>>,
257}
258
// The fields are left undocumented individually: each one is the proxy for
// the FIDL protocol its name and type spell out.
#[allow(missing_docs)]
/// The set of FIDL protocol proxies used by the netlink worker.
///
/// [`run_netlink_worker`] obtains these from the component's environment;
/// [`run_netlink_worker_with_protocols`] accepts them from the caller.
pub struct NetlinkWorkerDiscoverableProtocols {
    pub root_interfaces: fnet_root::InterfacesProxy,
    pub interfaces_state: fnet_interfaces::StateProxy,
    pub v4_routes_state: fnet_routes::StateV4Proxy,
    pub v6_routes_state: fnet_routes::StateV6Proxy,
    pub v4_main_route_table: fnet_routes_admin::RouteTableV4Proxy,
    pub v6_main_route_table: fnet_routes_admin::RouteTableV6Proxy,
    pub v4_route_table_provider: fnet_routes_admin::RouteTableProviderV4Proxy,
    pub v6_route_table_provider: fnet_routes_admin::RouteTableProviderV6Proxy,
    pub v4_rule_table: fnet_routes_admin::RuleTableV4Proxy,
    pub v6_rule_table: fnet_routes_admin::RuleTableV6Proxy,
    pub ndp_option_watcher_provider: fnet_ndp::RouterAdvertisementOptionWatcherProviderProxy,
    pub socket_diagnostics: fnet_sockets::DiagnosticsProxy,
    pub socket_control: fnet_sockets::ControlProxy,
    pub neighbors_view: fnet_neighbor::ViewProxy,
    pub neighbors_controller: fnet_neighbor::ControllerProxy,
}
278
279impl NetlinkWorkerDiscoverableProtocols {
280 fn from_environment() -> Self {
281 let root_interfaces = connect_to_protocol::<fnet_root::InterfacesMarker>()
282 .expect("connect to fuchsia.net.root.Interfaces");
283 let interfaces_state = connect_to_protocol::<fnet_interfaces::StateMarker>()
284 .expect("connect to fuchsia.net.interfaces.State");
285 let v4_routes_state =
286 connect_to_protocol::<<Ipv4 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
287 .expect("connect to fuchsia.net.routes.StateV4");
288 let v6_routes_state =
289 connect_to_protocol::<<Ipv6 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
290 .expect("connect to fuchsia.net.routes.StateV6");
291 let v4_main_route_table = connect_to_protocol::<
292 <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
293 >()
294 .expect("connect to fuchsia.net.routes.admin.RouteTableV4");
295 let v6_main_route_table = connect_to_protocol::<
296 <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
297 >()
298 .expect("connect to fuchsia.net.routes.admin.RouteTableV6");
299 let v4_route_table_provider = connect_to_protocol::<
300 <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
301 >()
302 .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV4");
303 let v6_route_table_provider = connect_to_protocol::<
304 <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
305 >()
306 .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV6");
307 let v4_rule_table = connect_to_protocol::<
308 <Ipv4 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
309 >()
310 .expect("connect to fuchsia.net.routes.admin.RuleTableV4");
311 let v6_rule_table = connect_to_protocol::<
312 <Ipv6 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
313 >()
314 .expect("connect to fuchsia.net.routes.admin.RuleTableV6");
315 let ndp_option_watcher_provider =
316 connect_to_protocol::<fnet_ndp::RouterAdvertisementOptionWatcherProviderMarker>()
317 .expect("connect to fuchsia.net.ndp.RouterAdvertisementOptionWatcherProvider");
318 let socket_diagnostics = connect_to_protocol::<fnet_sockets::DiagnosticsMarker>()
319 .expect("connect to fuchsia.net.sockets.Diagnostics");
320 let socket_control = connect_to_protocol::<fnet_sockets::ControlMarker>()
321 .expect("connect to fuchsia.net.sockets.Control");
322 let neighbors_view = connect_to_protocol::<fnet_neighbor::ViewMarker>()
323 .expect("connect to fuchsia.net.neighbor.View");
324 let neighbors_controller = connect_to_protocol::<fnet_neighbor::ControllerMarker>()
325 .expect("connect to fuchsia.net.neighbor.Controller");
326
327 Self {
328 root_interfaces,
329 interfaces_state,
330 v4_routes_state,
331 v6_routes_state,
332 v4_main_route_table,
333 v6_main_route_table,
334 v4_route_table_provider,
335 v6_route_table_provider,
336 v4_rule_table,
337 v6_rule_table,
338 ndp_option_watcher_provider,
339 socket_diagnostics,
340 socket_control,
341 neighbors_view,
342 neighbors_controller,
343 }
344 }
345}
346
347pub async fn run_netlink_worker<H: interfaces::InterfacesHandler, C: NetlinkContext>(
360 params: NetlinkWorkerParams<H, C>,
361 access_control: C::AccessControl<'_>,
362) {
363 run_netlink_worker_with_protocols(
364 params,
365 NetlinkWorkerDiscoverableProtocols::from_environment(),
366 None,
367 access_control,
368 )
369 .await;
370}
371
372pub async fn run_netlink_worker_with_protocols<
375 H: interfaces::InterfacesHandler,
376 C: NetlinkContext,
377>(
378 params: NetlinkWorkerParams<H, C>,
379 protocols: NetlinkWorkerDiscoverableProtocols,
380 on_route_initialized: Option<oneshot::Sender<()>>,
381 access_control: C::AccessControl<'_>,
382) {
383 let NetlinkWorkerParams {
384 interfaces_handler,
385 route_client_receiver,
386 route_async_work_receiver,
387 sock_diag_client_receiver,
388 sock_diag_async_work_receiver,
389 } = params;
390
391 let NetlinkWorkerDiscoverableProtocols {
392 root_interfaces,
393 interfaces_state,
394 v4_routes_state,
395 v6_routes_state,
396 v4_main_route_table,
397 v6_main_route_table,
398 v4_route_table_provider,
399 v6_route_table_provider,
400 v4_rule_table,
401 v6_rule_table,
402 ndp_option_watcher_provider,
403 socket_diagnostics,
404 socket_control,
405 neighbors_view,
406 neighbors_controller,
407 } = protocols;
408
409 let route_clients = ClientTable::default();
410 let (route_request_sink, route_request_stream) = mpsc::channel(1);
411
412 let route_event_loop = {
413 let route_clients = route_clients.clone();
414 async move {
415 let event_loop: RouteEventLoop<H, C::Sender<_>> = RouteEventLoop {
416 interfaces_proxy: root_interfaces,
417 interfaces_state_proxy: interfaces_state,
418 v4_routes_state,
419 v6_routes_state,
420 v4_main_route_table,
421 v6_main_route_table,
422 v4_route_table_provider,
423 v6_route_table_provider,
424 v4_rule_table,
425 v6_rule_table,
426 ndp_option_watcher_provider,
427 neighbors_view,
428 neighbors_controller,
429 route_clients,
430 request_stream: route_request_stream,
431 interfaces_handler,
432 async_work_receiver: route_async_work_receiver,
433 };
434
435 event_loop.run(on_route_initialized).await;
436 }
437 };
438
439 let route_client_receiver_loop = {
440 let access_control = access_control.clone();
441 async move {
442 connect_new_clients::<C, NetlinkRoute>(
444 route_clients,
445 route_client_receiver,
446 NetlinkRouteRequestHandler { unified_request_sink: route_request_sink },
447 access_control,
448 )
449 .await;
450 panic!("route_client_receiver stream unexpectedly finished");
451 }
452 };
453
454 let sock_diag_clients = ClientTable::default();
455 let (sock_diag_request_sink, sock_diag_request_stream) = mpsc::channel(1);
456
457 let sock_diag_event_loop = async move {
458 SockDiagEventLoop {
459 socket_diagnostics,
460 socket_control,
461 request_stream: sock_diag_request_stream,
462 async_work_receiver: sock_diag_async_work_receiver,
463 }
464 .run()
465 .await;
466 };
467
468 let sock_diag_client_receiver_loop = async move {
469 connect_new_clients::<C, NetlinkSockDiag>(
471 sock_diag_clients,
472 sock_diag_client_receiver,
473 NetlinkSockDiagRequestHandler { sock_diag_request_sink },
474 access_control,
475 )
476 .await;
477 panic!("sock_diag_client_receiver stream unexpectedly finished");
478 };
479
480 futures::future::join4(
481 route_event_loop,
482 route_client_receiver_loop,
483 sock_diag_event_loop,
484 sock_diag_client_receiver_loop,
485 )
486 .await;
487}
488
489async fn connect_new_clients<C: NetlinkContext, F: ProtocolFamily>(
494 client_table: ClientTable<F, C::Sender<F::Response>>,
495 client_receiver: UnboundedReceiver<ClientWithReceiver<C, F>>,
496 request_handler_impl: F::RequestHandler<C::Sender<F::Response>>,
497 access_control: C::AccessControl<'_>,
498) {
499 client_receiver
500 .for_each_concurrent(None, async |ClientWithReceiver { client, receiver }| {
502 client_table.add_client(client.clone());
503 let client = run_client_request_handler::<C, F>(
504 client,
505 receiver,
506 request_handler_impl.clone(),
507 access_control.clone(),
508 )
509 .await;
510 client_table.remove_client(client);
511 })
512 .await;
513}
514
515async fn run_client_request_handler<C: NetlinkContext, F: ProtocolFamily>(
520 client: InternalClient<F, C::Sender<F::Response>>,
521 receiver: C::Receiver<F::Request>,
522 handler: F::RequestHandler<C::Sender<F::Response>>,
523 access_control: C::AccessControl<'_>,
524) -> InternalClient<F, C::Sender<F::Response>> {
525 struct FoldState<C, H, P> {
528 client: C,
529 handler: H,
530 access_control: P,
531 }
532
533 let FoldState { client, handler: _, access_control: _ } = receiver
538 .fold(
539 FoldState { client, handler, access_control },
540 |FoldState { mut client, mut handler, access_control }, req| async {
541 match req.validate_creds_and_get_message(&access_control) {
542 Ok(req) => {
543 log_debug!("{} Received request: {:?}", client, req);
544 handler.handle_request(req, &mut client).await
545 }
546 Err(e) => {
547 match &e {
548 ValidationError::Parse(e) => {
549 log_warn!("{client} failed to parse netlink message: {e:?}");
550 }
551 p @ ValidationError::Permission { .. } => {
552 log_debug!("{client} permission check failed {p:?}")
553 }
554 }
555 if let Some(rsp) = e.into_error_message() {
556 client.send_unicast(rsp)
557 }
558 }
559 }
560 FoldState { client, handler, access_control }
561 },
562 )
563 .await;
564
565 client
566}
567
// Unit tests for the client-accepting and request-handling loops, driven with
// the fake protocol family and fake client utilities.
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;
    use futures::FutureExt as _;

    use assert_matches::assert_matches;
    use netlink_packet_core::{ErrorMessage, NetlinkPayload};
    use std::num::NonZeroI32;
    use std::pin::pin;

    use crate::messaging::NetlinkMessageWithCreds;
    use crate::messaging::testutil::{FakeCreds, SentMessage, TestNetlinkContext};
    use crate::protocol_family::testutil::{
        FakeNetlinkRequestHandler, FakeProtocolFamily, new_fake_netlink_message,
        new_fake_netlink_message_with_creds,
    };

    // Verifies that `run_client_request_handler` stays pending without
    // requests, answers a request with one unicast message, and finishes once
    // the request channel is closed.
    #[fasync::run_singlethreaded(test)]
    async fn test_run_client_request_handler() {
        let (mut req_sender, req_receiver) = mpsc::channel(0);
        let (mut client_sink, client, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let join_handle = fasync::Task::spawn(async_work_drain_task);

        {
            let mut client_task = pin!(
                run_client_request_handler::<TestNetlinkContext, FakeProtocolFamily>(
                    client,
                    req_receiver,
                    FakeNetlinkRequestHandler,
                    Default::default()
                )
                .fuse()
            );

            // No request has been sent yet: the handler must be pending and
            // must not have sent anything to the client.
            assert_matches!((&mut client_task).now_or_never(), None);
            assert_eq!(&client_sink.take_messages()[..], &[]);

            // Send one request, then drive the handler until the sender
            // reports readiness again.
            // NOTE(review): presumably, with a zero-sized channel buffer,
            // sender readiness is used as the signal that the first request
            // has been taken and handled - confirm.
            req_sender
                .try_send(new_fake_netlink_message_with_creds())
                .expect("should send without error");
            let mut could_send_fut =
                pin!(futures::future::poll_fn(|ctx| req_sender.poll_ready(ctx)).fuse());
            futures::select!(
                res = could_send_fut => res.expect("should be able to send without error"),
                _client = client_task => panic!("client task unexpectedly finished"),
            );
            assert_eq!(
                &client_sink.take_messages()[..],
                &[SentMessage::unicast(new_fake_netlink_message())]
            );

            // Closing the request channel ends the request stream, so the
            // handler completes without sending anything further.
            req_sender.close_channel();
            let _client = client_task.await;
            assert_eq!(&client_sink.take_messages()[..], &[]);
        }
        join_handle.await;
    }

    // Verifies that `connect_new_clients` registers each received client in
    // the client table, serves their requests, and removes them once their
    // request streams end.
    #[fasync::run_singlethreaded(test)]
    async fn test_connect_new_clients() {
        let client_table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
        let mut client_acceptor_fut = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                client_table.clone(),
                client_receiver,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );

        // With no clients sent, the acceptor is pending.
        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        // Hand over the first client.
        let (mut _client_sink1, client1, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender1, req_receiver1) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client: client1, receiver: req_receiver1 })
            .expect("should send without error");

        // Hand over the second client.
        let (mut client_sink2, client2, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_2,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender2, req_receiver2) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client: client2, receiver: req_receiver2 })
            .expect("should send without error");

        // Send a request on the second client and drive the acceptor until
        // the sender reports readiness again.
        // NOTE(review): presumably sender readiness on the zero-sized channel
        // is the signal that the request has been handled, which in turn
        // implies both clients were accepted - confirm.
        req_sender2
            .try_send(new_fake_netlink_message_with_creds())
            .expect("should send without error");
        let mut could_send_fut =
            pin!(futures::future::poll_fn(|ctx| req_sender2.poll_ready(ctx)).fuse());
        futures::select!(
            res = could_send_fut => res.expect("should be able to send without error"),
            () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
        );
        assert_eq!(
            &client_table.client_ids()[..],
            [client::testutil::CLIENT_ID_1, client::testutil::CLIENT_ID_2]
        );
        assert_eq!(
            &client_sink2.take_messages()[..],
            &[SentMessage::unicast(new_fake_netlink_message())]
        );

        // Closing both request channels does not finish the acceptor while
        // the client channel itself is still open.
        req_sender1.close_channel();
        req_sender2.close_channel();
        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        // Closing the client channel lets the acceptor run to completion.
        client_sender.close_channel();
        client_acceptor_fut.await;

        // Every client has been removed from the table.
        assert_eq!(&client_table.client_ids()[..], []);

        // NOTE(review): the table is dropped before joining the scope,
        // presumably so that nothing keeps the spawned async-work drain tasks
        // from finishing - confirm.
        drop(client_table);
        scope.join().await;
    }

    // Verifies that a request whose credentials fail validation with `EPERM`
    // is answered with a netlink error message carrying `-EPERM`.
    #[fasync::run_singlethreaded(test)]
    async fn test_permissions() {
        let client_table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
        let mut client_acceptor_fut = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                client_table.clone(),
                client_receiver,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );
        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        let (mut client_sink, client, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender, req_receiver) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client, receiver: req_receiver })
            .expect("should send without error");

        // Build a request whose fake credentials are set up with `EPERM`.
        let message = NetlinkMessageWithCreds::new(
            new_fake_netlink_message(),
            FakeCreds::with_error(Errno::new(libc::EPERM).unwrap()),
        );
        req_sender.try_send(message).expect("should send without error");

        // Drive the acceptor until the client receives a message.
        let response = futures::select!(
            res = client_sink.next_message().fuse() => res,
            () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
        );

        // The response must be an error payload with the negated errno.
        assert_matches!(
            response.message.payload,
            NetlinkPayload::Error(ErrorMessage { code: Some(error_code), .. }) => {
                assert_eq!(error_code , NonZeroI32::new(-libc::EPERM).unwrap());
            }
        );
    }
}