netlink/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! An implementation of Linux's Netlink API for Fuchsia.
6//!
7//! Netlink is a socket-based API provided by Linux that user space applications
8//! can use to interact with the kernel. The API is split up into several
9//! protocol families each offering different functionality. This crate targets
10//! the implementation of families related to networking.
11
12#![warn(missing_docs, unused)]
13
14mod client;
15pub mod interfaces;
16pub(crate) mod logging;
17pub mod messaging;
18pub mod multicast_groups;
19mod nduseropt;
20mod netlink_packet;
21pub mod protocol_family;
22pub(crate) mod route_eventloop;
23pub(crate) mod route_tables;
24pub mod routes;
25mod rules;
26pub(crate) mod util;
27
28use std::num::NonZeroU64;
29
30use fuchsia_component::client::connect_to_protocol;
31use futures::StreamExt as _;
32use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
33use futures::channel::oneshot;
34use net_types::ip::{Ipv4, Ipv6};
35use netlink_packet_route::RouteNetlinkMessage;
36use protocol_family::route::NetlinkRouteNotifiedGroup;
37use {
38    fidl_fuchsia_net_interfaces as fnet_interfaces, fidl_fuchsia_net_ndp as fnet_ndp,
39    fidl_fuchsia_net_root as fnet_root, fidl_fuchsia_net_routes as fnet_routes,
40    fidl_fuchsia_net_routes_admin as fnet_routes_admin,
41    fidl_fuchsia_net_routes_ext as fnet_routes_ext,
42};
43
44use crate::client::{AsyncWorkItem, ClientIdGenerator, ClientTable, InternalClient};
45use crate::logging::{log_debug, log_warn};
46use crate::messaging::{NetlinkContext, UnvalidatedNetlinkMessage as _, ValidationError};
47pub use crate::netlink_packet::errno::Errno;
48use crate::protocol_family::route::{NetlinkRoute, NetlinkRouteClient, NetlinkRouteRequestHandler};
49use crate::protocol_family::{NetlinkFamilyRequestHandler as _, ProtocolFamily};
50use crate::route_eventloop::RouteEventLoop;
51
/// The tag added to all logs generated by this crate.
// NB: `'static` is implied for `const` string slices; spelling it out trips
// clippy's `redundant_static_lifetimes` lint.
pub const NETLINK_LOG_TAG: &str = "netlink";
54
/// Selects the interface for the sysctl.
///
/// Derives `PartialEq`/`Eq` so selectors can be compared directly (e.g. in
/// tests or when de-duplicating requests).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SysctlInterfaceSelector {
    /// "all" interfaces.
    ///
    /// This is supposed to change all interfaces' settings, but this is a
    /// lie for most of the sysctls, they have no effect at all when written.
    All,
    /// "default" interface, all interface created after this write will inherit the value.
    Default,
    /// The id of the interface to change.
    Id(NonZeroU64),
}
68
/// The implementation of the Netlink protocol suite.
///
/// This is the caller-facing handle: it creates new `NETLINK_ROUTE` clients
/// and forwards sysctl accesses to the asynchronous worker started via
/// `run_netlink_worker()`.
pub struct Netlink<C: NetlinkContext> {
    /// Generator of new Client IDs.
    id_generator: ClientIdGenerator,
    /// Sender to attach new `NETLINK_ROUTE` clients to the Netlink worker.
    route_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkRoute>>,
    /// Sender to send other async work items to the Netlink worker.
    route_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkRouteNotifiedGroup>>,
}
78
79impl<C: NetlinkContext> Netlink<C> {
80    /// Returns a newly instantiated [`Netlink`] and parameters used to start the
81    /// asynchronous worker.
82    ///
83    /// Caller is expected to run the worker by calling `run_netlink_worker()`.
84    pub fn new<H: interfaces::InterfacesHandler>(
85        interfaces_handler: H,
86    ) -> (Self, NetlinkWorkerParams<H, C>) {
87        let (route_client_sender, route_client_receiver) = mpsc::unbounded();
88        let (route_async_work_sink, async_work_receiver) = mpsc::unbounded();
89        (
90            Netlink {
91                id_generator: ClientIdGenerator::default(),
92                route_client_sender,
93                route_async_work_sink,
94            },
95            NetlinkWorkerParams {
96                interfaces_handler,
97                route_client_receiver,
98                route_async_work_receiver: async_work_receiver,
99            },
100        )
101    }
102
103    /// Writes the accept_ra_rt_table sysctl for the selected interface.
104    pub fn write_accept_ra_rt_table(
105        &self,
106        interface: SysctlInterfaceSelector,
107        value: i32,
108    ) -> Result<(), SysctlError> {
109        let (responder, receiver) = oneshot_sync::channel();
110        self.route_async_work_sink
111            .unbounded_send(AsyncWorkItem::SetAcceptRaRtTable {
112                interface,
113                value: value.into(),
114                responder,
115            })
116            .map_err(|_| SysctlError::Disconnected)?;
117        receiver.receive().map_err(|_| SysctlError::Disconnected)?
118    }
119
120    /// Reads the accept_ra_rt_table sysctl for the selected interface.
121    pub fn read_accept_ra_rt_table(
122        &self,
123        interface: SysctlInterfaceSelector,
124    ) -> Result<i32, SysctlError> {
125        let (responder, receiver) = oneshot_sync::channel();
126        self.route_async_work_sink
127            .unbounded_send(AsyncWorkItem::GetAcceptRaRtTable { interface, responder })
128            .map_err(|_| SysctlError::Disconnected)?;
129        Ok(receiver.receive().map_err(|_| SysctlError::Disconnected)??.into())
130    }
131
132    /// Creates a new client of the `NETLINK_ROUTE` protocol family.
133    ///
134    /// `sender` is used by Netlink to send messages to the client.
135    /// `receiver` is used by Netlink to receive messages from the client.
136    ///
137    /// Closing the `receiver` will close this client, disconnecting `sender`.
138    pub fn new_route_client(
139        &self,
140        sender: C::Sender<RouteNetlinkMessage>,
141        receiver: C::Receiver<RouteNetlinkMessage>,
142    ) -> Result<NetlinkRouteClient, NewClientError> {
143        let Netlink { id_generator, route_client_sender, route_async_work_sink } = self;
144        let (external_client, internal_client) = client::new_client_pair::<NetlinkRoute, _>(
145            id_generator.new_id(),
146            sender,
147            route_async_work_sink.clone(),
148        );
149        route_client_sender
150            .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
151            .map_err(|e| {
152                // Sending on an `UnboundedSender` can never fail with `is_full()`.
153                debug_assert!(e.is_disconnected());
154                NewClientError::Disconnected
155            })?;
156        Ok(NetlinkRouteClient(external_client))
157    }
158}
159
/// A wrapper to hold an [`InternalClient`], and its [`Receiver`] of requests.
struct ClientWithReceiver<C: NetlinkContext, F: ProtocolFamily> {
    // Worker-side half of the client, used to send responses back.
    client: InternalClient<F, C::Sender<F::Response>>,
    // Stream of incoming requests from this client.
    receiver: C::Receiver<F::Request>,
}
165
/// The possible error types when instantiating a new client.
// Fieldless error enum; derive comparison and copy traits so callers and
// tests can match on or compare values directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NewClientError {
    /// The [`Netlink`] is disconnected from its associated worker, perhaps as a
    /// result of dropping the worker.
    Disconnected,
}
173
/// The possible error types when trying to access a sysctl.
// Fieldless error enum; derive comparison and copy traits so callers and
// tests can match on or compare values directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SysctlError {
    /// The [`Netlink`] is disconnected from its associated worker.
    Disconnected,
    /// The interface went away.
    NoInterface,
    /// The written value requests for an unsupported operation.
    Unsupported,
}
184
185/// Parameters used to start the Netlink asynchronous worker.
186pub struct NetlinkWorkerParams<H, C: NetlinkContext> {
187    interfaces_handler: H,
188    /// Receiver of newly created `NETLINK_ROUTE` clients.
189    route_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkRoute>>,
190    route_async_work_receiver:
191        futures::channel::mpsc::UnboundedReceiver<AsyncWorkItem<NetlinkRouteNotifiedGroup>>,
192}
193
/// All of the protocols that the netlink worker connects to.
///
/// Each field holds a FIDL proxy that is moved into the route event loop by
/// [`run_netlink_worker_with_protocols`]; the field names match the event
/// loop's fields (modulo the `interfaces_*` renames).
#[allow(missing_docs)]
pub struct NetlinkWorkerDiscoverableProtocols {
    pub root_interfaces: fnet_root::InterfacesProxy,
    pub interfaces_state: fnet_interfaces::StateProxy,
    pub v4_routes_state: fnet_routes::StateV4Proxy,
    pub v6_routes_state: fnet_routes::StateV6Proxy,
    pub v4_main_route_table: fnet_routes_admin::RouteTableV4Proxy,
    pub v6_main_route_table: fnet_routes_admin::RouteTableV6Proxy,
    pub v4_route_table_provider: fnet_routes_admin::RouteTableProviderV4Proxy,
    pub v6_route_table_provider: fnet_routes_admin::RouteTableProviderV6Proxy,
    pub v4_rule_table: fnet_routes_admin::RuleTableV4Proxy,
    pub v6_rule_table: fnet_routes_admin::RuleTableV6Proxy,
    pub ndp_option_watcher_provider: fnet_ndp::RouterAdvertisementOptionWatcherProviderProxy,
}
209
210impl NetlinkWorkerDiscoverableProtocols {
211    fn from_environment() -> Self {
212        let root_interfaces = connect_to_protocol::<fnet_root::InterfacesMarker>()
213            .expect("connect to fuchsia.net.root.Interfaces");
214        let interfaces_state = connect_to_protocol::<fnet_interfaces::StateMarker>()
215            .expect("connect to fuchsia.net.interfaces.State");
216        let v4_routes_state =
217            connect_to_protocol::<<Ipv4 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
218                .expect("connect to fuchsia.net.routes.StateV4");
219        let v6_routes_state =
220            connect_to_protocol::<<Ipv6 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
221                .expect("connect to fuchsia.net.routes.StateV6");
222        let v4_main_route_table = connect_to_protocol::<
223            <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
224        >()
225        .expect("connect to fuchsia.net.routes.admin.RouteTableV4");
226        let v6_main_route_table = connect_to_protocol::<
227            <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
228        >()
229        .expect("connect to fuchsia.net.routes.admin.RouteTableV6");
230        let v4_route_table_provider = connect_to_protocol::<
231            <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
232        >()
233        .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV4");
234        let v6_route_table_provider = connect_to_protocol::<
235            <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
236        >()
237        .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV6");
238        let v4_rule_table = connect_to_protocol::<
239            <Ipv4 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
240        >()
241        .expect("connect to fuchsia.net.routes.admin.RuleTableV4");
242        let v6_rule_table = connect_to_protocol::<
243            <Ipv6 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
244        >()
245        .expect("connect to fuchsia.net.routes.admin.RuleTableV6");
246        let ndp_option_watcher_provider =
247            connect_to_protocol::<fnet_ndp::RouterAdvertisementOptionWatcherProviderMarker>()
248                .expect("connect to fuchsia.net.ndp.RouterAdvertisementOptionWatcherProvider");
249        Self {
250            root_interfaces,
251            interfaces_state,
252            v4_routes_state,
253            v6_routes_state,
254            v4_main_route_table,
255            v6_main_route_table,
256            v4_route_table_provider,
257            v6_route_table_provider,
258            v4_rule_table,
259            v6_rule_table,
260            ndp_option_watcher_provider,
261        }
262    }
263}
264
/// The worker encompassing all asynchronous Netlink work.
///
/// The worker is never expected to complete.
///
/// The FIDL proxies are created inside this `async fn` body, i.e. only once
/// the returned future is first polled by an executor, so no asynchronous
/// proxies are created before an executor is running.
// NOTE(review): the previous doc described a `protocols` closure parameter
// that this function does not take; protocols are connected internally via
// `NetlinkWorkerDiscoverableProtocols::from_environment()`.
///
/// # Panics
///
/// Panics if a non-recoverable error is encountered by the worker. For example,
/// a FIDL error on one of the FIDL connections with the netstack.
pub async fn run_netlink_worker<H: interfaces::InterfacesHandler, C: NetlinkContext>(
    params: NetlinkWorkerParams<H, C>,
    access_control: C::AccessControl<'_>,
) {
    // Connect to the discoverable protocols from the component's environment;
    // `None` means no `on_route_initialized` notifier is passed through.
    run_netlink_worker_with_protocols(
        params,
        NetlinkWorkerDiscoverableProtocols::from_environment(),
        None,
        access_control,
    )
    .await;
}
289
/// Same as `run_netlink_worker()`, but allows passing custom
/// `NetlinkWorkerDiscoverableProtocols`.
///
/// `on_route_initialized` is forwarded to the route event loop's `run` method.
pub async fn run_netlink_worker_with_protocols<
    H: interfaces::InterfacesHandler,
    C: NetlinkContext,
