netlink/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! An implementation of Linux's Netlink API for Fuchsia.
6//!
7//! Netlink is a socket-based API provided by Linux that user space applications
8//! can use to interact with the kernel. The API is split up into several
9//! protocol families each offering different functionality. This crate targets
10//! the implementation of families related to networking.
11
12#![warn(missing_docs, unused)]
13
14mod client;
15pub(crate) mod eventloop;
16pub mod interfaces;
17pub(crate) mod logging;
18pub mod messaging;
19pub mod multicast_groups;
20mod nduseropt;
21mod netlink_packet;
22pub mod protocol_family;
23pub(crate) mod route_tables;
24pub mod routes;
25mod rules;
26pub(crate) mod util;
27
28use std::num::NonZeroU64;
29
30use fuchsia_component::client::connect_to_protocol;
31use futures::StreamExt as _;
32use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
33use futures::channel::oneshot;
34use net_types::ip::{Ipv4, Ipv6};
35use netlink_packet_route::RouteNetlinkMessage;
36use protocol_family::route::NetlinkRouteNotifiedGroup;
37use {
38    fidl_fuchsia_net_interfaces as fnet_interfaces, fidl_fuchsia_net_ndp as fnet_ndp,
39    fidl_fuchsia_net_root as fnet_root, fidl_fuchsia_net_routes as fnet_routes,
40    fidl_fuchsia_net_routes_admin as fnet_routes_admin,
41    fidl_fuchsia_net_routes_ext as fnet_routes_ext,
42};
43
44use crate::client::{AsyncWorkItem, ClientIdGenerator, ClientTable, InternalClient};
45use crate::eventloop::EventLoop;
46use crate::logging::{log_debug, log_warn};
47use crate::messaging::{NetlinkContext, UnvalidatedNetlinkMessage as _, ValidationError};
48pub use crate::netlink_packet::errno::Errno;
49use crate::protocol_family::route::{NetlinkRoute, NetlinkRouteClient, NetlinkRouteRequestHandler};
50use crate::protocol_family::{NetlinkFamilyRequestHandler as _, ProtocolFamily};
51
/// The tag added to all logs generated by this crate.
// NB: `'static` is implied for string literals in `const` items; spelling it
// out trips clippy's `redundant_static_lifetimes` lint.
pub const NETLINK_LOG_TAG: &str = "netlink";
54
/// Selects the interface for the sysctl.
#[derive(Debug, Clone, Copy)]
pub enum SysctlInterfaceSelector {
    /// "all" interfaces.
    ///
    /// This is supposed to change all interfaces' settings, but for most of
    /// the sysctls this is a lie: writes to them have no effect at all.
    All,
    /// The "default" interface; all interfaces created after this write will
    /// inherit the value.
    Default,
    /// The id of the interface to change.
    Id(NonZeroU64),
}
68
/// The implementation of the Netlink protocol suite.
///
/// This is the caller-facing handle; the actual processing happens in the
/// asynchronous worker (see [`run_netlink_worker`]), which this handle talks
/// to over the unbounded channels below.
pub struct Netlink<C: NetlinkContext> {
    /// Generator of new Client IDs.
    id_generator: ClientIdGenerator,
    /// Sender to attach new `NETLINK_ROUTE` clients to the Netlink worker.
    route_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkRoute>>,
    /// Sender to send other async work items to the Netlink worker.
    async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkRouteNotifiedGroup>>,
}
78
impl<C: NetlinkContext> Netlink<C> {
    /// Returns a newly instantiated [`Netlink`] and parameters used to start the
    /// asynchronous worker.
    ///
    /// Caller is expected to run the worker by calling `run_netlink_worker()`.
    pub fn new<H: interfaces::InterfacesHandler>(
        interfaces_handler: H,
    ) -> (Self, NetlinkWorkerParams<H, C>) {
        // Channels connecting this handle to the worker: one to attach new
        // `NETLINK_ROUTE` clients, one for miscellaneous async work items.
        let (route_client_sender, route_client_receiver) = mpsc::unbounded();
        let (async_work_sink, async_work_receiver) = mpsc::unbounded();
        (
            Netlink {
                id_generator: ClientIdGenerator::default(),
                route_client_sender,
                async_work_sink,
            },
            NetlinkWorkerParams { interfaces_handler, route_client_receiver, async_work_receiver },
        )
    }

