Skip to main content

netlink/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! An implementation of Linux's Netlink API for Fuchsia.
6//!
7//! Netlink is a socket-based API provided by Linux that user space applications
8//! can use to interact with the kernel. The API is split up into several
9//! protocol families each offering different functionality. This crate targets
10//! the implementation of families related to networking.
11
12#![warn(missing_docs, unused)]
13
14mod client;
15pub mod interfaces;
16pub(crate) mod logging;
17pub mod messaging;
18pub mod multicast_groups;
19mod nduseropt;
20pub mod neighbors;
21mod netlink_packet;
22pub mod protocol_family;
23pub(crate) mod route_eventloop;
24pub(crate) mod route_tables;
25pub mod routes;
26mod rules;
27pub(crate) mod util;
28
29use std::num::NonZeroU64;
30
31use fuchsia_component::client::connect_to_protocol;
32use futures::StreamExt as _;
33use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
34use futures::channel::oneshot;
35use net_types::ip::{Ipv4, Ipv6};
36use netlink_packet_route::RouteNetlinkMessage;
37use netlink_packet_sock_diag::{SockDiagRequest, SockDiagResponse};
38use protocol_family::route::NetlinkRouteNotifiedGroup;
39use {
40    fidl_fuchsia_net_interfaces as fnet_interfaces, fidl_fuchsia_net_ndp as fnet_ndp,
41    fidl_fuchsia_net_neighbor as fnet_neighbor, fidl_fuchsia_net_root as fnet_root,
42    fidl_fuchsia_net_routes as fnet_routes, fidl_fuchsia_net_routes_admin as fnet_routes_admin,
43    fidl_fuchsia_net_routes_ext as fnet_routes_ext, fidl_fuchsia_net_sockets as fnet_sockets,
44};
45
46use crate::client::{AsyncWorkItem, ClientIdGenerator, ClientTable, InternalClient};
47use crate::logging::{log_debug, log_warn};
48use crate::messaging::{NetlinkContext, UnvalidatedNetlinkMessage as _, ValidationError};
49pub use crate::netlink_packet::errno::Errno;
50use crate::protocol_family::route::{
51    NetlinkRoute, NetlinkRouteClient, NetlinkRouteRequestHandler, RouteAsyncWork,
52};
53use crate::protocol_family::sock_diag::{
54    NetlinkSockDiag, NetlinkSockDiagClient, NetlinkSockDiagRequestHandler, SockDiagEventLoop,
55};
56use crate::protocol_family::{NetlinkFamilyRequestHandler as _, ProtocolFamily};
57use crate::route_eventloop::RouteEventLoop;
58
/// The tag added to all logs generated by this crate.
// `const` items are implicitly `'static`, so the lifetime is left elided.
pub const NETLINK_LOG_TAG: &str = "netlink";
61
/// Identifies which interface(s) a sysctl access applies to.
#[derive(Clone, Copy, Debug)]
pub enum SysctlInterfaceSelector {
    /// The "all" pseudo-interface.
    ///
    /// Nominally a write here changes the setting on every interface, but for
    /// most sysctls that is a lie: the write has no effect at all.
    All,
    /// The "default" pseudo-interface: every interface created after the write
    /// inherits the value.
    Default,
    /// A single interface, identified by its id.
    Id(NonZeroU64),
}
75
76/// The implementation of the Netlink protocol suite.
77pub struct Netlink<C: NetlinkContext> {
78    /// Generator of new Client IDs.
79    id_generator: ClientIdGenerator,
80    /// Sender to attach new `NETLINK_ROUTE` clients to the Netlink worker.
81    route_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkRoute>>,
82    /// Sender to send other async work items to the Netlink worker.
83    route_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkRoute>>,
84    /// Sender to attach new `NETLINK_SOCK_DIAG` clients to the Netlink worker.
85    sock_diag_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkSockDiag>>,
86    /// Sender to send other async work items to the Netlink worker.
87    sock_diag_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkSockDiag>>,
88}
89
90impl<C: NetlinkContext> Netlink<C> {
91    /// Returns a newly instantiated [`Netlink`] and parameters used to start the
92    /// asynchronous worker.
93    ///
94    /// Caller is expected to run the worker by calling `run_netlink_worker()`.
95    pub fn new<H: interfaces::InterfacesHandler>(
96        interfaces_handler: H,
97    ) -> (Self, NetlinkWorkerParams<H, C>) {
98        let (route_client_sender, route_client_receiver) = mpsc::unbounded();
99        let (route_async_work_sink, async_work_receiver) = mpsc::unbounded();
100        let (sock_diag_client_sender, sock_diag_client_receiver) = mpsc::unbounded();
101        let (sock_diag_async_work_sink, sock_diag_async_work_receiver) = mpsc::unbounded();
102        (
103            Netlink {
104                id_generator: ClientIdGenerator::default(),
105                route_client_sender,
106                sock_diag_client_sender,
107                route_async_work_sink,
108                sock_diag_async_work_sink,
109            },
110            NetlinkWorkerParams {
111                interfaces_handler,
112                route_client_receiver,
113                route_async_work_receiver: async_work_receiver,
114                sock_diag_client_receiver,
115                sock_diag_async_work_receiver,
116            },
117        )
118    }
119
120    /// Writes the accept_ra_rt_table sysctl for the selected interface.
121    pub fn write_accept_ra_rt_table(
122        &self,
123        interface: SysctlInterfaceSelector,
124        value: i32,
125    ) -> Result<(), SysctlError> {
126        let (responder, receiver) = oneshot_sync::channel();
127        self.route_async_work_sink
128            .unbounded_send(AsyncWorkItem::Inner(RouteAsyncWork::SetAcceptRaRtTable {
129                interface,
130                value: value.into(),
131                responder,
132            }))
133            .map_err(|_| SysctlError::Disconnected)?;
134        receiver.receive().map_err(|_| SysctlError::Disconnected)?
135    }
136
137    /// Reads the accept_ra_rt_table sysctl for the selected interface.
138    pub fn read_accept_ra_rt_table(
139        &self,
140        interface: SysctlInterfaceSelector,
141    ) -> Result<i32, SysctlError> {
142        let (responder, receiver) = oneshot_sync::channel();
143        self.route_async_work_sink
144            .unbounded_send(AsyncWorkItem::Inner(RouteAsyncWork::GetAcceptRaRtTable {
145                interface,
146                responder,
147            }))
148            .map_err(|_| SysctlError::Disconnected)?;
149        Ok(receiver.receive().map_err(|_| SysctlError::Disconnected)??.into())
150    }
151
152    /// Creates a new client of the `NETLINK_ROUTE` protocol family.
153    ///
154    /// `sender` is used by Netlink to send messages to the client.
155    /// `receiver` is used by Netlink to receive messages from the client.
156    ///
157    /// Closing the `receiver` will close this client, disconnecting `sender`.
158    pub fn new_route_client(
159        &self,
160        sender: C::Sender<RouteNetlinkMessage>,
161        receiver: C::Receiver<RouteNetlinkMessage>,
162    ) -> Result<NetlinkRouteClient, NewClientError> {
163        let Netlink {
164            id_generator,
165            route_client_sender,
166            route_async_work_sink,
167            sock_diag_client_sender: _,
168            sock_diag_async_work_sink: _,
169        } = self;
170        let (external_client, internal_client) = client::new_client_pair::<NetlinkRoute, _>(
171            id_generator.new_id(),
172            sender,
173            route_async_work_sink.clone(),
174        );
175        route_client_sender
176            .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
177            .map_err(|e| {
178                // Sending on an `UnboundedSender` can never fail with `is_full()`.
179                debug_assert!(e.is_disconnected());
180                NewClientError::Disconnected
181            })?;
182        Ok(NetlinkRouteClient(external_client))
183    }
184
185    /// Creates a new client of the `NETLINK_SOCK_DIAG` protocol family.
186    ///
187    /// `sender` is used by Netlink to send messages to the client.
188    /// `receiver` is used by Netlink to receive messages from the client.
189    ///
190    /// Closing the `receiver` will close this client, disconnecting `sender`.
191    pub fn new_sock_diag_client(
192        &self,
193        sender: C::Sender<SockDiagResponse>,
194        receiver: C::Receiver<SockDiagRequest>,
195    ) -> Result<NetlinkSockDiagClient, NewClientError> {
196        let Netlink {
197            id_generator,
198            route_client_sender: _,
199            route_async_work_sink: _,
200            sock_diag_client_sender,
201            sock_diag_async_work_sink,
202        } = self;
203        let (external_client, internal_client) = client::new_client_pair::<NetlinkSockDiag, _>(
204            id_generator.new_id(),
205            sender,
206            sock_diag_async_work_sink.clone(),
207        );
208        sock_diag_client_sender
209            .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
210            .map_err(|e| {
211                // Sending on an `UnboundedSender` can never fail with `is_full()`.
212                debug_assert!(e.is_disconnected());
213                NewClientError::Disconnected
214            })?;
215        Ok(NetlinkSockDiagClient(external_client))
216    }
217}
218
219/// A wrapper to hold an [`InternalClient`], and its [`Receiver`] of requests.
220struct ClientWithReceiver<C: NetlinkContext, F: ProtocolFamily> {
221    client: InternalClient<F, C::Sender<F::Response>>,
222    receiver: C::Receiver<F::Request>,
223}
224
/// Errors that can occur while instantiating a new client.
#[derive(Debug)]
pub enum NewClientError {
    /// The [`Netlink`] can no longer reach its associated worker, perhaps
    /// because the worker was dropped.
    Disconnected,
}
232
/// Errors that can occur while accessing a sysctl.
#[derive(Debug)]
pub enum SysctlError {
    /// The [`Netlink`] is disconnected from its associated worker.
    Disconnected,
    /// The interface went away.
    NoInterface,
    /// The written value requests an unsupported operation.
    Unsupported,
}
243
244/// Parameters used to start the Netlink asynchronous worker.
245pub struct NetlinkWorkerParams<H, C: NetlinkContext> {
246    interfaces_handler: H,
247    /// Receiver of newly created `NETLINK_ROUTE` clients.
248    route_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkRoute>>,
249    route_async_work_receiver:
250        futures::channel::mpsc::UnboundedReceiver<AsyncWorkItem<NetlinkRoute>>,
251    /// Receiver of newly created `NETLINK_SOCK_DIAG` clients.
252    #[allow(dead_code)]
253    sock_diag_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkSockDiag>>,
254    #[allow(dead_code)]
255    sock_diag_async_work_receiver:
256        futures::channel::mpsc::UnboundedReceiver<AsyncWorkItem<NetlinkSockDiag>>,
257}
258
259/// All of the protocols that the netlink worker connects to.
260#[allow(missing_docs)]
261pub struct NetlinkWorkerDiscoverableProtocols {
262    pub root_interfaces: fnet_root::InterfacesProxy,
263    pub interfaces_state: fnet_interfaces::StateProxy,
264    pub v4_routes_state: fnet_routes::StateV4Proxy,
265    pub v6_routes_state: fnet_routes::StateV6Proxy,
266    pub v4_main_route_table: fnet_routes_admin::RouteTableV4Proxy,
267    pub v6_main_route_table: fnet_routes_admin::RouteTableV6Proxy,
268    pub v4_route_table_provider: fnet_routes_admin::RouteTableProviderV4Proxy,
269    pub v6_route_table_provider: fnet_routes_admin::RouteTableProviderV6Proxy,
270    pub v4_rule_table: fnet_routes_admin::RuleTableV4Proxy,
271    pub v6_rule_table: fnet_routes_admin::RuleTableV6Proxy,
272    pub ndp_option_watcher_provider: fnet_ndp::RouterAdvertisementOptionWatcherProviderProxy,
273    pub socket_diagnostics: fnet_sockets::DiagnosticsProxy,
274    pub socket_control: fnet_sockets::ControlProxy,
275    pub neighbors_view: fnet_neighbor::ViewProxy,
276    pub neighbors_controller: fnet_neighbor::ControllerProxy,
277}
278
279impl NetlinkWorkerDiscoverableProtocols {
280    fn from_environment() -> Self {
281        let root_interfaces = connect_to_protocol::<fnet_root::InterfacesMarker>()
282            .expect("connect to fuchsia.net.root.Interfaces");
283        let interfaces_state = connect_to_protocol::<fnet_interfaces::StateMarker>()
284            .expect("connect to fuchsia.net.interfaces.State");
285        let v4_routes_state =
286            connect_to_protocol::<<Ipv4 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
287                .expect("connect to fuchsia.net.routes.StateV4");
288        let v6_routes_state =
289            connect_to_protocol::<<Ipv6 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
290                .expect("connect to fuchsia.net.routes.StateV6");
291        let v4_main_route_table = connect_to_protocol::<
292            <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
293        >()
294        .expect("connect to fuchsia.net.routes.admin.RouteTableV4");
295        let v6_main_route_table = connect_to_protocol::<
296            <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
297        >()
298        .expect("connect to fuchsia.net.routes.admin.RouteTableV6");
299        let v4_route_table_provider = connect_to_protocol::<
300            <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
301        >()
302        .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV4");
303        let v6_route_table_provider = connect_to_protocol::<
304            <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
305        >()
306        .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV6");
307        let v4_rule_table = connect_to_protocol::<
308            <Ipv4 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
309        >()
310        .expect("connect to fuchsia.net.routes.admin.RuleTableV4");
311        let v6_rule_table = connect_to_protocol::<
312            <Ipv6 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
313        >()
314        .expect("connect to fuchsia.net.routes.admin.RuleTableV6");
315        let ndp_option_watcher_provider =
316            connect_to_protocol::<fnet_ndp::RouterAdvertisementOptionWatcherProviderMarker>()
317                .expect("connect to fuchsia.net.ndp.RouterAdvertisementOptionWatcherProvider");
318        let socket_diagnostics = connect_to_protocol::<fnet_sockets::DiagnosticsMarker>()
319            .expect("connect to fuchsia.net.sockets.Diagnostics");
320        let socket_control = connect_to_protocol::<fnet_sockets::ControlMarker>()
321            .expect("connect to fuchsia.net.sockets.Control");
322        let neighbors_view = connect_to_protocol::<fnet_neighbor::ViewMarker>()
323            .expect("connect to fuchsia.net.neighbor.View");
324        let neighbors_controller = connect_to_protocol::<fnet_neighbor::ControllerMarker>()
325            .expect("connect to fuchsia.net.neighbor.Controller");
326
327        Self {
328            root_interfaces,
329            interfaces_state,
330            v4_routes_state,
331            v6_routes_state,
332            v4_main_route_table,
333            v6_main_route_table,
334            v4_route_table_provider,
335            v6_route_table_provider,
336            v4_rule_table,
337            v6_rule_table,
338            ndp_option_watcher_provider,
339            socket_diagnostics,
340            socket_control,
341            neighbors_view,
342            neighbors_controller,
343        }
344    }
345}
346
347/// The worker encompassing all asynchronous Netlink work.
348///
349/// The worker is never expected to complete.
350///
351/// `protocols` is taken as a closure because we need to avoid creating asynchronous FIDL proxies
352/// until an executor is running, so it's helpful to defer creation until the event loop starts
353/// running.
354///
355/// # Panics
356///
357/// Panics if a non-recoverable error is encountered by the worker. For example,
358/// a FIDL error on one of the FIDL connections with the netstack.
359pub async fn run_netlink_worker<H: interfaces::InterfacesHandler, C: NetlinkContext>(
360    params: NetlinkWorkerParams<H, C>,
361    access_control: C::AccessControl<'_>,
362) {
363    run_netlink_worker_with_protocols(
364        params,
365        NetlinkWorkerDiscoverableProtocols::from_environment(),
366        None,
367        access_control,
368    )
369    .await;
370}
371
372/// Same as `run_netlink_worker()`, but allows to pass custom
373/// `NetlinkWorkerDiscoverableProtocols`.
374pub async fn run_netlink_worker_with_protocols<
375    H: interfaces::InterfacesHandler,
376    C: NetlinkContext,
377>(
378    params: NetlinkWorkerParams<H, C>,
379    protocols: NetlinkWorkerDiscoverableProtocols,
380    on_route_initialized: Option<oneshot::Sender<()>>,
381    access_control: C::AccessControl<'_>,
382) {
383    let NetlinkWorkerParams {
384        interfaces_handler,
385        route_client_receiver,
386        route_async_work_receiver,
387        sock_diag_client_receiver,
388        sock_diag_async_work_receiver,
389    } = params;
390
391    let NetlinkWorkerDiscoverableProtocols {
392        root_interfaces,
393        interfaces_state,
394        v4_routes_state,
395        v6_routes_state,
396        v4_main_route_table,
397        v6_main_route_table,
398        v4_route_table_provider,
399        v6_route_table_provider,
400        v4_rule_table,
401        v6_rule_table,
402        ndp_option_watcher_provider,
403        socket_diagnostics,
404        socket_control,
405        neighbors_view,
406        neighbors_controller,
407    } = protocols;
408
409    let route_clients = ClientTable::default();
410    let (route_request_sink, route_request_stream) = mpsc::channel(1);
411
412    let route_event_loop = {
413        let route_clients = route_clients.clone();
414        async move {
415            let event_loop: RouteEventLoop<H, C::Sender<_>> = RouteEventLoop {
416                interfaces_proxy: root_interfaces,
417                interfaces_state_proxy: interfaces_state,
418                v4_routes_state,
419                v6_routes_state,
420                v4_main_route_table,
421                v6_main_route_table,
422                v4_route_table_provider,
423                v6_route_table_provider,
424                v4_rule_table,
425                v6_rule_table,
426                ndp_option_watcher_provider,
427                neighbors_view,
428                neighbors_controller,
429                route_clients,
430                request_stream: route_request_stream,
431                interfaces_handler,
432                async_work_receiver: route_async_work_receiver,
433            };
434
435            event_loop.run(on_route_initialized).await;
436        }
437    };
438
439    let route_client_receiver_loop = {
440        let access_control = access_control.clone();
441        async move {
442            // Accept new NETLINK_ROUTE clients.
443            connect_new_clients::<C, NetlinkRoute>(
444                route_clients,
445                route_client_receiver,
446                NetlinkRouteRequestHandler { unified_request_sink: route_request_sink },
447                access_control,
448            )
449            .await;
450            panic!("route_client_receiver stream unexpectedly finished");
451        }
452    };
453
454    let sock_diag_clients = ClientTable::default();
455    let (sock_diag_request_sink, sock_diag_request_stream) = mpsc::channel(1);
456
457    let sock_diag_event_loop = async move {
458        SockDiagEventLoop {
459            socket_diagnostics,
460            socket_control,
461            request_stream: sock_diag_request_stream,
462            async_work_receiver: sock_diag_async_work_receiver,
463        }
464        .run()
465        .await;
466    };
467
468    let sock_diag_client_receiver_loop = async move {
469        // Accept new NETLINK_SOCK_DIAG clients.
470        connect_new_clients::<C, NetlinkSockDiag>(
471            sock_diag_clients,
472            sock_diag_client_receiver,
473            NetlinkSockDiagRequestHandler { sock_diag_request_sink },
474            access_control,
475        )
476        .await;
477        panic!("sock_diag_client_receiver stream unexpectedly finished");
478    };
479
480    futures::future::join4(
481        route_event_loop,
482        route_client_receiver_loop,
483        sock_diag_event_loop,
484        sock_diag_client_receiver_loop,
485    )
486    .await;
487}
488
489/// Receives clients from the given receiver, adding them to the given table.
490///
491/// A "Request Handler" Task will be spawned for each received client. The given
492/// `request_handler_impl` defines how the requests will be handled.
493async fn connect_new_clients<C: NetlinkContext, F: ProtocolFamily>(
494    client_table: ClientTable<F, C::Sender<F::Response>>,
495    client_receiver: UnboundedReceiver<ClientWithReceiver<C, F>>,
496    request_handler_impl: F::RequestHandler<C::Sender<F::Response>>,
497    access_control: C::AccessControl<'_>,
498) {
499    client_receiver
500        // Drive each client concurrently with `for_each_concurrent`.
501        .for_each_concurrent(None, async |ClientWithReceiver { client, receiver }| {
502            client_table.add_client(client.clone());
503            let client = run_client_request_handler::<C, F>(
504                client,
505                receiver,
506                request_handler_impl.clone(),
507                access_control.clone(),
508            )
509            .await;
510            client_table.remove_client(client);
511        })
512        .await;
513}
514
515/// Reads messages from the `receiver` and handles them using the `handler`.
516///
517/// The task terminates when the underlying `Receiver` closes, yielding the
518/// original client.
519async fn run_client_request_handler<C: NetlinkContext, F: ProtocolFamily>(
520    client: InternalClient<F, C::Sender<F::Response>>,
521    receiver: C::Receiver<F::Request>,
522    handler: F::RequestHandler<C::Sender<F::Response>>,
523    access_control: C::AccessControl<'_>,
524) -> InternalClient<F, C::Sender<F::Response>> {
525    // State needed to handle an individual request, that is cycled through the
526    // `fold` combinator below.
527    struct FoldState<C, H, P> {
528        client: C,
529        handler: H,
530        access_control: P,
531    }
532
533    // Use `fold` for two reasons. First, it processes requests serially,
534    // ensuring requests are handled in order. Second, it allows us to
535    // "hand-off" the client/handler from one request to the other, avoiding
536    // copies for each request.
537    let FoldState { client, handler: _, access_control: _ } = receiver
538        .fold(
539            FoldState { client, handler, access_control },
540            |FoldState { mut client, mut handler, access_control }, req| async {
541                match req.validate_creds_and_get_message(&access_control) {
542                    Ok(req) => {
543                        log_debug!("{} Received request: {:?}", client, req);
544                        handler.handle_request(req, &mut client).await
545                    }
546                    Err(e) => {
547                        match &e {
548                            ValidationError::Parse(e) => {
549                                log_warn!("{client} failed to parse netlink message: {e:?}");
550                            }
551                            p @ ValidationError::Permission { .. } => {
552                                log_debug!("{client} permission check failed {p:?}")
553                            }
554                        }
555                        if let Some(rsp) = e.into_error_message() {
556                            client.send_unicast(rsp)
557                        }
558                    }
559                }
560                FoldState { client, handler, access_control }
561            },
562        )
563        .await;
564
565    client
566}
567
#[cfg(test)]
mod tests {
    use super::*;

