Skip to main content

netlink/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! An implementation of Linux's Netlink API for Fuchsia.
6//!
7//! Netlink is a socket-based API provided by Linux that user space applications
8//! can use to interact with the kernel. The API is split up into several
9//! protocol families each offering different functionality. This crate targets
10//! the implementation of families related to networking.
11
12#![warn(missing_docs, unused)]
13
14mod client;
15pub mod interfaces;
16pub(crate) mod logging;
17pub mod messaging;
18pub mod multicast_groups;
19mod nduseropt;
20pub mod neighbors;
21mod netlink_packet;
22pub mod protocol_family;
23pub(crate) mod route_eventloop;
24pub(crate) mod route_tables;
25pub mod routes;
26mod rules;
27pub(crate) mod util;
28
29use std::num::NonZeroU64;
30
31use fidl_fuchsia_net_interfaces as fnet_interfaces;
32use fidl_fuchsia_net_ndp as fnet_ndp;
33use fidl_fuchsia_net_neighbor as fnet_neighbor;
34use fidl_fuchsia_net_root as fnet_root;
35use fidl_fuchsia_net_routes as fnet_routes;
36use fidl_fuchsia_net_routes_admin as fnet_routes_admin;
37use fidl_fuchsia_net_routes_ext as fnet_routes_ext;
38use fidl_fuchsia_net_sockets as fnet_sockets;
39use fuchsia_component::client::connect_to_protocol;
40use futures::StreamExt as _;
41use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
42use futures::channel::oneshot;
43use net_types::ip::{Ipv4, Ipv6};
44use netlink_packet_route::RouteNetlinkMessage;
45use netlink_packet_sock_diag::{SockDiagRequest, SockDiagResponse};
46use protocol_family::route::NetlinkRouteNotifiedGroup;
47
48use crate::client::{AsyncWorkItem, ClientIdGenerator, ClientTable, InternalClient};
49use crate::logging::{log_debug, log_warn};
50use crate::messaging::{NetlinkContext, UnvalidatedNetlinkMessage as _, ValidationError};
51pub use crate::netlink_packet::errno::Errno;
52use crate::protocol_family::route::{
53    NetlinkRoute, NetlinkRouteClient, NetlinkRouteRequestHandler, RouteAsyncWork,
54};
55use crate::protocol_family::sock_diag::{
56    NetlinkSockDiag, NetlinkSockDiagClient, NetlinkSockDiagRequestHandler, SockDiagEventLoop,
57};
58use crate::protocol_family::{NetlinkFamilyRequestHandler as _, ProtocolFamily};
59use crate::route_eventloop::RouteEventLoop;
60
/// The tag added to all logs generated by this crate.
// NB: references in `const` items are implicitly `'static`, so the lifetime is
// not spelled out.
pub const NETLINK_LOG_TAG: &str = "netlink";
63
/// Selects the interface(s) that a sysctl read or write applies to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SysctlInterfaceSelector {
    /// "all" interfaces.
    ///
    /// This is supposed to change the setting on every interface, but that is
    /// a lie for most of the sysctls: they have no effect at all when written.
    All,
    /// The "default" interface; all interfaces created after this write will
    /// inherit the value.
    Default,
    /// The id of the interface to change.
    Id(NonZeroU64),
}
77
78/// The implementation of the Netlink protocol suite.
79pub struct Netlink<C: NetlinkContext> {
80    /// Generator of new Client IDs.
81    id_generator: ClientIdGenerator,
82    /// Sender to attach new `NETLINK_ROUTE` clients to the Netlink worker.
83    route_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkRoute>>,
84    /// Sender to send other async work items to the Netlink worker.
85    route_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkRoute>>,
86    /// Sender to attach new `NETLINK_SOCK_DIAG` clients to the Netlink worker.
87    sock_diag_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkSockDiag>>,
88    /// Sender to send other async work items to the Netlink worker.
89    sock_diag_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkSockDiag>>,
90}
91
92impl<C: NetlinkContext> Netlink<C> {
93    /// Returns a newly instantiated [`Netlink`] and parameters used to start the
94    /// asynchronous worker.
95    ///
96    /// Caller is expected to run the worker by calling `run_netlink_worker()`.
97    pub fn new<H: interfaces::InterfacesHandler>(
98        interfaces_handler: H,
99    ) -> (Self, NetlinkWorkerParams<H, C>) {
100        let (route_client_sender, route_client_receiver) = mpsc::unbounded();
101        let (route_async_work_sink, async_work_receiver) = mpsc::unbounded();
102        let (sock_diag_client_sender, sock_diag_client_receiver) = mpsc::unbounded();
103        let (sock_diag_async_work_sink, sock_diag_async_work_receiver) = mpsc::unbounded();
104        (
105            Netlink {
106                id_generator: ClientIdGenerator::default(),
107                route_client_sender,
108                sock_diag_client_sender,
109                route_async_work_sink,
110                sock_diag_async_work_sink,
111            },
112            NetlinkWorkerParams {
113                interfaces_handler,
114                route_client_receiver,
115                route_async_work_receiver: async_work_receiver,
116                sock_diag_client_receiver,
117                sock_diag_async_work_receiver,
118            },
119        )
120    }
121
122    /// Writes the accept_ra_rt_table sysctl for the selected interface.
123    pub fn write_accept_ra_rt_table(
124        &self,
125        interface: SysctlInterfaceSelector,
126        value: i32,
127    ) -> Result<(), SysctlError> {
128        let (responder, receiver) = oneshot_sync::channel();
129        self.route_async_work_sink
130            .unbounded_send(AsyncWorkItem::Inner(RouteAsyncWork::SetAcceptRaRtTable {
131                interface,
132                value: value.into(),
133                responder,
134            }))
135            .map_err(|_| SysctlError::Disconnected)?;
136        receiver.receive().map_err(|_| SysctlError::Disconnected)?
137    }
138
139    /// Reads the accept_ra_rt_table sysctl for the selected interface.
140    pub fn read_accept_ra_rt_table(
141        &self,
142        interface: SysctlInterfaceSelector,
143    ) -> Result<i32, SysctlError> {
144        let (responder, receiver) = oneshot_sync::channel();
145        self.route_async_work_sink
146            .unbounded_send(AsyncWorkItem::Inner(RouteAsyncWork::GetAcceptRaRtTable {
147                interface,
148                responder,
149            }))
150            .map_err(|_| SysctlError::Disconnected)?;
151        Ok(receiver.receive().map_err(|_| SysctlError::Disconnected)??.into())
152    }
153
154    /// Creates a new client of the `NETLINK_ROUTE` protocol family.
155    ///
156    /// `sender` is used by Netlink to send messages to the client.
157    /// `receiver` is used by Netlink to receive messages from the client.
158    ///
159    /// Closing the `receiver` will close this client, disconnecting `sender`.
160    pub fn new_route_client(
161        &self,
162        sender: C::Sender<RouteNetlinkMessage>,
163        receiver: C::Receiver<RouteNetlinkMessage>,
164    ) -> Result<NetlinkRouteClient, NewClientError> {
165        let Netlink {
166            id_generator,
167            route_client_sender,
168            route_async_work_sink,
169            sock_diag_client_sender: _,
170            sock_diag_async_work_sink: _,
171        } = self;
172        let (external_client, internal_client) = client::new_client_pair::<NetlinkRoute, _>(
173            id_generator.new_id(),
174            sender,
175            route_async_work_sink.clone(),
176        );
177        route_client_sender
178            .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
179            .map_err(|e| {
180                // Sending on an `UnboundedSender` can never fail with `is_full()`.
181                debug_assert!(e.is_disconnected());
182                NewClientError::Disconnected
183            })?;
184        Ok(NetlinkRouteClient(external_client))
185    }
186
187    /// Creates a new client of the `NETLINK_SOCK_DIAG` protocol family.
188    ///
189    /// `sender` is used by Netlink to send messages to the client.
190    /// `receiver` is used by Netlink to receive messages from the client.
191    ///
192    /// Closing the `receiver` will close this client, disconnecting `sender`.
193    pub fn new_sock_diag_client(
194        &self,
195        sender: C::Sender<SockDiagResponse>,
196        receiver: C::Receiver<SockDiagRequest>,
197    ) -> Result<NetlinkSockDiagClient, NewClientError> {
198        let Netlink {
199            id_generator,
200            route_client_sender: _,
201            route_async_work_sink: _,
202            sock_diag_client_sender,
203            sock_diag_async_work_sink,
204        } = self;
205        let (external_client, internal_client) = client::new_client_pair::<NetlinkSockDiag, _>(
206            id_generator.new_id(),
207            sender,
208            sock_diag_async_work_sink.clone(),
209        );
210        sock_diag_client_sender
211            .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
212            .map_err(|e| {
213                // Sending on an `UnboundedSender` can never fail with `is_full()`.
214                debug_assert!(e.is_disconnected());
215                NewClientError::Disconnected
216            })?;
217        Ok(NetlinkSockDiagClient(external_client))
218    }
219}
220
221/// A wrapper to hold an [`InternalClient`], and its [`Receiver`] of requests.
222struct ClientWithReceiver<C: NetlinkContext, F: ProtocolFamily> {
223    client: InternalClient<F, C::Sender<F::Response>>,
224    receiver: C::Receiver<F::Request>,
225}
226
/// The possible error types when instantiating a new client.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NewClientError {
    /// The [`Netlink`] is disconnected from its associated worker, perhaps as a
    /// result of dropping the worker.
    Disconnected,
}