    /// Writes the accept_ra_rt_table sysctl for the selected interface.
    ///
    /// Returns [`SysctlError::Disconnected`] when the worker is gone (either
    /// the work channel or the response channel is closed); any other error is
    /// the worker's own verdict on the request.
    pub fn write_accept_ra_rt_table(
        &self,
        interface: SysctlInterfaceSelector,
        value: i32,
    ) -> Result<(), SysctlError> {
        // NOTE(review): `oneshot_sync` looks like a blocking oneshot channel,
        // letting this synchronous method wait for the async worker — confirm
        // against that crate's docs.
        let (responder, receiver) = oneshot_sync::channel();
        self.async_work_sink
            .unbounded_send(AsyncWorkItem::SetAcceptRaRtTable {
                interface,
                value: value.into(),
                responder,
            })
            .map_err(|_| SysctlError::Disconnected)?;
        // The `?` covers a dropped responder; the unwrapped inner
        // `Result<(), SysctlError>` from the worker is returned as-is.
        receiver.receive().map_err(|_| SysctlError::Disconnected)?
    }

    /// Reads the accept_ra_rt_table sysctl for the selected interface.
    pub fn read_accept_ra_rt_table(
        &self,
        interface: SysctlInterfaceSelector,
    ) -> Result<i32, SysctlError> {
        let (responder, receiver) = oneshot_sync::channel();
        self.async_work_sink
            .unbounded_send(AsyncWorkItem::GetAcceptRaRtTable { interface, responder })
            .map_err(|_| SysctlError::Disconnected)?;
        // Double `?`: the first covers a dropped responder, the second unwraps
        // the worker's own result; `.into()` converts its value type to `i32`.
        Ok(receiver.receive().map_err(|_| SysctlError::Disconnected)??.into())
    }