    use std::num::NonZeroI32;
    use std::pin::pin;

    use assert_matches::assert_matches;
    use fuchsia_async as fasync;
    use futures::FutureExt as _;
    use netlink_packet_core::{ErrorMessage, NetlinkPayload};

    use crate::client::testutil::{CLIENT_ID_1, CLIENT_ID_2, new_fake_client};
    use crate::messaging::NetlinkMessageWithCreds;
    use crate::messaging::testutil::{FakeCreds, SentMessage, TestNetlinkContext};
    use crate::protocol_family::testutil::{
        FakeNetlinkRequestHandler, FakeProtocolFamily, new_fake_netlink_message,
        new_fake_netlink_message_with_creds,
    };

    /// A single client's requests are handled, and the handler exits once the
    /// request channel closes.
    #[fasync::run_singlethreaded(test)]
    async fn test_run_client_request_handler() {
        let (mut request_tx, request_rx) = mpsc::channel(0);
        let (mut sink, client, drain_task) =
            new_fake_client::<FakeProtocolFamily>(CLIENT_ID_1, std::iter::empty());
        let drain_handle = fasync::Task::spawn(drain_task);

        {
            let mut handler_task = pin!(
                run_client_request_handler::<TestNetlinkContext, FakeProtocolFamily>(
                    client,
                    request_rx,
                    FakeNetlinkRequestHandler,
                    Default::default()
                )
                .fuse()
            );

            // Nothing has been sent yet: the task is pending and silent.
            assert_matches!((&mut handler_task).now_or_never(), None);
            assert_eq!(&sink.take_messages()[..], &[]);

            // Send a message and expect to see the response on the `sink`.
            // NB: Use the sender's channel size as a synchronization method; If a
            // second message could be sent, the first *must* have been handled.
            request_tx
                .try_send(new_fake_netlink_message_with_creds())
                .expect("should send without error");
            let mut ready_to_send =
                pin!(futures::future::poll_fn(|ctx| request_tx.poll_ready(ctx)).fuse());
            futures::select!(
                res = ready_to_send => res.expect("should be able to send without error"),
                _client = handler_task => panic!("client task unexpectedly finished"),
            );
            assert_eq!(
                &sink.take_messages()[..],
                &[SentMessage::unicast(new_fake_netlink_message())]
            );

            // Close the sender, and expect the Task to exit.
            request_tx.close_channel();
            let _client = handler_task.await;
            assert_eq!(&sink.take_messages()[..], &[]);
        }
        drain_handle.await;
    }