>(
    params: NetlinkWorkerParams<H, C>,
    protocols: NetlinkWorkerDiscoverableProtocols,
    on_route_initialized: Option<oneshot::Sender<()>>,
    access_control: C::AccessControl<'_>,
) {
    let NetlinkWorkerParams {
        interfaces_handler,
        route_client_receiver,
        route_async_work_receiver,
    } = params;

    // Table of connected NETLINK_ROUTE clients, shared between the route event
    // loop and the client-acceptor loop below (which adds/removes entries).
    let route_clients = ClientTable::default();
    // All clients' requests are funneled through this single channel into the
    // route event loop.
    let (route_request_sink, route_request_stream) = mpsc::channel(1);

    let route_event_loop = {
        // Clone so the acceptor loop keeps its own handle to the table.
        let route_clients = route_clients.clone();
        async move {
            let NetlinkWorkerDiscoverableProtocols {
                root_interfaces,
                interfaces_state,
                v4_routes_state,
                v6_routes_state,
                v4_main_route_table,
                v6_main_route_table,
                v4_route_table_provider,
                v6_route_table_provider,
                v4_rule_table,
                v6_rule_table,
                ndp_option_watcher_provider,
            } = protocols;
            let event_loop: RouteEventLoop<H, C::Sender<_>> = RouteEventLoop {
                interfaces_proxy: root_interfaces,
                interfaces_state_proxy: interfaces_state,
                v4_routes_state,
                v6_routes_state,
                v4_main_route_table,
                v6_main_route_table,
                v4_route_table_provider,
                v6_route_table_provider,
                v4_rule_table,
                v6_rule_table,
                ndp_option_watcher_provider,
                route_clients,
                request_stream: route_request_stream,
                interfaces_handler,
                async_work_receiver: route_async_work_receiver,
            };

            event_loop.run(on_route_initialized).await;
        }
    };

    let route_client_receiver_loop = {
        async move {
            // Accept new NETLINK_ROUTE clients.
            connect_new_clients::<C, NetlinkRoute>(
                route_clients,
                route_client_receiver,
                NetlinkRouteRequestHandler { unified_request_sink: route_request_sink },
                access_control,
            )
            .await;
            panic!("route_client_receiver stream unexpectedly finished");
        }
    };

    // Neither future is expected to complete; drive both concurrently.
    futures::future::join(route_event_loop, route_client_receiver_loop).await;
}
364
/// Receives clients from the given receiver, adding them to the given table.
///
/// A "Request Handler" Task will be spawned for each received client. The given
/// `request_handler_impl` defines how the requests will be handled.
///
/// Completes only once `client_receiver` is exhausted and every in-flight
/// client handler has finished.
async fn connect_new_clients<C: NetlinkContext, F: ProtocolFamily>(
    client_table: ClientTable<F, C::Sender<F::Response>>,
    client_receiver: UnboundedReceiver<ClientWithReceiver<C, F>>,
    request_handler_impl: F::RequestHandler<C::Sender<F::Response>>,
    access_control: C::AccessControl<'_>,
) {
    client_receiver
        // Drive each client concurrently with `for_each_concurrent`.
        .for_each_concurrent(None, async |ClientWithReceiver { client, receiver }| {
            // Register the client in the shared table for the duration of its
            // request stream.
            client_table.add_client(client.clone());
            // `run_client_request_handler` yields the client back once its
            // receiver closes; only then is it removed from the table.
            let client = run_client_request_handler::<C, F>(
                client,
                receiver,
                request_handler_impl.clone(),
                access_control.clone(),
            )
            .await;
            client_table.remove_client(client);
        })
        .await;
}
390
/// Reads messages from the `receiver` and handles them using the `handler`.
///
/// The task terminates when the underlying `Receiver` closes, yielding the
/// original client.
///
/// Each message's credentials are validated against `access_control` before it
/// is handed to the `handler`; validation failures are logged and, when the
/// error maps to a netlink error message, answered back to the client.
async fn run_client_request_handler<C: NetlinkContext, F: ProtocolFamily>(
    client: InternalClient<F, C::Sender<F::Response>>,
    receiver: C::Receiver<F::Request>,
    handler: F::RequestHandler<C::Sender<F::Response>>,
    access_control: C::AccessControl<'_>,
) -> InternalClient<F, C::Sender<F::Response>> {
    // State needed to handle an individual request, that is cycled through the
    // `fold` combinator below.
    struct FoldState<C, H, P> {
        // The client being serviced.
        client: C,
        // The request handler implementation.
        handler: H,
        // Credential validator applied to every incoming message.
        access_control: P,
    }

    // Use `fold` for two reasons. First, it processes requests serially,
    // ensuring requests are handled in order. Second, it allows us to
    // "hand-off" the client/handler from one request to the other, avoiding
    // copies for each request.
    let FoldState { client, handler: _, access_control: _ } = receiver
        .fold(
            FoldState { client, handler, access_control },
            |FoldState { mut client, mut handler, access_control }, req| async {
                match req.validate_creds_and_get_message(&access_control) {
                    Ok(req) => {
                        log_debug!("{} Received request: {:?}", client, req);
                        handler.handle_request(req, &mut client).await
                    }
                    Err(e) => {
                        match &e {
                            // Unparseable messages are unexpected; log loudly.
                            ValidationError::Parse(e) => {
                                log_warn!("{client} failed to parse netlink message: {e:?}");
                            }
                            // Permission failures can occur in normal
                            // operation; keep the log level low.
                            p @ ValidationError::Permission { .. } => {
                                log_debug!("{client} permission check failed {p:?}")
                            }
                        }
                        // Answer the client with a netlink error message when
                        // the validation error maps to one.
                        if let Some(rsp) = e.into_error_message() {
                            client.send_unicast(rsp)
                        }
                    }
                }
                // Hand the state back for the next request.
                FoldState { client, handler, access_control }
            },
        )
        .await;

    client
}
443
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;
    use futures::FutureExt as _;

    use assert_matches::assert_matches;
    use netlink_packet_core::{ErrorMessage, NetlinkPayload};
    use std::num::NonZeroI32;
    use std::pin::pin;

    use crate::messaging::NetlinkMessageWithCreds;
    use crate::messaging::testutil::{FakeCreds, SentMessage, TestNetlinkContext};
    use crate::protocol_family::testutil::{
        FakeNetlinkRequestHandler, FakeProtocolFamily, new_fake_netlink_message,
        new_fake_netlink_message_with_creds,
    };

    // Verifies that a single client's requests are handled in order and that
    // the handler terminates (yielding the client) when the receiver closes.
    #[fasync::run_singlethreaded(test)]
    async fn test_run_client_request_handler() {
        let (mut req_sender, req_receiver) = mpsc::channel(0);
        let (mut client_sink, client, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        // Run the client's async-work drain task in the background; awaited at
        // the end to ensure a clean shutdown.
        let join_handle = fasync::Task::spawn(async_work_drain_task);

        {
            let mut client_task = pin!(
                run_client_request_handler::<TestNetlinkContext, FakeProtocolFamily>(
                    client,
                    req_receiver,
                    FakeNetlinkRequestHandler,
                    Default::default()
                )
                .fuse()
            );

            // No requests sent yet: the task is pending and nothing was sent.
            assert_matches!((&mut client_task).now_or_never(), None);
            assert_eq!(&client_sink.take_messages()[..], &[]);

            // Send a message and expect to see the response on the `client_sink`.
            // NB: Use the sender's channel size as a synchronization method; If a
            // second message could be sent, the first *must* have been handled.
            req_sender
                .try_send(new_fake_netlink_message_with_creds())
                .expect("should send without error");
            let mut could_send_fut =
                pin!(futures::future::poll_fn(|ctx| req_sender.poll_ready(ctx)).fuse());
            futures::select!(
                res = could_send_fut => res.expect("should be able to send without error"),
                _client = client_task => panic!("client task unexpectedly finished"),
            );
            assert_eq!(
                &client_sink.take_messages()[..],
                &[SentMessage::unicast(new_fake_netlink_message())]
            );

            // Close the sender, and expect the Task to exit.
            req_sender.close_channel();
            let _client = client_task.await;
            assert_eq!(&client_sink.take_messages()[..], &[]);
        }
        join_handle.await;
    }

    // Verifies that the acceptor loop registers clients in the table, handles
    // requests from multiple clients concurrently, and cleans up on close.
    #[fasync::run_singlethreaded(test)]
    async fn test_connect_new_clients() {
        let client_table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
        let mut client_acceptor_fut = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                client_table.clone(),
                client_receiver,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );

        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        // Connect Client 1.
        let (mut _client_sink1, client1, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender1, req_receiver1) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client: client1, receiver: req_receiver1 })
            .expect("should send without error");

        // Connect Client 2.
        let (mut client_sink2, client2, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_2,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender2, req_receiver2) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client: client2, receiver: req_receiver2 })
            .expect("should send without error");

        // Send a request to Client 2, and verify it's handled despite Client 1
        // being open (e.g. concurrent handling of requests across clients).
        // NB: Use the sender's channel size as a synchronization method; If a
        // second message could be sent, the first *must* have been handled.
        req_sender2
            .try_send(new_fake_netlink_message_with_creds())
            .expect("should send without error");
        let mut could_send_fut =
            pin!(futures::future::poll_fn(|ctx| req_sender2.poll_ready(ctx)).fuse());
        futures::select!(
            res = could_send_fut => res.expect("should be able to send without error"),
            () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
        );
        // Both clients should be registered in the table by now.
        assert_eq!(
            &client_table.client_ids()[..],
            [client::testutil::CLIENT_ID_1, client::testutil::CLIENT_ID_2]
        );
        assert_eq!(
            &client_sink2.take_messages()[..],
            &[SentMessage::unicast(new_fake_netlink_message())]
        );

        // Close the two clients, and verify the acceptor fut is still pending.
        req_sender1.close_channel();
        req_sender2.close_channel();
        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        // Close the client_sender, and verify the acceptor fut finishes.
        client_sender.close_channel();
        client_acceptor_fut.await;

        // Confirm the clients have been cleaned up from the client table.
        assert_eq!(&client_table.client_ids()[..], []);

        drop(client_table);
        scope.join().await;
    }

    // Verifies that a request failing the credential check is answered with a
    // netlink error message carrying the corresponding errno.
    #[fasync::run_singlethreaded(test)]
    async fn test_permissions() {
        let client_table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
        let mut client_acceptor_fut = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                client_table.clone(),
                client_receiver,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );
        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        let (mut client_sink, client, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender, req_receiver) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client, receiver: req_receiver })
            .expect("should send without error");

        // Craft a message whose credentials fail validation with EPERM.
        let message = NetlinkMessageWithCreds::new(
            new_fake_netlink_message(),
            FakeCreds::with_error(Errno::new(libc::EPERM).unwrap()),
        );
        req_sender.try_send(message).expect("should send without error");

        let response = futures::select!(
            res = client_sink.next_message().fuse() => res,
            () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
        );

        // Netlink error messages carry the negated errno.
        assert_matches!(
            response.message.payload,
            NetlinkPayload::Error(ErrorMessage { code: Some(error_code), .. }) => {
              assert_eq!(error_code , NonZeroI32::new(-libc::EPERM).unwrap());
            }
        );
    }
}