    /// Creates a new client of the `NETLINK_ROUTE` protocol family.
    ///
    /// `sender` is used by Netlink to send messages to the client.
    /// `receiver` is used by Netlink to receive messages from the client.
    ///
    /// Closing the `receiver` will close this client, disconnecting `sender`.
    ///
    /// Returns [`NewClientError::Disconnected`] when the worker that would
    /// serve the client has been dropped.
    pub fn new_route_client(
        &self,
        sender: C::Sender<RouteNetlinkMessage>,
        receiver: C::Receiver<RouteNetlinkMessage>,
    ) -> Result<NetlinkRouteClient, NewClientError> {
        let Netlink { id_generator, route_client_sender, async_work_sink } = self;
        // The internal half is handed to the worker; the external half is
        // wrapped and returned to the caller.
        let (external_client, internal_client) = client::new_client_pair::<NetlinkRoute, _>(
            id_generator.new_id(),
            sender,
            async_work_sink.clone(),
        );
        route_client_sender
            .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
            .map_err(|e| {
                // Sending on an `UnboundedSender` can never fail with `is_full()`.
                debug_assert!(e.is_disconnected());
                NewClientError::Disconnected
            })?;
        Ok(NetlinkRouteClient(external_client))
    }
}
155
/// A wrapper to hold an [`InternalClient`], and its [`Receiver`] of requests.
struct ClientWithReceiver<C: NetlinkContext, F: ProtocolFamily> {
    /// The worker-side half of the client pair.
    client: InternalClient<F, C::Sender<F::InnerMessage>>,
    /// Stream of requests arriving from this client.
    receiver: C::Receiver<F::InnerMessage>,
}
161
/// The possible error types when instantiating a new client.
#[derive(Debug)]
pub enum NewClientError {
    /// The [`Netlink`] is disconnected from its associated worker, perhaps as a
    /// result of dropping the worker.
    ///
    /// Surfaced when the channel carrying new clients to the worker is closed.
    Disconnected,
}
169
/// The possible error types when trying to access a sysctl.
#[derive(Debug)]
pub enum SysctlError {
    /// The [`Netlink`] is disconnected from its associated worker.
    Disconnected,
    /// The interface went away.
    NoInterface,
    /// The written value requests for an unsupported operation.
    Unsupported,
}
180
181/// Parameters used to start the Netlink asynchronous worker.
182pub struct NetlinkWorkerParams<H, C: NetlinkContext> {
183    interfaces_handler: H,
184    /// Receiver of newly created `NETLINK_ROUTE` clients.
185    route_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkRoute>>,
186    async_work_receiver:
187        futures::channel::mpsc::UnboundedReceiver<AsyncWorkItem<NetlinkRouteNotifiedGroup>>,
188}
189
/// All of the protocols that the netlink worker connects to.
// Field docs are suppressed: each field's name mirrors the FIDL protocol it
// holds a proxy for, so individual docs would be pure repetition.
#[allow(missing_docs)]
pub struct NetlinkWorkerDiscoverableProtocols {
    pub root_interfaces: fnet_root::InterfacesProxy,
    pub interfaces_state: fnet_interfaces::StateProxy,
    pub v4_routes_state: fnet_routes::StateV4Proxy,
    pub v6_routes_state: fnet_routes::StateV6Proxy,
    pub v4_main_route_table: fnet_routes_admin::RouteTableV4Proxy,
    pub v6_main_route_table: fnet_routes_admin::RouteTableV6Proxy,
    pub v4_route_table_provider: fnet_routes_admin::RouteTableProviderV4Proxy,
    pub v6_route_table_provider: fnet_routes_admin::RouteTableProviderV6Proxy,
    pub v4_rule_table: fnet_routes_admin::RuleTableV4Proxy,
    pub v6_rule_table: fnet_routes_admin::RuleTableV6Proxy,
    pub ndp_option_watcher_provider: fnet_ndp::RouterAdvertisementOptionWatcherProviderProxy,
}
205
206impl NetlinkWorkerDiscoverableProtocols {
207    fn from_environment() -> Self {
208        let root_interfaces = connect_to_protocol::<fnet_root::InterfacesMarker>()
209            .expect("connect to fuchsia.net.root.Interfaces");
210        let interfaces_state = connect_to_protocol::<fnet_interfaces::StateMarker>()
211            .expect("connect to fuchsia.net.interfaces.State");
212        let v4_routes_state =
213            connect_to_protocol::<<Ipv4 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
214                .expect("connect to fuchsia.net.routes.StateV4");
215        let v6_routes_state =
216            connect_to_protocol::<<Ipv6 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
217                .expect("connect to fuchsia.net.routes.StateV6");
218        let v4_main_route_table = connect_to_protocol::<
219            <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
220        >()
221        .expect("connect to fuchsia.net.routes.admin.RouteTableV4");
222        let v6_main_route_table = connect_to_protocol::<
223            <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
224        >()
225        .expect("connect to fuchsia.net.routes.admin.RouteTableV6");
226        let v4_route_table_provider = connect_to_protocol::<
227            <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
228        >()
229        .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV4");
230        let v6_route_table_provider = connect_to_protocol::<
231            <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
232        >()
233        .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV6");
234        let v4_rule_table = connect_to_protocol::<
235            <Ipv4 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
236        >()
237        .expect("connect to fuchsia.net.routes.admin.RuleTableV4");
238        let v6_rule_table = connect_to_protocol::<
239            <Ipv6 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
240        >()
241        .expect("connect to fuchsia.net.routes.admin.RuleTableV6");
242        let ndp_option_watcher_provider =
243            connect_to_protocol::<fnet_ndp::RouterAdvertisementOptionWatcherProviderMarker>()
244                .expect("connect to fuchsia.net.ndp.RouterAdvertisementOptionWatcherProvider");
245        Self {
246            root_interfaces,
247            interfaces_state,
248            v4_routes_state,
249            v6_routes_state,
250            v4_main_route_table,
251            v6_main_route_table,
252            v4_route_table_provider,
253            v6_route_table_provider,
254            v4_rule_table,
255            v6_rule_table,
256            ndp_option_watcher_provider,
257        }
258    }
259}
260
/// The worker encompassing all asynchronous Netlink work.
///
/// The worker is never expected to complete.
///
/// Connects to the discoverable protocols from the component's environment
/// and then delegates to [`run_netlink_worker_with_protocols`] (with no
/// `on_initialized` notification).
///
/// # Panics
///
/// Panics if a non-recoverable error is encountered by the worker. For example,
/// a FIDL error on one of the FIDL connections with the netstack, or a failure
/// to connect to one of the required protocols.
pub async fn run_netlink_worker<H: interfaces::InterfacesHandler, C: NetlinkContext>(
    params: NetlinkWorkerParams<H, C>,
    access_control: C::AccessControl<'_>,
) {
    run_netlink_worker_with_protocols(
        params,
        NetlinkWorkerDiscoverableProtocols::from_environment(),
        None,
        access_control,
    )
    .await;
}
285
/// Same as `run_netlink_worker()`, but allows to pass custom
/// `NetlinkWorkerDiscoverableProtocols`.
///
/// `on_initialized`, when provided, is forwarded to `EventLoop::run`;
/// presumably it is signaled once the event loop has finished initializing —
/// see that method for the exact semantics.
pub async fn run_netlink_worker_with_protocols<
    H: interfaces::InterfacesHandler,
    C: NetlinkContext,