    /// Clients are accepted, served concurrently, and removed from the table
    /// once they close.
    #[fasync::run_singlethreaded(test)]
    async fn test_connect_new_clients() {
        let table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (new_client_tx, new_client_rx) = futures::channel::mpsc::unbounded();
        let mut acceptor = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                table.clone(),
                new_client_rx,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );

        assert_eq!((&mut acceptor).now_or_never(), None);

        // Connect Client 1.
        let (mut _sink1, client1, drain_task1) =
            new_fake_client::<FakeProtocolFamily>(CLIENT_ID_1, std::iter::empty());
        let _drain_handle1 = scope.spawn(drain_task1);
        let (mut request_tx1, request_rx1) = mpsc::channel(0);
        new_client_tx
            .unbounded_send(ClientWithReceiver { client: client1, receiver: request_rx1 })
            .expect("should send without error");

        // Connect Client 2.
        let (mut sink2, client2, drain_task2) =
            new_fake_client::<FakeProtocolFamily>(CLIENT_ID_2, std::iter::empty());
        let _drain_handle2 = scope.spawn(drain_task2);
        let (mut request_tx2, request_rx2) = mpsc::channel(0);
        new_client_tx
            .unbounded_send(ClientWithReceiver { client: client2, receiver: request_rx2 })
            .expect("should send without error");

