netlink/
lib.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! An implementation of Linux's Netlink API for Fuchsia.
//!
//! Netlink is a socket-based API provided by Linux that user space applications
//! can use to interact with the kernel. The API is split up into several
//! protocol families, each offering different functionality. This crate targets
//! the implementation of families related to networking.

#![warn(missing_docs, unused)]

mod client;
pub mod interfaces;
pub(crate) mod logging;
pub mod messaging;
pub mod multicast_groups;
mod nduseropt;
pub mod neighbors;
mod netlink_packet;
pub mod protocol_family;
pub(crate) mod route_eventloop;
pub(crate) mod route_tables;
pub mod routes;
mod rules;
pub(crate) mod util;

use std::num::NonZeroU64;

use fuchsia_component::client::connect_to_protocol;
use futures::StreamExt as _;
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use net_types::ip::{Ipv4, Ipv6};
use netlink_packet_route::RouteNetlinkMessage;
use netlink_packet_sock_diag::{SockDiagRequest, SockDiagResponse};
use protocol_family::route::NetlinkRouteNotifiedGroup;
use {
    fidl_fuchsia_net_interfaces as fnet_interfaces, fidl_fuchsia_net_ndp as fnet_ndp,
    fidl_fuchsia_net_neighbor as fnet_neighbor, fidl_fuchsia_net_root as fnet_root,
    fidl_fuchsia_net_routes as fnet_routes, fidl_fuchsia_net_routes_admin as fnet_routes_admin,
    fidl_fuchsia_net_routes_ext as fnet_routes_ext, fidl_fuchsia_net_sockets as fnet_sockets,
};

use crate::client::{AsyncWorkItem, ClientIdGenerator, ClientTable, InternalClient};
use crate::logging::{log_debug, log_warn};
use crate::messaging::{NetlinkContext, UnvalidatedNetlinkMessage as _, ValidationError};
pub use crate::netlink_packet::errno::Errno;
use crate::protocol_family::route::{
    NetlinkRoute, NetlinkRouteClient, NetlinkRouteRequestHandler, RouteAsyncWork,
};
use crate::protocol_family::sock_diag::{
    NetlinkSockDiag, NetlinkSockDiagClient, NetlinkSockDiagRequestHandler, SockDiagEventLoop,
};
use crate::protocol_family::{NetlinkFamilyRequestHandler as _, ProtocolFamily};
use crate::route_eventloop::RouteEventLoop;

/// The tag added to all logs generated by this crate.
pub const NETLINK_LOG_TAG: &'static str = "netlink";

/// Selects the interface for the sysctl.
#[derive(Debug, Clone, Copy)]
pub enum SysctlInterfaceSelector {
    /// "all" interfaces.
    ///
    /// This is supposed to change all interfaces' settings, but for most of
    /// the sysctls that is a lie: writing to them has no effect at all.
    All,
    /// The "default" interface; all interfaces created after this write will inherit the value.
    Default,
    /// The id of the interface to change.
    Id(NonZeroU64),
}

/// The implementation of the Netlink protocol suite.
pub struct Netlink<C: NetlinkContext> {
    /// Generator of new Client IDs.
    id_generator: ClientIdGenerator,
    /// Sender to attach new `NETLINK_ROUTE` clients to the Netlink worker.
    route_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkRoute>>,
    /// Sender to send other `NETLINK_ROUTE` async work items to the Netlink worker.
    route_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkRoute>>,
    /// Sender to attach new `NETLINK_SOCK_DIAG` clients to the Netlink worker.
    sock_diag_client_sender: UnboundedSender<ClientWithReceiver<C, NetlinkSockDiag>>,
    /// Sender to send other `NETLINK_SOCK_DIAG` async work items to the Netlink worker.
    sock_diag_async_work_sink: mpsc::UnboundedSender<AsyncWorkItem<NetlinkSockDiag>>,
}

impl<C: NetlinkContext> Netlink<C> {
    /// Returns a newly instantiated [`Netlink`] and parameters used to start the
    /// asynchronous worker.
    ///
    /// The caller is expected to run the worker by calling [`run_netlink_worker`].
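    ///
    /// # Example
    ///
    /// A minimal sketch of the expected wiring. `MyInterfacesHandler` and
    /// `MyNetlinkContext` are hypothetical embedder-provided implementations of
    /// [`interfaces::InterfacesHandler`] and [`messaging::NetlinkContext`], and
    /// `access_control` is whatever `C::AccessControl` value the embedder uses;
    /// none of these are defined by this crate.
    ///
    /// ```ignore
    /// let (netlink, worker_params) =
    ///     Netlink::<MyNetlinkContext>::new(MyInterfacesHandler::default());
    /// // The worker never completes; drive it on its own task.
    /// fasync::Task::spawn(run_netlink_worker(worker_params, access_control)).detach();
    /// // `netlink` can now hand out per-protocol-family clients.
    /// ```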
    pub fn new<H: interfaces::InterfacesHandler>(
        interfaces_handler: H,
    ) -> (Self, NetlinkWorkerParams<H, C>) {
        let (route_client_sender, route_client_receiver) = mpsc::unbounded();
        let (route_async_work_sink, route_async_work_receiver) = mpsc::unbounded();
        let (sock_diag_client_sender, sock_diag_client_receiver) = mpsc::unbounded();
        let (sock_diag_async_work_sink, sock_diag_async_work_receiver) = mpsc::unbounded();
        (
            Netlink {
                id_generator: ClientIdGenerator::default(),
                route_client_sender,
                sock_diag_client_sender,
                route_async_work_sink,
                sock_diag_async_work_sink,
            },
            NetlinkWorkerParams {
                interfaces_handler,
                route_client_receiver,
                route_async_work_receiver,
                sock_diag_client_receiver,
                sock_diag_async_work_receiver,
            },
        )
    }

    /// Writes the `accept_ra_rt_table` sysctl for the selected interface.
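    ///
    /// # Example
    ///
    /// A minimal sketch; `netlink` is assumed to be a [`Netlink`] whose worker
    /// is already running, and interface id `3` is assumed to exist.
    ///
    /// ```ignore
    /// let selector = SysctlInterfaceSelector::Id(NonZeroU64::new(3).unwrap());
    /// netlink.write_accept_ra_rt_table(selector, 100)?;
    /// let value = netlink.read_accept_ra_rt_table(selector)?;
    /// ```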
    pub fn write_accept_ra_rt_table(
        &self,
        interface: SysctlInterfaceSelector,
        value: i32,
    ) -> Result<(), SysctlError> {
        let (responder, receiver) = oneshot_sync::channel();
        self.route_async_work_sink
            .unbounded_send(AsyncWorkItem::Inner(RouteAsyncWork::SetAcceptRaRtTable {
                interface,
                value: value.into(),
                responder,
            }))
            .map_err(|_| SysctlError::Disconnected)?;
        receiver.receive().map_err(|_| SysctlError::Disconnected)?
    }

    /// Reads the `accept_ra_rt_table` sysctl for the selected interface.
    pub fn read_accept_ra_rt_table(
        &self,
        interface: SysctlInterfaceSelector,
    ) -> Result<i32, SysctlError> {
        let (responder, receiver) = oneshot_sync::channel();
        self.route_async_work_sink
            .unbounded_send(AsyncWorkItem::Inner(RouteAsyncWork::GetAcceptRaRtTable {
                interface,
                responder,
            }))
            .map_err(|_| SysctlError::Disconnected)?;
        Ok(receiver.receive().map_err(|_| SysctlError::Disconnected)??.into())
    }

    /// Creates a new client of the `NETLINK_ROUTE` protocol family.
    ///
    /// `sender` is used by Netlink to send messages to the client.
    /// `receiver` is used by Netlink to receive messages from the client.
    ///
    /// Closing the `receiver` will close this client, disconnecting `sender`.
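    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `netlink` is a running `Netlink<MyNetlinkContext>`
    /// and `sender`/`receiver` are the `MyNetlinkContext::Sender`/`Receiver` halves
    /// backing a single client socket (both hypothetical embedder types).
    ///
    /// ```ignore
    /// let route_client = netlink
    ///     .new_route_client(sender, receiver)
    ///     .expect("netlink worker should be running");
    /// // Dropping/closing `receiver` later disconnects this client.
    /// ```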
    pub fn new_route_client(
        &self,
        sender: C::Sender<RouteNetlinkMessage>,
        receiver: C::Receiver<RouteNetlinkMessage>,
    ) -> Result<NetlinkRouteClient, NewClientError> {
        let Netlink {
            id_generator,
            route_client_sender,
            route_async_work_sink,
            sock_diag_client_sender: _,
            sock_diag_async_work_sink: _,
        } = self;
        let (external_client, internal_client) = client::new_client_pair::<NetlinkRoute, _>(
            id_generator.new_id(),
            sender,
            route_async_work_sink.clone(),
        );
        route_client_sender
            .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
            .map_err(|e| {
                // Sending on an `UnboundedSender` can never fail with `is_full()`.
                debug_assert!(e.is_disconnected());
                NewClientError::Disconnected
            })?;
        Ok(NetlinkRouteClient(external_client))
    }

    /// Creates a new client of the `NETLINK_SOCK_DIAG` protocol family.
    ///
    /// `sender` is used by Netlink to send messages to the client.
    /// `receiver` is used by Netlink to receive messages from the client.
    ///
    /// Closing the `receiver` will close this client, disconnecting `sender`.
    pub fn new_sock_diag_client(
        &self,
        sender: C::Sender<SockDiagResponse>,
        receiver: C::Receiver<SockDiagRequest>,
    ) -> Result<NetlinkSockDiagClient, NewClientError> {
        let Netlink {
            id_generator,
            route_client_sender: _,
            route_async_work_sink: _,
            sock_diag_client_sender,
            sock_diag_async_work_sink,
        } = self;
        let (external_client, internal_client) = client::new_client_pair::<NetlinkSockDiag, _>(
            id_generator.new_id(),
            sender,
            sock_diag_async_work_sink.clone(),
        );
        sock_diag_client_sender
            .unbounded_send(ClientWithReceiver { client: internal_client, receiver })
            .map_err(|e| {
                // Sending on an `UnboundedSender` can never fail with `is_full()`.
                debug_assert!(e.is_disconnected());
                NewClientError::Disconnected
            })?;
        Ok(NetlinkSockDiagClient(external_client))
    }
}

/// A wrapper holding an [`InternalClient`] and its receiver of requests.
struct ClientWithReceiver<C: NetlinkContext, F: ProtocolFamily> {
    client: InternalClient<F, C::Sender<F::Response>>,
    receiver: C::Receiver<F::Request>,
}

/// The possible error types when instantiating a new client.
#[derive(Debug)]
pub enum NewClientError {
    /// The [`Netlink`] is disconnected from its associated worker, perhaps as a
    /// result of dropping the worker.
    Disconnected,
}

/// The possible error types when trying to access a sysctl.
#[derive(Debug)]
pub enum SysctlError {
    /// The [`Netlink`] is disconnected from its associated worker.
    Disconnected,
    /// The interface went away.
    NoInterface,
    /// The written value requests an unsupported operation.
    Unsupported,
}

/// Parameters used to start the Netlink asynchronous worker.
pub struct NetlinkWorkerParams<H, C: NetlinkContext> {
    interfaces_handler: H,
    /// Receiver of newly created `NETLINK_ROUTE` clients.
    route_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkRoute>>,
    /// Receiver of async work items for the `NETLINK_ROUTE` worker.
    route_async_work_receiver: UnboundedReceiver<AsyncWorkItem<NetlinkRoute>>,
    /// Receiver of newly created `NETLINK_SOCK_DIAG` clients.
    #[allow(dead_code)]
    sock_diag_client_receiver: UnboundedReceiver<ClientWithReceiver<C, NetlinkSockDiag>>,
    /// Receiver of async work items for the `NETLINK_SOCK_DIAG` worker.
    #[allow(dead_code)]
    sock_diag_async_work_receiver: UnboundedReceiver<AsyncWorkItem<NetlinkSockDiag>>,
}

/// All of the protocols that the netlink worker connects to.
#[allow(missing_docs)]
pub struct NetlinkWorkerDiscoverableProtocols {
    pub root_interfaces: fnet_root::InterfacesProxy,
    pub interfaces_state: fnet_interfaces::StateProxy,
    pub v4_routes_state: fnet_routes::StateV4Proxy,
    pub v6_routes_state: fnet_routes::StateV6Proxy,
    pub v4_main_route_table: fnet_routes_admin::RouteTableV4Proxy,
    pub v6_main_route_table: fnet_routes_admin::RouteTableV6Proxy,
    pub v4_route_table_provider: fnet_routes_admin::RouteTableProviderV4Proxy,
    pub v6_route_table_provider: fnet_routes_admin::RouteTableProviderV6Proxy,
    pub v4_rule_table: fnet_routes_admin::RuleTableV4Proxy,
    pub v6_rule_table: fnet_routes_admin::RuleTableV6Proxy,
    pub ndp_option_watcher_provider: fnet_ndp::RouterAdvertisementOptionWatcherProviderProxy,
    pub socket_diagnostics: fnet_sockets::DiagnosticsProxy,
    pub socket_control: fnet_sockets::ControlProxy,
    pub neighbors_view: fnet_neighbor::ViewProxy,
}

impl NetlinkWorkerDiscoverableProtocols {
    fn from_environment() -> Self {
        let root_interfaces = connect_to_protocol::<fnet_root::InterfacesMarker>()
            .expect("connect to fuchsia.net.root.Interfaces");
        let interfaces_state = connect_to_protocol::<fnet_interfaces::StateMarker>()
            .expect("connect to fuchsia.net.interfaces.State");
        let v4_routes_state =
            connect_to_protocol::<<Ipv4 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
                .expect("connect to fuchsia.net.routes.StateV4");
        let v6_routes_state =
            connect_to_protocol::<<Ipv6 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
                .expect("connect to fuchsia.net.routes.StateV6");
        let v4_main_route_table = connect_to_protocol::<
            <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
        >()
        .expect("connect to fuchsia.net.routes.admin.RouteTableV4");
        let v6_main_route_table = connect_to_protocol::<
            <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
        >()
        .expect("connect to fuchsia.net.routes.admin.RouteTableV6");
        let v4_route_table_provider = connect_to_protocol::<
            <Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
        >()
        .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV4");
        let v6_route_table_provider = connect_to_protocol::<
            <Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableProviderMarker,
        >()
        .expect("connect to fuchsia.net.routes.admin.RouteTableProviderV6");
        let v4_rule_table = connect_to_protocol::<
            <Ipv4 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
        >()
        .expect("connect to fuchsia.net.routes.admin.RuleTableV4");
        let v6_rule_table = connect_to_protocol::<
            <Ipv6 as fnet_routes_ext::rules::FidlRuleAdminIpExt>::RuleTableMarker,
        >()
        .expect("connect to fuchsia.net.routes.admin.RuleTableV6");
        let ndp_option_watcher_provider =
            connect_to_protocol::<fnet_ndp::RouterAdvertisementOptionWatcherProviderMarker>()
                .expect("connect to fuchsia.net.ndp.RouterAdvertisementOptionWatcherProvider");
        let socket_diagnostics = connect_to_protocol::<fnet_sockets::DiagnosticsMarker>()
            .expect("connect to fuchsia.net.sockets.Diagnostics");
        let socket_control = connect_to_protocol::<fnet_sockets::ControlMarker>()
            .expect("connect to fuchsia.net.sockets.Control");
        let neighbors_view = connect_to_protocol::<fnet_neighbor::ViewMarker>()
            .expect("connect to fuchsia.net.neighbor.View");

        Self {
            root_interfaces,
            interfaces_state,
            v4_routes_state,
            v6_routes_state,
            v4_main_route_table,
            v6_main_route_table,
            v4_route_table_provider,
            v6_route_table_provider,
            v4_rule_table,
            v6_rule_table,
            ndp_option_watcher_provider,
            socket_diagnostics,
            socket_control,
            neighbors_view,
        }
    }
}

/// The worker encompassing all asynchronous Netlink work.
///
/// The worker is never expected to complete.
///
/// The FIDL proxies are connected inside the worker's async body (via
/// `NetlinkWorkerDiscoverableProtocols::from_environment`) so that no asynchronous
/// proxies are created before an executor is running; their creation is deferred
/// until the event loop starts running.
///
/// # Panics
///
/// Panics if a non-recoverable error is encountered by the worker, for example
/// a FIDL error on one of the FIDL connections with the netstack.
pub async fn run_netlink_worker<H: interfaces::InterfacesHandler, C: NetlinkContext>(
    params: NetlinkWorkerParams<H, C>,
    access_control: C::AccessControl<'_>,
) {
    run_netlink_worker_with_protocols(
        params,
        NetlinkWorkerDiscoverableProtocols::from_environment(),
        None,
        access_control,
    )
    .await;
}

/// Same as [`run_netlink_worker`], but allows the caller to pass custom
/// [`NetlinkWorkerDiscoverableProtocols`].
pub async fn run_netlink_worker_with_protocols<
    H: interfaces::InterfacesHandler,
    C: NetlinkContext,
>(
    params: NetlinkWorkerParams<H, C>,
    protocols: NetlinkWorkerDiscoverableProtocols,
    on_route_initialized: Option<oneshot::Sender<()>>,
    access_control: C::AccessControl<'_>,
) {
    let NetlinkWorkerParams {
        interfaces_handler,
        route_client_receiver,
        route_async_work_receiver,
        sock_diag_client_receiver,
        sock_diag_async_work_receiver,
    } = params;

    let NetlinkWorkerDiscoverableProtocols {
        root_interfaces,
        interfaces_state,
        v4_routes_state,
        v6_routes_state,
        v4_main_route_table,
        v6_main_route_table,
        v4_route_table_provider,
        v6_route_table_provider,
        v4_rule_table,
        v6_rule_table,
        ndp_option_watcher_provider,
        socket_diagnostics,
        socket_control,
        neighbors_view,
    } = protocols;

    let route_clients = ClientTable::default();
    let (route_request_sink, route_request_stream) = mpsc::channel(1);

    let route_event_loop = {
        let route_clients = route_clients.clone();
        async move {
            let event_loop: RouteEventLoop<H, C::Sender<_>> = RouteEventLoop {
                interfaces_proxy: root_interfaces,
                interfaces_state_proxy: interfaces_state,
                v4_routes_state,
                v6_routes_state,
                v4_main_route_table,
                v6_main_route_table,
                v4_route_table_provider,
                v6_route_table_provider,
                v4_rule_table,
                v6_rule_table,
                ndp_option_watcher_provider,
                neighbors_view,
                route_clients,
                request_stream: route_request_stream,
                interfaces_handler,
                async_work_receiver: route_async_work_receiver,
            };

            event_loop.run(on_route_initialized).await;
        }
    };

    let route_client_receiver_loop = {
        let access_control = access_control.clone();
        async move {
            // Accept new NETLINK_ROUTE clients.
            connect_new_clients::<C, NetlinkRoute>(
                route_clients,
                route_client_receiver,
                NetlinkRouteRequestHandler { unified_request_sink: route_request_sink },
                access_control,
            )
            .await;
            panic!("route_client_receiver stream unexpectedly finished");
        }
    };

    let sock_diag_clients = ClientTable::default();
    let (sock_diag_request_sink, sock_diag_request_stream) = mpsc::channel(1);

    let sock_diag_event_loop = async move {
        SockDiagEventLoop {
            socket_diagnostics,
            socket_control,
            request_stream: sock_diag_request_stream,
            async_work_receiver: sock_diag_async_work_receiver,
        }
        .run()
        .await;
    };

    let sock_diag_client_receiver_loop = async move {
        // Accept new NETLINK_SOCK_DIAG clients.
        connect_new_clients::<C, NetlinkSockDiag>(
            sock_diag_clients,
            sock_diag_client_receiver,
            NetlinkSockDiagRequestHandler { sock_diag_request_sink },
            access_control,
        )
        .await;
        panic!("sock_diag_client_receiver stream unexpectedly finished");
    };

    futures::future::join4(
        route_event_loop,
        route_client_receiver_loop,
        sock_diag_event_loop,
        sock_diag_client_receiver_loop,
    )
    .await;
}

/// Receives clients from the given receiver, adding them to the given table.
///
/// A request-handler future is driven for each received client; the given
/// `request_handler_impl` defines how the requests will be handled.
async fn connect_new_clients<C: NetlinkContext, F: ProtocolFamily>(
    client_table: ClientTable<F, C::Sender<F::Response>>,
    client_receiver: UnboundedReceiver<ClientWithReceiver<C, F>>,
    request_handler_impl: F::RequestHandler<C::Sender<F::Response>>,
    access_control: C::AccessControl<'_>,
) {
    client_receiver
        // Drive each client concurrently with `for_each_concurrent`.
        .for_each_concurrent(None, async |ClientWithReceiver { client, receiver }| {
            client_table.add_client(client.clone());
            let client = run_client_request_handler::<C, F>(
                client,
                receiver,
                request_handler_impl.clone(),
                access_control.clone(),
            )
            .await;
            client_table.remove_client(client);
        })
        .await;
}

/// Reads messages from the `receiver` and handles them using the `handler`.
///
/// The future completes when the underlying `Receiver` closes, yielding the
/// original client.
async fn run_client_request_handler<C: NetlinkContext, F: ProtocolFamily>(
    client: InternalClient<F, C::Sender<F::Response>>,
    receiver: C::Receiver<F::Request>,
    handler: F::RequestHandler<C::Sender<F::Response>>,
    access_control: C::AccessControl<'_>,
) -> InternalClient<F, C::Sender<F::Response>> {
    // State needed to handle an individual request, which is cycled through
    // the `fold` combinator below.
    struct FoldState<C, H, P> {
        client: C,
        handler: H,
        access_control: P,
    }

    // Use `fold` for two reasons. First, it processes requests serially,
    // ensuring requests are handled in order. Second, it allows us to hand
    // off the client/handler from one request to the next, avoiding copies
    // for each request.
    let FoldState { client, handler: _, access_control: _ } = receiver
        .fold(
            FoldState { client, handler, access_control },
            |FoldState { mut client, mut handler, access_control }, req| async {
                match req.validate_creds_and_get_message(&access_control) {
                    Ok(req) => {
                        log_debug!("{} Received request: {:?}", client, req);
                        handler.handle_request(req, &mut client).await
                    }
                    Err(e) => {
                        match &e {
                            ValidationError::Parse(e) => {
                                log_warn!("{client} failed to parse netlink message: {e:?}");
                            }
                            p @ ValidationError::Permission { .. } => {
                                log_debug!("{client} permission check failed {p:?}")
                            }
                        }
                        if let Some(rsp) = e.into_error_message() {
                            client.send_unicast(rsp)
                        }
                    }
                }
                FoldState { client, handler, access_control }
            },
        )
        .await;

    client
}

#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;
    use futures::FutureExt as _;

    use assert_matches::assert_matches;
    use netlink_packet_core::{ErrorMessage, NetlinkPayload};
    use std::num::NonZeroI32;
    use std::pin::pin;

    use crate::messaging::NetlinkMessageWithCreds;
    use crate::messaging::testutil::{FakeCreds, SentMessage, TestNetlinkContext};
    use crate::protocol_family::testutil::{
        FakeNetlinkRequestHandler, FakeProtocolFamily, new_fake_netlink_message,
        new_fake_netlink_message_with_creds,
    };

    #[fasync::run_singlethreaded(test)]
    async fn test_run_client_request_handler() {
        let (mut req_sender, req_receiver) = mpsc::channel(0);
        let (mut client_sink, client, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let join_handle = fasync::Task::spawn(async_work_drain_task);

        {
            let mut client_task = pin!(
                run_client_request_handler::<TestNetlinkContext, FakeProtocolFamily>(
                    client,
                    req_receiver,
                    FakeNetlinkRequestHandler,
                    Default::default()
                )
                .fuse()
            );

            assert_matches!((&mut client_task).now_or_never(), None);
            assert_eq!(&client_sink.take_messages()[..], &[]);

            // Send a message and expect to see the response on the `client_sink`.
            // NB: Use the sender's channel size as a synchronization method; if a
            // second message could be sent, the first *must* have been handled.
            req_sender
                .try_send(new_fake_netlink_message_with_creds())
                .expect("should send without error");
            let mut could_send_fut =
                pin!(futures::future::poll_fn(|ctx| req_sender.poll_ready(ctx)).fuse());
            futures::select!(
                res = could_send_fut => res.expect("should be able to send without error"),
                _client = client_task => panic!("client task unexpectedly finished"),
            );
            assert_eq!(
                &client_sink.take_messages()[..],
                &[SentMessage::unicast(new_fake_netlink_message())]
            );

            // Close the sender, and expect the Task to exit.
            req_sender.close_channel();
            let _client = client_task.await;
            assert_eq!(&client_sink.take_messages()[..], &[]);
        }
        join_handle.await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_connect_new_clients() {
        let client_table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
        let mut client_acceptor_fut = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                client_table.clone(),
                client_receiver,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );

        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        // Connect Client 1.
        let (mut _client_sink1, client1, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender1, req_receiver1) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client: client1, receiver: req_receiver1 })
            .expect("should send without error");

        // Connect Client 2.
        let (mut client_sink2, client2, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_2,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender2, req_receiver2) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client: client2, receiver: req_receiver2 })
            .expect("should send without error");

        // Send a request to Client 2, and verify it's handled despite Client 1
        // being open (e.g. concurrent handling of requests across clients).
        // NB: Use the sender's channel size as a synchronization method; if a
        // second message could be sent, the first *must* have been handled.
        req_sender2
            .try_send(new_fake_netlink_message_with_creds())
            .expect("should send without error");
        let mut could_send_fut =
            pin!(futures::future::poll_fn(|ctx| req_sender2.poll_ready(ctx)).fuse());
        futures::select!(
            res = could_send_fut => res.expect("should be able to send without error"),
            () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
        );
        assert_eq!(
            &client_table.client_ids()[..],
            [client::testutil::CLIENT_ID_1, client::testutil::CLIENT_ID_2]
        );
        assert_eq!(
            &client_sink2.take_messages()[..],
            &[SentMessage::unicast(new_fake_netlink_message())]
        );

        // Close the two clients, and verify the acceptor fut is still pending.
        req_sender1.close_channel();
        req_sender2.close_channel();
        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        // Close the client_sender, and verify the acceptor fut finishes.
        client_sender.close_channel();
        client_acceptor_fut.await;

        // Confirm the clients have been cleaned up from the client table.
        assert_eq!(&client_table.client_ids()[..], []);

        drop(client_table);
        scope.join().await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_permissions() {
        let client_table = ClientTable::default();
        let scope = fasync::Scope::new();
        let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
        let mut client_acceptor_fut = Box::pin(
            connect_new_clients::<TestNetlinkContext, FakeProtocolFamily>(
                client_table.clone(),
                client_receiver,
                FakeNetlinkRequestHandler,
                Default::default(),
            )
            .fuse(),
        );
        assert_eq!((&mut client_acceptor_fut).now_or_never(), None);

        let (mut client_sink, client, async_work_drain_task) =
            crate::client::testutil::new_fake_client::<FakeProtocolFamily>(
                crate::client::testutil::CLIENT_ID_1,
                std::iter::empty(),
            );
        let _join_handle = scope.spawn(async_work_drain_task);
        let (mut req_sender, req_receiver) = mpsc::channel(0);
        client_sender
            .unbounded_send(ClientWithReceiver { client, receiver: req_receiver })
            .expect("should send without error");

        let message = NetlinkMessageWithCreds::new(
            new_fake_netlink_message(),
            FakeCreds::with_error(Errno::new(libc::EPERM).unwrap()),
        );
        req_sender.try_send(message).expect("should send without error");

        let response = futures::select!(
            res = client_sink.next_message().fuse() => res,
            () = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
        );

        assert_matches!(
            response.message.payload,
            NetlinkPayload::Error(ErrorMessage { code: Some(error_code), .. }) => {
                assert_eq!(error_code, NonZeroI32::new(-libc::EPERM).unwrap());
            }
        );
    }
}