>(
    params: NetlinkWorkerParams<H, C>,
    protocols: NetlinkWorkerDiscoverableProtocols,
    on_initialized: Option<oneshot::Sender<()>>,
    access_control: C::AccessControl<'_>,
) {
    let NetlinkWorkerParams { interfaces_handler, route_client_receiver, async_work_receiver } =
        params;

    // Table of connected NETLINK_ROUTE clients. Cloned below so it is shared
    // between the event loop and the client-acceptor loop.
    let route_clients = ClientTable::default();
    let (unified_request_sink, unified_request_stream) = mpsc::channel(1);

    let unified_event_loop = {
        let route_clients = route_clients.clone();
        async move {
            // Destructure inside the async block so the proxies are moved into
            // the future rather than borrowed.
            let NetlinkWorkerDiscoverableProtocols {
                root_interfaces,
                interfaces_state,
                v4_routes_state,
                v6_routes_state,
                v4_main_route_table,
                v6_main_route_table,
                v4_route_table_provider,
                v6_route_table_provider,
                v4_rule_table,
                v6_rule_table,
                ndp_option_watcher_provider,
            } = protocols;
            let event_loop: EventLoop<H, C::Sender<_>> = EventLoop {
                interfaces_proxy: root_interfaces,
                interfaces_state_proxy: interfaces_state,
                v4_routes_state,
                v6_routes_state,
                v4_main_route_table,
                v6_main_route_table,
                v4_route_table_provider,
                v6_route_table_provider,
                v4_rule_table,
                v6_rule_table,
                ndp_option_watcher_provider,
                route_clients,
                unified_request_stream,
                interfaces_handler,
                async_work_receiver,
            };

            event_loop.run(on_initialized).await;
        }
    };

    let route_client_receiver_loop = async move {
        // Accept new NETLINK_ROUTE clients.
        connect_new_clients::<C, NetlinkRoute>(
            route_clients,
            route_client_receiver,
            NetlinkRouteRequestHandler { unified_request_sink },
            access_control,
        )
        .await;
        // The sender half lives in `Netlink`, which is expected to outlive the
        // worker; the stream finishing here indicates a broken invariant.
        panic!("route_client_receiver stream unexpectedly finished");
    };

    // Run both halves of the worker to completion (i.e. forever, barring the
    // panic above or a panic inside the event loop).
    futures::future::join(unified_event_loop, route_client_receiver_loop).await;
}
355
/// Receives clients from the given receiver, adding them to the given table.
///
/// A "Request Handler" Task will be spawned for each received client. The given
/// `request_handler_impl` defines how the requests will be handled.
///
/// Completes only when `client_receiver` finishes and every in-flight client
/// handler has run to completion.
async fn connect_new_clients<C: NetlinkContext, F: ProtocolFamily>(
    client_table: ClientTable<F, C::Sender<F::InnerMessage>>,
    client_receiver: UnboundedReceiver<ClientWithReceiver<C, F>>,
    request_handler_impl: F::RequestHandler<C::Sender<F::InnerMessage>>,
    access_control: C::AccessControl<'_>,
) {
    client_receiver
        // Drive each client concurrently with `for_each_concurrent`.
        .for_each_concurrent(None, async |ClientWithReceiver { client, receiver }| {
            client_table.add_client(client.clone());
            // `run_client_request_handler` returns the client once its request
            // stream closes, at which point it is removed from the table.
            let client = run_client_request_handler::<C, F>(
                client,
                receiver,
                request_handler_impl.clone(),
                access_control.clone(),
            )
            .await;
            client_table.remove_client(client);
        })
        .await;
}
381
/// Reads messages from the `receiver` and handles them using the `handler`.
///
/// Each message's credentials are validated against `access_control` before it
/// is dispatched to the handler; validation failures are logged and, when an
/// error message is available, reported back to the client.
///
/// The task terminates when the underlying `Receiver` closes, yielding the
/// original client.
async fn run_client_request_handler<C: NetlinkContext, F: ProtocolFamily>(
    client: InternalClient<F, C::Sender<F::InnerMessage>>,
    receiver: C::Receiver<F::InnerMessage>,
    handler: F::RequestHandler<C::Sender<F::InnerMessage>>,
    access_control: C::AccessControl<'_>,
) -> InternalClient<F, C::Sender<F::InnerMessage>> {
    // State needed to handle an individual request, that is cycled through the
    // `fold` combinator below.
    struct FoldState<C, H, P> {
        client: C,
        handler: H,
        access_control: P,
    }

    // Use `fold` for two reasons. First, it processes requests serially,
    // ensuring requests are handled in order. Second, it allows us to
    // "hand-off" the client/handler from one request to the other, avoiding
    // copies for each request.
    let FoldState { client, handler: _, access_control: _ } = receiver
        .fold(
            FoldState { client, handler, access_control },
            |FoldState { mut client, mut handler, access_control }, req| async {
                match req.validate_creds_and_get_message(&access_control) {
                    Ok(req) => {
                        log_debug!("{} Received request: {:?}", client, req);
                        handler.handle_request(req, &mut client).await
                    }
                    Err(e) => {
                        // Log at different severities: a parse failure is a
                        // malformed peer, a permission failure is expected
                        // operational noise.
                        match &e {
                            ValidationError::Parse(e) => {
                                log_warn!("{client} failed to parse netlink message: {e:?}");
                            }
                            p @ ValidationError::Permission { .. } => {
                                log_debug!("{client} permission check failed {p:?}")
                            }
                        }
                        // Report the failure to the client when the error can
                        // be rendered as a netlink error message.
                        if let Some(rsp) = e.into_error_message() {
                            client.send_unicast(rsp)
                        }
                    }
                }
                // Thread the state through to the next iteration.
                FoldState { client, handler, access_control }
            },
        )
        .await;

    client
}
434
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;
    use futures::FutureExt as _;

    use assert_matches::assert_matches;
    use netlink_packet_core::{ErrorMessage, NetlinkPayload};
    use std::num::NonZeroI32;
    use std::pin::pin;

    use crate::messaging::NetlinkMessageWithCreds;
    use crate::messaging::testutil::{FakeCreds, SentMessage, TestNetlinkContext};
    use crate::protocol_family::testutil::{
        FakeNetlinkRequestHandler, FakeProtocolFamily, new_fake_netlink_message,
        new_fake_netlink_message_with_creds,
    };

    /// Verifies that `run_client_request_handler` dispatches requests to the
    /// handler and terminates (yielding the client) when the request stream
    /// closes.
    #[fasync::run_singlethreaded(test)]
    async fn test_run_client_request_handler() {
        let (mut req_sender, req_receiver) = mpsc::channel(0);
        let (mut client_sink, client, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let join_handle = fasync::Task::spawn(async_work_drain_task);

        // Scope the pinned task so it is dropped before awaiting the drain
        // task below.
        {
            let mut client_task = pin!(
                run_client_request_handler::<TestNetlinkContext, FakeProtocolFamily>(
                    client,
                    req_receiver,
                    FakeNetlinkRequestHandler,
                    Default::default()
                )
                .fuse()
            );

            // No requests yet: the task must still be pending with no output.
            assert_matches!((&mut client_task).now_or_never(), None);
            assert_eq!(&client_sink.take_messages()[..], &[]);

            // Send a message and expect to see the response on the `client_sink`.
            // NB: Use the sender's channel size as a synchronization method; If a
            // second message could be sent, the first *must* have been handled.
            req_sender
                .try_send(new_fake_netlink_message_with_creds())
                .expect("should send without error");
            let mut could_send_fut =
                pin!(futures::future::poll_fn(|ctx| req_sender.poll_ready(ctx)).fuse());
            futures::select!(
                res = could_send_fut => res.expect("should be able to send without error"),
                _client = client_task => panic!("client task unexpectedly finished"),
            );
            assert_eq!(
                &client_sink.take_messages()[..],
                &[SentMessage::unicast(new_fake_netlink_message())]
            );

            // Close the sender, and expect the Task to exit.
            req_sender.close_channel();
            let _client = client_task.await;
            assert_eq!(&client_sink.take_messages()[..], &[]);
        }
        join_handle.await;
    }

    /// Verifies that `connect_new_clients` serves multiple clients
    /// concurrently, tracks them in the client table, and cleans them up once
    /// their request streams and the acceptor stream close.
    #[fasync::run_singlethreaded(test)]
    async fn test_connect_new_clients() {
        let client_table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
        let mut client_acceptor_fut = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                client_table.clone(),
                client_receiver,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );

        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        // Connect Client 1.
        let (mut _client_sink1, client1, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender1, req_receiver1) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client: client1, receiver: req_receiver1 })
            .expect("should send without error");

        // Connect Client 2.
        let (mut client_sink2, client2, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_2,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender2, req_receiver2) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client: client2, receiver: req_receiver2 })
            .expect("should send without error");

        // Send a request to Client 2, and verify it's handled despite Client 1
        // being open (e.g. concurrent handling of requests across clients).
        // NB: Use the sender's channel size as a synchronization method; If a
        // second message could be sent, the first *must* have been handled.
        req_sender2
            .try_send(new_fake_netlink_message_with_creds())
            .expect("should send without error");
        let mut could_send_fut =
            pin!(futures::future::poll_fn(|ctx| req_sender2.poll_ready(ctx)).fuse());
        futures::select!(
            res = could_send_fut => res.expect("should be able to send without error"),
            () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
        );
        assert_eq!(
            &client_table.client_ids()[..],
            [client::testutil::CLIENT_ID_1, client::testutil::CLIENT_ID_2]
        );
        assert_eq!(
            &client_sink2.take_messages()[..],
            &[SentMessage::unicast(new_fake_netlink_message())]
        );

        // Close the two clients, and verify the acceptor fut is still pending.
        req_sender1.close_channel();
        req_sender2.close_channel();
        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        // Close the client_sender, and verify the acceptor fut finishes.
        client_sender.close_channel();
        client_acceptor_fut.await;

        // Confirm the clients have been cleaned up from the client table.
        assert_eq!(&client_table.client_ids()[..], []);

        drop(client_table);
        scope.join().await;
    }

    /// Verifies that a request failing the credential check produces an EPERM
    /// error message back to the client.
    #[fasync::run_singlethreaded(test)]
    async fn test_permissions() {
        let client_table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
        let mut client_acceptor_fut = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                client_table.clone(),
                client_receiver,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );
        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        let (mut client_sink, client, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender, req_receiver) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client, receiver: req_receiver })
            .expect("should send without error");

        // Attach credentials that force an EPERM validation failure.
        let message = NetlinkMessageWithCreds::new(
            new_fake_netlink_message(),
            FakeCreds::with_error(Errno::new(libc::EPERM).unwrap()),
        );
        req_sender.try_send(message).expect("should send without error");

        let response = futures::select!(
            res = client_sink.next_message().fuse() => res,
            () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
        );

        // Netlink error messages carry negative errno values.
        assert_matches!(
            response.message.payload,
            NetlinkPayload::Error(ErrorMessage { code: Some(error_code), .. }) => {
              assert_eq!(error_code , NonZeroI32::new(-libc::EPERM).unwrap());
            }
        );
    }
}