        // Send a request to Client 2, and verify it's handled despite Client 1
        // being open (e.g. concurrent handling of requests across clients).
        // NB: Use the sender's channel size as a synchronization method; If a
        // second message could be sent, the first *must* have been handled.
        request_tx2
            .try_send(new_fake_netlink_message_with_creds())
            .expect("should send without error");
        let mut ready_to_send =
            pin!(futures::future::poll_fn(|ctx| request_tx2.poll_ready(ctx)).fuse());
        futures::select!(
            res = ready_to_send => res.expect("should be able to send without error"),
            () = acceptor => panic!("client acceptor unexpectedly finished"),
        );
        assert_eq!(&table.client_ids()[..], [CLIENT_ID_1, CLIENT_ID_2]);
        assert_eq!(
            &sink2.take_messages()[..],
            &[SentMessage::unicast(new_fake_netlink_message())]
        );

        // Close the two clients, and verify the acceptor fut is still pending.
        request_tx1.close_channel();
        request_tx2.close_channel();
        assert_eq!((&mut acceptor).now_or_never(), None);

        // Close the new-client sender, and verify the acceptor fut finishes.
        new_client_tx.close_channel();
        acceptor.await;

        // Confirm the clients have been cleaned up from the client table.
        assert_eq!(&table.client_ids()[..], []);

