netlink/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! An implementation of Linux's Netlink API for Fuchsia.
6//!
7//! Netlink is a socket-based API provided by Linux that user space applications
8//! can use to interact with the kernel. The API is split up into several
9//! protocol families each offering different functionality. This crate targets
10//! the implementation of families related to networking.
11
12#![warn(missing_docs, unused)]
13
14mod client;
15pub mod interfaces;
16pub(crate) mod logging;
17pub mod messaging;
18pub mod multicast_groups;
19mod nduseropt;
20// TODO(https://fxbug.dev/285127384): Remove once used.
21#[cfg(test)]
22pub mod neighbors;
23mod netlink_packet;
24pub mod protocol_family;
25pub(crate) mod route_eventloop;
26pub(crate) mod route_tables;
27pub mod routes;
28mod rules;
29pub(crate) mod util;
30
31use std::num::NonZeroU64;
32
33use fuchsia_component::client::connect_to_protocol;
34use futures::StreamExt as _;
35use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
36use futures::channel::oneshot;
37use net_types::ip::{Ipv4, Ipv6};
38use netlink_packet_route::RouteNetlinkMessage;
39use protocol_family::route::NetlinkRouteNotifiedGroup;
40use {
41    fidl_fuchsia_net_interfaces as fnet_interfaces, fidl_fuchsia_net_ndp as fnet_ndp,
42    fidl_fuchsia_net_root as fnet_root, fidl_fuchsia_net_routes as fnet_routes,
43    fidl_fuchsia_net_routes_admin as fnet_routes_admin,
44    fidl_fuchsia_net_routes_ext as fnet_routes_ext,
45};
46
47use crate::client::{AsyncWorkItem, ClientIdGenerator, ClientTable, InternalClient};
48use crate::logging::{log_debug, log_warn};
49use crate::messaging::{NetlinkContext, UnvalidatedNetlinkMessage as _, ValidationError};
50pub use crate::netlink_packet::errno::Errno;
51use crate::protocol_family::route::{NetlinkRoute, NetlinkRouteClient, NetlinkRouteRequestHandler};
52use crate::protocol_family::{NetlinkFamilyRequestHandler as _, ProtocolFamily};
53use crate::route_eventloop::RouteEventLoop;
54
/// The tag added to all logs generated by this crate.
// `'static` is implied for `const` references; spelling it out trips clippy's
// `redundant_static_lifetimes` lint.
pub const NETLINK_LOG_TAG: &str = "netlink";
57
/// Selects the interface for the sysctl.
#[derive(Debug, Clone, Copy)]
pub enum SysctlInterfaceSelector {
    /// "all" interfaces.
    ///
    /// This is supposed to change all interfaces' settings, but for most of
    /// the sysctls this is a lie: writes have no effect at all.
    All,
    /// The "default" interface; all interfaces created after this write will
    /// inherit the value.
    Default,
    /// The id of the interface to change.
    Id(NonZeroU64),
}
71
/// The implementation of the Netlink protocol suite.
///
/// This is the handle through which callers create new `NETLINK_ROUTE`
/// clients and access sysctls; the actual processing happens in the
/// asynchronous worker (see [`run_netlink_worker`]), reached via the
/// senders held here.
pub struct Netlink<C: NetlinkContext> {
    /// Generator of new Client IDs.
    id_generator: ClientIdGenerator,
    /// Sender to attach new `NETLINK_ROUTE` clients to the Netlink worker.
    route_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkRoute>>,
    /// Sender to send other async work items to the Netlink worker.
    route_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkRouteNotifiedGroup>>,
}
81
82impl<C: NetlinkContext> Netlink<C> {
83    /// Returns a newly instantiated [`Netlink`] and parameters used to start the
84    /// asynchronous worker.
85    ///
86    /// Caller is expected to run the worker by calling `run_netlink_worker()`.
87    pub fn new<H: interfaces::InterfacesHandler>(
88        interfaces_handler: H,
89    ) -> (Self, NetlinkWorkerParams<H, C>) {
90        let (route_client_sender, route_client_receiver) = mpsc::unbounded();
91        let (route_async_work_sink, async_work_receiver) = mpsc::unbounded();
92        (
93            Netlink {
94                id_generator: ClientIdGenerator::default(),
95                route_client_sender,
96                route_async_work_sink,
97            },
98            NetlinkWorkerParams {
99                interfaces_handler,
100                route_client_receiver,
101                route_async_work_receiver: async_work_receiver,
102            },
103        )
104    }
105
106    /// Writes the accept_ra_rt_table sysctl for the selected interface.
107    pub fn write_accept_ra_rt_table(
108        &self,
109        interface: SysctlInterfaceSelector,
110        value: i32,
111    ) -> Result<(), SysctlError> {
112        let (responder, receiver) = oneshot_sync::channel();
113        self.route_async_work_sink
114            .unbounded_send(AsyncWorkItem::SetAcceptRaRtTable {
115                interface,
116                value: value.into(),
117                responder,
118            })
119            .map_err(|_| SysctlError::Disconnected)?;
120        receiver.receive().map_err(|_| SysctlError::Disconnected)?
121    }
122
123    /// Reads the accept_ra_rt_table sysctl for the selected interface.
124    pub fn read_accept_ra_rt_table(
125        &self,
126        interface: SysctlInterfaceSelector,
127    ) -> Result<i32, SysctlError> {
128        let (responder, receiver) = oneshot_sync::channel();
129        self.route_async_work_sink
130            .unbounded_send(AsyncWorkItem::GetAcceptRaRtTable { interface, responder })
131            .map_err(|_| SysctlError::Disconnected)?;
132        Ok(receiver.receive().map_err(|_| SysctlError::Disconnected)??.into())
133    }
134
135    /// Creates a new client of the `NETLINK_ROUTE` protocol family.
136    ///
137    /// `sender` is used by Netlink to send messages to the client.
138    /// `receiver` is used by Netlink to receive messages from the client.
139    ///
140    /// Closing the `receiver` will close this client, disconnecting `sender`.
141    pub fn new_route_client(
142        &self,
143        sender: C::Sender<RouteNetlinkMessage>,
144        receiver: C::Receiver<RouteNetlinkMessage>,
145    ) -> Result<NetlinkRouteClient, NewClientError> {
146        let Netlink { id_generator, route_client_sender, route_async_work_sink } = self;
147        let (external_client, internal_client) = client::new_client_pair::<NetlinkRoute, _>(
148            id_generator.new_id(),
149            sender,
150            route_async_work_sink.clone(),
151        );
152        route_client_sender
153            .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
154            .map_err(|e| {
155                // Sending on an `UnboundedSender` can never fail with `is_full()`.
156                debug_assert!(e.is_disconnected());
157                NewClientError::Disconnected
158            })?;
159        Ok(NetlinkRouteClient(external_client))
160    }
161}
162
/// A wrapper to hold an [`InternalClient`], and its [`Receiver`] of requests.
struct ClientWithReceiver<C: NetlinkContext, F: ProtocolFamily> {
    /// The internal half of the client pair, used to send responses back.
    client: InternalClient<F, C::Sender<F::Response>>,
    /// The stream of requests arriving from this client.
    receiver: C::Receiver<F::Request>,
}
168
/// The possible error types when instantiating a new client.
///
/// Returned by [`Netlink::new_route_client`].
#[derive(Debug)]
pub enum NewClientError {
    /// The [`Netlink`] is disconnected from its associated worker, perhaps as a
    /// result of dropping the worker.
    Disconnected,
}
176
/// The possible error types when trying to access a sysctl.
#[derive(Debug)]
pub enum SysctlError {
    /// The [`Netlink`] is disconnected from its associated worker.
    Disconnected,
    /// The interface went away.
    NoInterface,
    /// The written value requests an unsupported operation.
    Unsupported,
}
187
188/// Parameters used to start the Netlink asynchronous worker.
189pub struct NetlinkWorkerParams<H, C: NetlinkContext> {
190    interfaces_handler: H,
191    /// Receiver of newly created `NETLINK_ROUTE` clients.
192    route_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkRoute>>,
193    route_async_work_receiver:
194        futures::channel::mpsc::UnboundedReceiver<AsyncWorkItem<NetlinkRouteNotifiedGroup>>,
195}
196
197/// All of the protocols that the netlink worker connects to.
198#[allow(missing_docs)]
199pub struct NetlinkWorkerDiscoverableProtocols {
200    pub root_interfaces: fnet_root::InterfacesProxy,
201    pub interfaces_state: fnet_interfaces::StateProxy,
202    pub v4_routes_state: fnet_routes::StateV4Proxy,
203    pub v6_routes_state: fnet_routes::StateV6Proxy,
204    pub v4_main_route_table: fnet_routes_admin::RouteTableV4Proxy,
205    pub v6_main_route_table: fnet_routes_admin::RouteTableV6Proxy,
206    pub v4_route_table_provider: fnet_routes_admin::RouteTableProviderV4Proxy,
207    pub v6_route_table_provider: fnet_routes_admin::RouteTableProviderV6Proxy,
208    pub v4_rule_table: fnet_routes_admin::RuleTableV4Proxy,
209    pub v6_rule_table: fnet_routes_admin::RuleTableV6Proxy,
210    pub ndp_option_watcher_provider: fnet_ndp::RouterAdvertisementOptionWatcherProviderProxy,
211}
212
impl NetlinkWorkerDiscoverableProtocols {
    /// Connects to every needed protocol via the component's incoming
    /// namespace.
    ///
    /// # Panics
    ///
    /// Panics if connecting to any of the protocols fails.
    fn from_environment() -> Self {
        let root_interfaces = connect_to_protocol::<fnet_root::InterfacesMarker>()
            .expect("connect to fuchsia.net.root.Interfaces");
        let interfaces_state = connect_to_protocol::<fnet_interfaces::StateMarker>()
            .expect("connect to fuchsia.net.interfaces.State");
        // The routes protocols are connected through their IP-generic marker
        // associated types so the V4/V6 pairs stay in lock-step.
        let v4_routes_state =
            connect_to_protocol::<<Ipv4 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
                .expect("connect to fuchsia.net.routes.StateV4");
        let v6_routes_state =
            connect_to_protocol::<<Ipv6 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
                .expect("connect to fuchsia.net.routes.StateV6");
        let v4_main_route_table = connect_to_protocol::<
            <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
        >()
        .expect("connect to fuchsia.net.routes.admin.RouteTableV4");
        let v6_main_route_table = connect_to_protocol::<
            <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
        >()
        .expect("connect to fuchsia.net.routes.admin.RouteTableV6");
        let v4_route_table_provider = connect_to_protocol::<
            <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
        >()
        .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV4");
        let v6_route_table_provider = connect_to_protocol::<
            <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
        >()
        .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV6");
        let v4_rule_table = connect_to_protocol::<
            <Ipv4 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
        >()
        .expect("connect to fuchsia.net.routes.admin.RuleTableV4");
        let v6_rule_table = connect_to_protocol::<
            <Ipv6 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
        >()
        .expect("connect to fuchsia.net.routes.admin.RuleTableV6");
        let ndp_option_watcher_provider =
            connect_to_protocol::<fnet_ndp::RouterAdvertisementOptionWatcherProviderMarker>()
                .expect("connect to fuchsia.net.ndp.RouterAdvertisementOptionWatcherProvider");
        Self {
            root_interfaces,
            interfaces_state,
            v4_routes_state,
            v6_routes_state,
            v4_main_route_table,
            v6_main_route_table,
            v4_route_table_provider,
            v6_route_table_provider,
            v4_rule_table,
            v6_rule_table,
            ndp_option_watcher_provider,
        }
    }
}
267
268/// The worker encompassing all asynchronous Netlink work.
269///
270/// The worker is never expected to complete.
271///
272/// `protocols` is taken as a closure because we need to avoid creating asynchronous FIDL proxies
273/// until an executor is running, so it's helpful to defer creation until the event loop starts
274/// running.
275///
276/// # Panics
277///
278/// Panics if a non-recoverable error is encountered by the worker. For example,
279/// a FIDL error on one of the FIDL connections with the netstack.
280pub async fn run_netlink_worker<H: interfaces::InterfacesHandler, C: NetlinkContext>(
281    params: NetlinkWorkerParams<H, C>,
282    access_control: C::AccessControl<'_>,
283) {
284    run_netlink_worker_with_protocols(
285        params,
286        NetlinkWorkerDiscoverableProtocols::from_environment(),
287        None,
288        access_control,
289    )
290    .await;
291}
292
/// Same as `run_netlink_worker()`, but allows passing custom
/// `NetlinkWorkerDiscoverableProtocols`.
///
/// `on_route_initialized` is handed to the route event loop; presumably it is
/// signaled once that loop has finished initializing (see
/// [`RouteEventLoop::run`] — confirm there).
pub async fn run_netlink_worker_with_protocols<
    H: interfaces::InterfacesHandler,
    C: NetlinkContext,