impl std::fmt::Display for NewClientError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            NewClientError::Disconnected => f.write_str("disconnected from the netlink worker"),
        }
    }
}

// Lets callers box this error or chain it with `?` into generic error types.
impl std::error::Error for NewClientError {}
234
/// The possible error types when trying to access a sysctl.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SysctlError {
    /// The [`Netlink`] is disconnected from its associated worker.
    Disconnected,
    /// The interface went away.
    NoInterface,
    /// The written value requests an unsupported operation.
    Unsupported,
}

impl std::fmt::Display for SysctlError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            SysctlError::Disconnected => "disconnected from the netlink worker",
            SysctlError::NoInterface => "the interface went away",
            SysctlError::Unsupported => "the written value requests an unsupported operation",
        };
        f.write_str(description)
    }
}

// Lets callers box this error or chain it with `?` into generic error types.
impl std::error::Error for SysctlError {}
245
246/// Parameters used to start the Netlink asynchronous worker.
247pub struct NetlinkWorkerParams<H, C: NetlinkContext> {
248    interfaces_handler: H,
249    /// Receiver of newly created `NETLINK_ROUTE` clients.
250    route_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkRoute>>,
251    route_async_work_receiver:
252        futures::channel::mpsc::UnboundedReceiver<AsyncWorkItem<NetlinkRoute>>,
253    /// Receiver of newly created `NETLINK_SOCK_DIAG` clients.
254    #[allow(dead_code)]
255    sock_diag_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkSockDiag>>,
256    #[allow(dead_code)]
257    sock_diag_async_work_receiver:
258        futures::channel::mpsc::UnboundedReceiver<AsyncWorkItem<NetlinkSockDiag>>,
259}
260
261/// All of the protocols that the netlink worker connects to.
262#[allow(missing_docs)]
263pub struct NetlinkWorkerDiscoverableProtocols {
264    pub root_interfaces: fnet_root::InterfacesProxy,
265    pub interfaces_state: fnet_interfaces::StateProxy,
266    pub v4_routes_state: fnet_routes::StateV4Proxy,
267    pub v6_routes_state: fnet_routes::StateV6Proxy,
268    pub v4_main_route_table: fnet_routes_admin::RouteTableV4Proxy,
269    pub v6_main_route_table: fnet_routes_admin::RouteTableV6Proxy,
270    pub v4_route_table_provider: fnet_routes_admin::RouteTableProviderV4Proxy,
271    pub v6_route_table_provider: fnet_routes_admin::RouteTableProviderV6Proxy,
272    pub v4_rule_table: fnet_routes_admin::RuleTableV4Proxy,
273    pub v6_rule_table: fnet_routes_admin::RuleTableV6Proxy,
274    pub ndp_option_watcher_provider: fnet_ndp::RouterAdvertisementOptionWatcherProviderProxy,
275    pub socket_diagnostics: fnet_sockets::DiagnosticsProxy,
276    pub socket_control: fnet_sockets::ControlProxy,
277    pub neighbors_view: fnet_neighbor::ViewProxy,
278    pub neighbors_controller: fnet_neighbor::ControllerProxy,
279}
280
281impl NetlinkWorkerDiscoverableProtocols {
282    fn from_environment() -> Self {
283        let root_interfaces = connect_to_protocol::<fnet_root::InterfacesMarker>()
284            .expect("connect to fuchsia.net.root.Interfaces");
285        let interfaces_state = connect_to_protocol::<fnet_interfaces::StateMarker>()
286            .expect("connect to fuchsia.net.interfaces.State");
287        let v4_routes_state =
288            connect_to_protocol::<<Ipv4 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
289                .expect("connect to fuchsia.net.routes.StateV4");
290        let v6_routes_state =
291            connect_to_protocol::<<Ipv6 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
292                .expect("connect to fuchsia.net.routes.StateV6");
293        let v4_main_route_table = connect_to_protocol::<
294            <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
295        >()
296        .expect("connect to fuchsia.net.routes.admin.RouteTableV4");
297        let v6_main_route_table = connect_to_protocol::<
298            <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
299        >()
300        .expect("connect to fuchsia.net.routes.admin.RouteTableV6");
301        let v4_route_table_provider = connect_to_protocol::<
302            <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
303        >()
304        .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV4");
305        let v6_route_table_provider = connect_to_protocol::<
306            <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
307        >()
308        .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV6");
309        let v4_rule_table = connect_to_protocol::<
310            <Ipv4 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
311        >()
312        .expect("connect to fuchsia.net.routes.admin.RuleTableV4");
313        let v6_rule_table = connect_to_protocol::<
314            <Ipv6 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
315        >()
316        .expect("connect to fuchsia.net.routes.admin.RuleTableV6");
317        let ndp_option_watcher_provider =
318            connect_to_protocol::<fnet_ndp::RouterAdvertisementOptionWatcherProviderMarker>()
319                .expect("connect to fuchsia.net.ndp.RouterAdvertisementOptionWatcherProvider");
320        let socket_diagnostics = connect_to_protocol::<fnet_sockets::DiagnosticsMarker>()
321            .expect("connect to fuchsia.net.sockets.Diagnostics");
322        let socket_control = connect_to_protocol::<fnet_sockets::ControlMarker>()
323            .expect("connect to fuchsia.net.sockets.Control");
324        let neighbors_view = connect_to_protocol::<fnet_neighbor::ViewMarker>()
325            .expect("connect to fuchsia.net.neighbor.View");
326        let neighbors_controller = connect_to_protocol::<fnet_neighbor::ControllerMarker>()
327            .expect("connect to fuchsia.net.neighbor.Controller");
328
329        Self {
330            root_interfaces,
331            interfaces_state,
332            v4_routes_state,
333            v6_routes_state,
334            v4_main_route_table,
335            v6_main_route_table,
336            v4_route_table_provider,
337            v6_route_table_provider,
338            v4_rule_table,
339            v6_rule_table,
340            ndp_option_watcher_provider,
341            socket_diagnostics,
342            socket_control,
343            neighbors_view,
344            neighbors_controller,
345        }
346    }
347}
348
349/// The worker encompassing all asynchronous Netlink work.
350///
351/// The worker is never expected to complete.
352///
353/// `protocols` is taken as a closure because we need to avoid creating asynchronous FIDL proxies
354/// until an executor is running, so it's helpful to defer creation until the event loop starts
355/// running.
356///
357/// # Panics
358///
359/// Panics if a non-recoverable error is encountered by the worker. For example,
360/// a FIDL error on one of the FIDL connections with the netstack.
361pub async fn run_netlink_worker<H: interfaces::InterfacesHandler, C: NetlinkContext>(
362    params: NetlinkWorkerParams<H, C>,
363    access_control: C::AccessControl<'_>,
364) {
365    run_netlink_worker_with_protocols(
366        params,
367        NetlinkWorkerDiscoverableProtocols::from_environment(),
368        None,
369        access_control,
370    )
371    .await;
372}
373
374/// Same as `run_netlink_worker()`, but allows to pass custom
375/// `NetlinkWorkerDiscoverableProtocols`.
376pub async fn run_netlink_worker_with_protocols<
377    H: interfaces::InterfacesHandler,
378    C: NetlinkContext,
379>(
380    params: NetlinkWorkerParams<H, C>,
381    protocols: NetlinkWorkerDiscoverableProtocols,
382    on_route_initialized: Option<oneshot::Sender<()>>,
383    access_control: C::AccessControl<'_>,
384) {
385    let NetlinkWorkerParams {
386        interfaces_handler,
387        route_client_receiver,
388        route_async_work_receiver,
389        sock_diag_client_receiver,
390        sock_diag_async_work_receiver,
391    } = params;
392
393    let NetlinkWorkerDiscoverableProtocols {
394        root_interfaces,
395        interfaces_state,
396        v4_routes_state,
397        v6_routes_state,
398        v4_main_route_table,
399        v6_main_route_table,
400        v4_route_table_provider,
401        v6_route_table_provider,
402        v4_rule_table,
403        v6_rule_table,
404        ndp_option_watcher_provider,
405        socket_diagnostics,
406        socket_control,
407        neighbors_view,
408        neighbors_controller,
409    } = protocols;
410
411    let route_clients = ClientTable::default();
412    let (route_request_sink, route_request_stream) = mpsc::channel(1);
413
414    let route_event_loop = {
415        let route_clients = route_clients.clone();
416        async move {
417            let event_loop: RouteEventLoop<H, C::Sender<_>> = RouteEventLoop {
418                interfaces_proxy: root_interfaces,
419                interfaces_state_proxy: interfaces_state,
420                v4_routes_state,
421                v6_routes_state,
422                v4_main_route_table,
423                v6_main_route_table,
424                v4_route_table_provider,
425                v6_route_table_provider,
426                v4_rule_table,
427                v6_rule_table,
428                ndp_option_watcher_provider,
429                neighbors_view,
430                neighbors_controller,
431                route_clients,
432                request_stream: route_request_stream,
433                interfaces_handler,
434                async_work_receiver: route_async_work_receiver,
435            };
436
437            event_loop.run(on_route_initialized).await;
438        }
439    };
440
441    let route_client_receiver_loop = {
442        let access_control = access_control.clone();
443        async move {
444            // Accept new NETLINK_ROUTE clients.
445            connect_new_clients::<C, NetlinkRoute>(
446                route_clients,
447                route_client_receiver,
448                NetlinkRouteRequestHandler { unified_request_sink: route_request_sink },
449                access_control,
450            )
451            .await;
452            panic!("route_client_receiver stream unexpectedly finished");
453        }
454    };
455
456    let sock_diag_clients = ClientTable::default();
457    let (sock_diag_request_sink, sock_diag_request_stream) = mpsc::channel(1);
458
459    let sock_diag_event_loop = async move {
460        SockDiagEventLoop {
461            socket_diagnostics,
462            socket_control,
463            request_stream: sock_diag_request_stream,
464            async_work_receiver: sock_diag_async_work_receiver,
465        }
466        .run()
467        .await;
468    };
469
470    let sock_diag_client_receiver_loop = async move {
471        // Accept new NETLINK_SOCK_DIAG clients.
472        connect_new_clients::<C, NetlinkSockDiag>(
473            sock_diag_clients,
474            sock_diag_client_receiver,
475            NetlinkSockDiagRequestHandler { sock_diag_request_sink },
476            access_control,
477        )
478        .await;
479        panic!("sock_diag_client_receiver stream unexpectedly finished");
480    };
481
482    futures::future::join4(
483        route_event_loop,
484        route_client_receiver_loop,
485        sock_diag_event_loop,
486        sock_diag_client_receiver_loop,
487    )
488    .await;
489}
490
491/// Receives clients from the given receiver, adding them to the given table.
492///
493/// A "Request Handler" Task will be spawned for each received client. The given
494/// `request_handler_impl` defines how the requests will be handled.
495async fn connect_new_clients<C: NetlinkContext, F: ProtocolFamily>(
496    client_table: ClientTable<F, C::Sender<F::Response>>,
497    client_receiver: UnboundedReceiver<ClientWithReceiver<C, F>>,
498    request_handler_impl: F::RequestHandler<C::Sender<F::Response>>,
499    access_control: C::AccessControl<'_>,
500) {
501    client_receiver
502        // Drive each client concurrently with `for_each_concurrent`.
503        .for_each_concurrent(None, async |ClientWithReceiver { client, receiver }| {
504            client_table.add_client(client.clone());
505            let client = run_client_request_handler::<C, F>(
506                client,
507                receiver,
508                request_handler_impl.clone(),
509                access_control.clone(),
510            )
511            .await;
512            client_table.remove_client(client);
513        })
514        .await;
515}
516
517/// Reads messages from the `receiver` and handles them using the `handler`.
518///
519/// The task terminates when the underlying `Receiver` closes, yielding the
520/// original client.
521async fn run_client_request_handler<C: NetlinkContext, F: ProtocolFamily>(
522    client: InternalClient<F, C::Sender<F::Response>>,
523    receiver: C::Receiver<F::Request>,
524    handler: F::RequestHandler<C::Sender<F::Response>>,
525    access_control: C::AccessControl<'_>,
526) -> InternalClient<F, C::Sender<F::Response>> {
527    // State needed to handle an individual request, that is cycled through the
528    // `fold` combinator below.
529    struct FoldState<C, H, P> {
530        client: C,
531        handler: H,
532        access_control: P,
533    }
534
535    // Use `fold` for two reasons. First, it processes requests serially,
536    // ensuring requests are handled in order. Second, it allows us to
537    // "hand-off" the client/handler from one request to the other, avoiding
538    // copies for each request.
539    let FoldState { client, handler: _, access_control: _ } = receiver
540        .fold(
541            FoldState { client, handler, access_control },
542            |FoldState { mut client, mut handler, access_control }, req| async {
543                match req.validate_creds_and_get_message(&access_control) {
544                    Ok(req) => {
545                        log_debug!("{} Received request: {:?}", client, req);
546                        handler.handle_request(req, &mut client).await
547                    }
548                    Err(e) => {
549                        match &e {
550                            ValidationError::Parse(e) => {
551                                log_warn!("{client} failed to parse netlink message: {e:?}");
552                            }
553                            p @ ValidationError::Permission { .. } => {
554                                log_debug!("{client} permission check failed {p:?}")
555                            }
556                        }
557                        if let Some(rsp) = e.into_error_message() {
558                            client.send_unicast(rsp)
559                        }
560                    }
561                }
562                FoldState { client, handler, access_control }
563            },
564        )
565        .await;
566
567    client
568}
569
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;
    use futures::FutureExt as _;

    use assert_matches::assert_matches;
    use netlink_packet_core::{ErrorMessage, NetlinkPayload};
    use std::num::NonZeroI32;
    use std::pin::pin;

    use crate::client::testutil::{CLIENT_ID_1, CLIENT_ID_2, new_fake_client};
    use crate::messaging::NetlinkMessageWithCreds;
    use crate::messaging::testutil::{FakeCreds, SentMessage, TestNetlinkContext};
    use crate::protocol_family::testutil::{
        FakeNetlinkRequestHandler, FakeProtocolFamily, new_fake_netlink_message,
        new_fake_netlink_message_with_creds,
    };

    /// A single client's handler answers a request and exits once its
    /// request channel closes.
    #[fasync::run_singlethreaded(test)]
    async fn test_run_client_request_handler() {
        let (mut request_tx, request_rx) = mpsc::channel(0);
        let (mut sink, client, drain_task) =
            new_fake_client::<FakeProtocolFamily>(CLIENT_ID_1, std::iter::empty());
        let drain_handle = fasync::Task::spawn(drain_task);

        // Scoped so the handler future is dropped before the drain task is
        // awaited.
        {
            let mut handler_fut = pin!(
                run_client_request_handler::<TestNetlinkContext, FakeProtocolFamily>(
                    client,
                    request_rx,
                    FakeNetlinkRequestHandler,
                    Default::default()
                )
                .fuse()
            );

            // Nothing has been sent yet: the handler is idle and silent.
            assert_matches!((&mut handler_fut).now_or_never(), None);
            assert_eq!(&sink.take_messages()[..], &[]);

            // Send a message and expect to see the response on the `sink`.
            // NB: Use the sender's channel size as a synchronization method; If a
            // second message could be sent, the first *must* have been handled.
            request_tx
                .try_send(new_fake_netlink_message_with_creds())
                .expect("should send without error");
            let mut ready_fut =
                pin!(futures::future::poll_fn(|ctx| request_tx.poll_ready(ctx)).fuse());
            futures::select!(
                res = ready_fut => res.expect("should be able to send without error"),
                _client = handler_fut => panic!("client task unexpectedly finished"),
            );
            assert_eq!(
                &sink.take_messages()[..],
                &[SentMessage::unicast(new_fake_netlink_message())]
            );

            // Close the sender, and expect the handler to exit.
            request_tx.close_channel();
            let _client = handler_fut.await;
            assert_eq!(&sink.take_messages()[..], &[]);
        }
        drain_handle.await;
    }

    /// The acceptor serves several clients concurrently and removes them
    /// from the client table once they close.
    #[fasync::run_singlethreaded(test)]
    async fn test_connect_new_clients() {
        let client_table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (client_tx, client_rx) = futures::channel::mpsc::unbounded();
        let mut acceptor_fut = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                client_table.clone(),
                client_rx,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );

        assert_eq!((&mut acceptor_fut).now_or_never(), None);

        // Connect Client 1.
        let (mut _client_sink1, client1, drain_task1) =
            new_fake_client::<FakeProtocolFamily>(CLIENT_ID_1, std::iter::empty());
        let _join_handle = scope.spawn(drain_task1);
        let (mut request_tx1, request_rx1) = mpsc::channel(0);
        client_tx
            .unbounded_send(ClientWithReceiver { client: client1, receiver: request_rx1 })
            .expect("should send without error");

        // Connect Client 2.
        let (mut client_sink2, client2, drain_task2) =
            new_fake_client::<FakeProtocolFamily>(CLIENT_ID_2, std::iter::empty());
        let _join_handle = scope.spawn(drain_task2);
        let (mut request_tx2, request_rx2) = mpsc::channel(0);
        client_tx
            .unbounded_send(ClientWithReceiver { client: client2, receiver: request_rx2 })
            .expect("should send without error");

        // Send a request to Client 2, and verify it's handled despite Client 1
        // being open (e.g. concurrent handling of requests across clients).
        // NB: Use the sender's channel size as a synchronization method; If a
        // second message could be sent, the first *must* have been handled.
        request_tx2
            .try_send(new_fake_netlink_message_with_creds())
            .expect("should send without error");
        let mut ready_fut =
            pin!(futures::future::poll_fn(|ctx| request_tx2.poll_ready(ctx)).fuse());
        futures::select!(
            res = ready_fut => res.expect("should be able to send without error"),
            () = acceptor_fut => panic!("client acceptor unexpectedly finished"),
        );
        assert_eq!(&client_table.client_ids()[..], [CLIENT_ID_1, CLIENT_ID_2]);
        assert_eq!(
            &client_sink2.take_messages()[..],
            &[SentMessage::unicast(new_fake_netlink_message())]
        );

        // Close the two clients, and verify the acceptor fut is still pending.
        request_tx1.close_channel();
        request_tx2.close_channel();
        assert_eq!((&mut acceptor_fut).now_or_never(), None);

        // Close the client_tx, and verify the acceptor fut finishes.
        client_tx.close_channel();
        acceptor_fut.await;

        // Confirm the clients have been cleaned up from the client table.
        assert_eq!(&client_table.client_ids()[..], []);

        drop(client_table);
        scope.join().await;
    }

    /// A request whose credentials fail validation is answered with the
    /// matching (negated) errno.
    #[fasync::run_singlethreaded(test)]
    async fn test_permissions() {
        let client_table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (client_tx, client_rx) = futures::channel::mpsc::unbounded();
        let mut acceptor_fut = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                client_table.clone(),
                client_rx,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );
        assert_eq!((&mut acceptor_fut).now_or_never(), None);

        let (mut client_sink, client, drain_task) =
            new_fake_client::<FakeProtocolFamily>(CLIENT_ID_1, std::iter::empty());
        let _join_handle = scope.spawn(drain_task);
        let (mut request_tx, request_rx) = mpsc::channel(0);
        client_tx
            .unbounded_send(ClientWithReceiver { client, receiver: request_rx })
            .expect("should send without error");

        // Credentials that make validation fail with EPERM.
        let denied_creds = FakeCreds::with_error(Errno::new(libc::EPERM).unwrap());
        let message = NetlinkMessageWithCreds::new(new_fake_netlink_message(), denied_creds);
        request_tx.try_send(message).expect("should send without error");

        let response = futures::select!(
            res = client_sink.next_message().fuse() => res,
            () = acceptor_fut => panic!("client acceptor unexpectedly finished"),
        );

        assert_matches!(
            response.message.payload,
            NetlinkPayload::Error(ErrorMessage { code: Some(error_code), .. }) => {
                assert_eq!(error_code, NonZeroI32::new(-libc::EPERM).unwrap());
            }
        );
    }
}
761}