        drop(table);
        scope.join().await;
    }

    /// A request whose credentials fail validation is answered with an error
    /// message carrying the negated errno.
    #[fasync::run_singlethreaded(test)]
    async fn test_permissions() {
        let table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (new_client_tx, new_client_rx) = futures::channel::mpsc::unbounded();
        let mut acceptor = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                table.clone(),
                new_client_rx,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );
        assert_eq!((&mut acceptor).now_or_never(), None);

        let (mut sink, client, drain_task) =
            new_fake_client::<FakeProtocolFamily>(CLIENT_ID_1, std::iter::empty());
        let _drain_handle = scope.spawn(drain_task);
        let (mut request_tx, request_rx) = mpsc::channel(0);
        new_client_tx
            .unbounded_send(ClientWithReceiver { client, receiver: request_rx })
            .expect("should send without error");

        // Credentials that make validation fail with EPERM.
        let denied_creds = FakeCreds::with_error(Errno::new(libc::EPERM).unwrap());
        let denied_request = NetlinkMessageWithCreds::new(new_fake_netlink_message(), denied_creds);
        request_tx.try_send(denied_request).expect("should send without error");

        let response = futures::select!(
            res = sink.next_message().fuse() => res,
            () = acceptor => panic!("client acceptor unexpectedly finished"),
        );

        assert_matches!(
            response.message.payload,
            NetlinkPayload::Error(ErrorMessage { code: Some(error_code), .. }) => {
                assert_eq!(error_code, NonZeroI32::new(-libc::EPERM).unwrap());
            }
        );
    }
}