>(
    params: NetlinkWorkerParams<H, C>,
    protocols: NetlinkWorkerDiscoverableProtocols,
    on_route_initialized: Option<oneshot::Sender<()>>,
    access_control: C::AccessControl<'_>,
) {
    let NetlinkWorkerParams {
        interfaces_handler,
        route_client_receiver,
        route_async_work_receiver,
    } = params;

    // Table of connected `NETLINK_ROUTE` clients, shared between the event
    // loop (which needs it to deliver messages) and the client acceptor
    // (which adds/removes entries).
    let route_clients = ClientTable::default();
    // Channel through which per-client request handlers forward requests to
    // the single route event loop.
    let (route_request_sink, route_request_stream) = mpsc::channel(1);

    let route_event_loop = {
        let route_clients = route_clients.clone();
        async move {
            let NetlinkWorkerDiscoverableProtocols {
                root_interfaces,
                interfaces_state,
                v4_routes_state,
                v6_routes_state,
                v4_main_route_table,
                v6_main_route_table,
                v4_route_table_provider,
                v6_route_table_provider,
                v4_rule_table,
                v6_rule_table,
                ndp_option_watcher_provider,
            } = protocols;
            let event_loop: RouteEventLoop<H, C::Sender<_>> = RouteEventLoop {
                interfaces_proxy: root_interfaces,
                interfaces_state_proxy: interfaces_state,
                v4_routes_state,
                v6_routes_state,
                v4_main_route_table,
                v6_main_route_table,
                v4_route_table_provider,
                v6_route_table_provider,
                v4_rule_table,
                v6_rule_table,
                ndp_option_watcher_provider,
                route_clients,
                request_stream: route_request_stream,
                interfaces_handler,
                async_work_receiver: route_async_work_receiver,
            };

            event_loop.run(on_route_initialized).await;
        }
    };

    let route_client_receiver_loop = {
        async move {
            // Accept new NETLINK_ROUTE clients.
            connect_new_clients::<C, NetlinkRoute>(
                route_clients,
                route_client_receiver,
                NetlinkRouteRequestHandler { unified_request_sink: route_request_sink },
                access_control,
            )
            .await;
            // `connect_new_clients` only returns once the client receiver is
            // exhausted, which should never happen while the worker is alive.
            panic!("route_client_receiver stream unexpectedly finished");
        }
    };

    // Both loops run for the lifetime of the worker; neither is expected to
    // complete.
    futures::future::join(route_event_loop, route_client_receiver_loop).await;
}
367
/// Receives clients from the given receiver, adding them to the given table.
///
/// A "Request Handler" Task will be spawned for each received client. The given
/// `request_handler_impl` defines how the requests will be handled.
///
/// Completes once `client_receiver` is exhausted and all in-flight client
/// request handlers have finished.
async fn connect_new_clients<C: NetlinkContext, F: ProtocolFamily>(
    client_table: ClientTable<F, C::Sender<F::Response>>,
    client_receiver: UnboundedReceiver<ClientWithReceiver<C, F>>,
    request_handler_impl: F::RequestHandler<C::Sender<F::Response>>,
    access_control: C::AccessControl<'_>,
) {
    client_receiver
        // Drive each client concurrently with `for_each_concurrent`.
        .for_each_concurrent(None, async |ClientWithReceiver { client, receiver }| {
            // Register the client in the shared table for the duration of its
            // request handler.
            client_table.add_client(client.clone());
            // The handler hands the client back once the client's request
            // stream terminates, so it can be removed from the table.
            let client = run_client_request_handler::<C, F>(
                client,
                receiver,
                request_handler_impl.clone(),
                access_control.clone(),
            )
            .await;
            client_table.remove_client(client);
        })
        .await;
}
393
/// Reads messages from the `receiver` and handles them using the `handler`.
///
/// Each message has its credentials validated against `access_control`
/// before being handed to the `handler`; validation failures are logged and,
/// when applicable, reported back to the client as a netlink error message.
///
/// The task terminates when the underlying `Receiver` closes, yielding the
/// original client.
async fn run_client_request_handler<C: NetlinkContext, F: ProtocolFamily>(
    client: InternalClient<F, C::Sender<F::Response>>,
    receiver: C::Receiver<F::Request>,
    handler: F::RequestHandler<C::Sender<F::Response>>,
    access_control: C::AccessControl<'_>,
) -> InternalClient<F, C::Sender<F::Response>> {
    // State needed to handle an individual request, that is cycled through the
    // `fold` combinator below.
    struct FoldState<C, H, P> {
        client: C,
        handler: H,
        access_control: P,
    }

    // Use `fold` for two reasons. First, it processes requests serially,
    // ensuring requests are handled in order. Second, it allows us to
    // "hand-off" the client/handler from one request to the other, avoiding
    // copies for each request.
    let FoldState { client, handler: _, access_control: _ } = receiver
        .fold(
            FoldState { client, handler, access_control },
            |FoldState { mut client, mut handler, access_control }, req| async {
                match req.validate_creds_and_get_message(&access_control) {
                    Ok(req) => {
                        log_debug!("{} Received request: {:?}", client, req);
                        handler.handle_request(req, &mut client).await
                    }
                    Err(e) => {
                        // Parse failures are unexpected (warn); permission
                        // failures are a normal occurrence (debug).
                        match &e {
                            ValidationError::Parse(e) => {
                                log_warn!("{client} failed to parse netlink message: {e:?}");
                            }
                            p @ ValidationError::Permission { .. } => {
                                log_debug!("{client} permission check failed {p:?}")
                            }
                        }
                        // Report the failure to the client when the error maps
                        // to a netlink error message.
                        if let Some(rsp) = e.into_error_message() {
                            client.send_unicast(rsp)
                        }
                    }
                }
                // Thread the state through to the next request.
                FoldState { client, handler, access_control }
            },
        )
        .await;

    client
}
446
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;
    use futures::FutureExt as _;

    use assert_matches::assert_matches;
    use netlink_packet_core::{ErrorMessage, NetlinkPayload};
    use std::num::NonZeroI32;
    use std::pin::pin;

    use crate::messaging::NetlinkMessageWithCreds;
    use crate::messaging::testutil::{FakeCreds, SentMessage, TestNetlinkContext};
    use crate::protocol_family::testutil::{
        FakeNetlinkRequestHandler, FakeProtocolFamily, new_fake_netlink_message,
        new_fake_netlink_message_with_creds,
    };

    // Verifies that `run_client_request_handler` handles requests in order and
    // terminates (yielding the client) once the request channel closes.
    #[fasync::run_singlethreaded(test)]
    async fn test_run_client_request_handler() {
        let (mut req_sender, req_receiver) = mpsc::channel(0);
        let (mut client_sink, client, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let join_handle = fasync::Task::spawn(async_work_drain_task);

        {
            let mut client_task = pin!(
                run_client_request_handler::<TestNetlinkContext, FakeProtocolFamily>(
                    client,
                    req_receiver,
                    FakeNetlinkRequestHandler,
                    Default::default()
                )
                .fuse()
            );

            // No requests yet: the handler is pending and nothing was sent.
            assert_matches!((&mut client_task).now_or_never(), None);
            assert_eq!(&client_sink.take_messages()[..], &[]);

            // Send a message and expect to see the response on the `client_sink`.
            // NB: Use the sender's channel size as a synchronization method; If a
            // second message could be sent, the first *must* have been handled.
            req_sender
                .try_send(new_fake_netlink_message_with_creds())
                .expect("should send without error");
            let mut could_send_fut =
                pin!(futures::future::poll_fn(|ctx| req_sender.poll_ready(ctx)).fuse());
            futures::select!(
                res = could_send_fut => res.expect("should be able to send without error"),
                _client = client_task => panic!("client task unexpectedly finished"),
            );
            assert_eq!(
                &client_sink.take_messages()[..],
                &[SentMessage::unicast(new_fake_netlink_message())]
            );

            // Close the sender, and expect the Task to exit.
            req_sender.close_channel();
            let _client = client_task.await;
            assert_eq!(&client_sink.take_messages()[..], &[]);
        }
        join_handle.await;
    }

    // Verifies that `connect_new_clients` accepts multiple clients, drives
    // their request handlers concurrently, and cleans up the client table as
    // clients disconnect.
    #[fasync::run_singlethreaded(test)]
    async fn test_connect_new_clients() {
        let client_table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
        let mut client_acceptor_fut = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                client_table.clone(),
                client_receiver,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );

        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        // Connect Client 1.
        let (mut _client_sink1, client1, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender1, req_receiver1) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client: client1, receiver: req_receiver1 })
            .expect("should send without error");

        // Connect Client 2.
        let (mut client_sink2, client2, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_2,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender2, req_receiver2) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client: client2, receiver: req_receiver2 })
            .expect("should send without error");

        // Send a request to Client 2, and verify it's handled despite Client 1
        // being open (e.g. concurrent handling of requests across clients).
        // NB: Use the sender's channel size as a synchronization method; If a
        // second message could be sent, the first *must* have been handled.
        req_sender2
            .try_send(new_fake_netlink_message_with_creds())
            .expect("should send without error");
        let mut could_send_fut =
            pin!(futures::future::poll_fn(|ctx| req_sender2.poll_ready(ctx)).fuse());
        futures::select!(
            res = could_send_fut => res.expect("should be able to send without error"),
            () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
        );
        // Both clients should be registered, and Client 2 should have
        // received its response.
        assert_eq!(
            &client_table.client_ids()[..],
            [client::testutil::CLIENT_ID_1, client::testutil::CLIENT_ID_2]
        );
        assert_eq!(
            &client_sink2.take_messages()[..],
            &[SentMessage::unicast(new_fake_netlink_message())]
        );

        // Close the two clients, and verify the acceptor fut is still pending.
        req_sender1.close_channel();
        req_sender2.close_channel();
        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        // Close the client_sender, and verify the acceptor fut finishes.
        client_sender.close_channel();
        client_acceptor_fut.await;

        // Confirm the clients have been cleaned up from the client table.
        assert_eq!(&client_table.client_ids()[..], []);

        drop(client_table);
        scope.join().await;
    }

    // Verifies that a request carrying credentials that fail the permission
    // check is answered with a netlink error message instead of being handled.
    #[fasync::run_singlethreaded(test)]
    async fn test_permissions() {
        let client_table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
        let mut client_acceptor_fut = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                client_table.clone(),
                client_receiver,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );
        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        let (mut client_sink, client, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender, req_receiver) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client, receiver: req_receiver })
            .expect("should send without error");

        // Attach credentials that force an EPERM validation failure.
        let message = NetlinkMessageWithCreds::new(
            new_fake_netlink_message(),
            FakeCreds::with_error(Errno::new(libc::EPERM).unwrap()),
        );
        req_sender.try_send(message).expect("should send without error");

        let response = futures::select!(
            res = client_sink.next_message().fuse() => res,
            () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
        );

        // The error message carries the negated errno, per netlink convention.
        assert_matches!(
            response.message.payload,
            NetlinkPayload::Error(ErrorMessage { code: Some(error_code), .. }) => {
              assert_eq!(error_code , NonZeroI32::new(-libc::EPERM).unwrap());
            }
        );
    }
}