//! Crate root of the Netlink implementation.
//!
//! This file declares the crate's submodules and defines [`Netlink`], the
//! handle used to create new Netlink clients, together with the asynchronous
//! worker that serves those clients.

// Missing documentation on public items and unused code are hard errors.
#![deny(missing_docs, unused)]
// Provides `ClientIdGenerator`, `ClientTable` and `InternalClient` (imported below).
mod client;
mod errors;
// Provides `EventLoop`, which the worker task in this file constructs and runs.
pub(crate) mod eventloop;
// Provides the `InterfacesHandler` trait bound used by `Netlink::new`.
pub mod interfaces;
// Provides the `log_debug!` macro used when handling client requests.
pub(crate) mod logging;
// Provides the `Sender`, `Receiver` and `SenderReceiverProvider` traits.
pub mod messaging;
pub mod multicast_groups;
mod netlink_packet;
// Provides `ProtocolFamily` and the `route` family types (`NetlinkRoute`, ...).
pub mod protocol_family;
mod routes;
mod rules;
pub(crate) mod util;
use fidl_fuchsia_net_interfaces as fnet_interfaces;
use fidl_fuchsia_net_root as fnet_root;
use fidl_fuchsia_net_routes_ext as fnet_routes_ext;
use fuchsia_async as fasync;
use fuchsia_component::client::connect_to_protocol;
use futures::{
channel::mpsc::{self, UnboundedReceiver, UnboundedSender},
future::Future,
FutureExt as _, StreamExt as _,
};
use net_types::ip::{Ipv4, Ipv6};
use netlink_packet_route::RouteNetlinkMessage;
use crate::{
client::{ClientIdGenerator, ClientTable, InternalClient},
eventloop::EventLoop,
logging::log_debug,
messaging::{Receiver, Sender, SenderReceiverProvider},
protocol_family::{
route::{NetlinkRoute, NetlinkRouteClient, NetlinkRouteRequestHandler},
NetlinkFamilyRequestHandler as _, ProtocolFamily,
},
};
/// The tag string (`"netlink"`) exported by this crate.
///
/// NOTE(review): presumably attached to log messages emitted by the Netlink
/// implementation; the use site is not visible in this file.
// `const` items are implicitly `'static`, so spelling the lifetime out is
// redundant (Clippy: `redundant_static_lifetimes`).
pub const NETLINK_LOG_TAG: &str = "netlink";
/// Handle to the Netlink implementation.
///
/// `P` supplies the concrete sender and receiver types used to exchange
/// messages with clients. A `Netlink` is created, together with the future
/// that runs its worker, by [`Netlink::new`].
pub struct Netlink<P>
where
    P: SenderReceiverProvider,
{
    /// Source of the identifiers assigned to newly created clients.
    id_generator: ClientIdGenerator,
    /// Sending half of the channel over which new `NetlinkRoute` clients
    /// (paired with their request receivers) are handed to the worker.
    route_client_sender: UnboundedSender<
        ClientWithReceiver<
            NetlinkRoute,
            P::Sender<<NetlinkRoute as ProtocolFamily>::InnerMessage>,
            P::Receiver<<NetlinkRoute as ProtocolFamily>::InnerMessage>,
        >,
    >,
}
impl<P> Netlink<P>
where
    P: SenderReceiverProvider,
{
    /// Creates a new [`Netlink`] handle and the future that runs its worker.
    ///
    /// The returned future performs all of the implementation's asynchronous
    /// work; nothing happens until the caller polls it. `interfaces_handler`
    /// is handed to the worker's event loop.
    pub fn new<H: interfaces::InterfacesHandler>(
        interfaces_handler: H,
    ) -> (Self, impl Future<Output = ()> + Send) {
        // The handle keeps the sending half; the worker drains the receiving
        // half to learn about new clients.
        let (route_client_sender, route_client_receiver) = mpsc::unbounded();

        let netlink = Netlink { id_generator: ClientIdGenerator::default(), route_client_sender };
        let worker = run_netlink_worker(NetlinkWorkerParams::<_, P> {
            interfaces_handler,
            route_client_receiver,
        });

        (netlink, worker)
    }

    /// Creates a new client of the [`NetlinkRoute`] protocol family.
    ///
    /// `sender` becomes part of the internal client handed to the worker, and
    /// `receiver` is the stream of requests the worker reads for this client.
    ///
    /// # Errors
    ///
    /// Returns [`NewClientError::Disconnected`] if the new client could not be
    /// sent to the worker.
    pub fn new_route_client(
        &self,
        sender: P::Sender<RouteNetlinkMessage>,
        receiver: P::Receiver<RouteNetlinkMessage>,
    ) -> Result<NetlinkRouteClient, NewClientError> {
        let client_id = self.id_generator.new_id();
        let (external_client, internal_client) =
            client::new_client_pair::<NetlinkRoute, _>(client_id, sender);

        let handoff = ClientWithReceiver { client: internal_client, receiver };
        match self.route_client_sender.unbounded_send(handoff) {
            Ok(()) => Ok(NetlinkRouteClient(external_client)),
            Err(e) => {
                // An unbounded channel is never full, so the only expected
                // failure is the receiving side having gone away.
                debug_assert!(e.is_disconnected());
                Err(NewClientError::Disconnected)
            }
        }
    }
}
/// A newly created client bundled with the receiver its requests arrive on.
///
/// This is the unit of work sent from [`Netlink::new_route_client`] to the
/// worker, which takes it apart again in `connect_new_clients`.
struct ClientWithReceiver<F, S, R>
where
    F: ProtocolFamily,
    S: Sender<F::InnerMessage>,
    R: Receiver<F::InnerMessage>,
{
    /// The worker-side representation of the client.
    client: InternalClient<F, S>,
    /// The stream of requests issued by the client.
    receiver: R,
}
/// The possible errors when creating a new client.
///
/// Implements [`Debug`](std::fmt::Debug), [`Display`](std::fmt::Display) and
/// [`Error`](std::error::Error) so that callers can `unwrap`/`expect` the
/// result of client creation, log the failure, or propagate it with `?`.
#[derive(Debug)]
pub enum NewClientError {
    /// The new client could not be handed to the worker because the channel
    /// to the worker is disconnected.
    Disconnected,
}

impl std::fmt::Display for NewClientError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            NewClientError::Disconnected => {
                f.write_str("netlink is disconnected from its worker")
            }
        }
    }
}

impl std::error::Error for NewClientError {}
/// Everything `run_netlink_worker` needs in order to run.
struct NetlinkWorkerParams<H, P>
where
    P: SenderReceiverProvider,
{
    /// Handler passed through to the worker's event loop.
    interfaces_handler: H,
    /// Receiving half of the channel on which new `NetlinkRoute` clients
    /// (paired with their request receivers) arrive.
    route_client_receiver: UnboundedReceiver<
        ClientWithReceiver<
            NetlinkRoute,
            P::Sender<<NetlinkRoute as ProtocolFamily>::InnerMessage>,
            P::Receiver<<NetlinkRoute as ProtocolFamily>::InnerMessage>,
        >,
    >,
}
async fn run_netlink_worker<H: interfaces::InterfacesHandler, P: SenderReceiverProvider>(
params: NetlinkWorkerParams<H, P>,
) {
let NetlinkWorkerParams { interfaces_handler, route_client_receiver } = params;
let route_clients = ClientTable::default();
let (unified_request_sink, unified_request_stream) = mpsc::channel(1);
let unified_event_loop = fasync::Task::spawn({
let route_clients = route_clients.clone();
async move {
let interfaces_proxy = connect_to_protocol::<fnet_root::InterfacesMarker>()
.expect("connect to fuchsia.net.root.Interfaces");
let interfaces_state_proxy = connect_to_protocol::<fnet_interfaces::StateMarker>()
.expect("connect to fuchsia.net.interfaces");
let v4_routes_state =
connect_to_protocol::<<Ipv4 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
.expect("connect to fuchsia.net.routes");
let v6_routes_state =
connect_to_protocol::<<Ipv6 as fnet_routes_ext::FidlRouteIpExt>::StateMarker>()
.expect("connect to fuchsia.net.routes");
let v4_routes_set_provider = connect_to_protocol::<
<Ipv4 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
>()
.expect("connect to fuchsia.net.routes.admin");
let v6_routes_set_provider = connect_to_protocol::<
<Ipv6 as fnet_routes_ext::admin::FidlRouteAdminIpExt>::RouteTableMarker,
>()
.expect("connect to fuchsia.net.routes.admin");
let event_loop: EventLoop<H, P::Sender<_>> = EventLoop {
interfaces_proxy,
interfaces_state_proxy,
v4_routes_state,
v6_routes_state,
v4_routes_set_provider,
v6_routes_set_provider,
route_clients,
unified_request_stream,
interfaces_handler,
};
match event_loop.run().await {
Ok(never) => match never {},
Err(e) => panic!("error running event loop: {e:?}"),
}
}
});
let _: Vec<()> = futures::future::join_all([
{
let route_clients = route_clients.clone();
fasync::Task::spawn(async move {
connect_new_clients::<NetlinkRoute, _, _>(
route_clients,
route_client_receiver,
NetlinkRouteRequestHandler { unified_request_sink },
)
.await;
panic!("route_client_receiver stream unexpectedly finished")
})
},
unified_event_loop,
])
.await;
}
/// Receives new clients from `client_receiver` and serves each of them.
///
/// Every received client is added to `client_table`, gets its own
/// request-handling task (with a clone of `request_handler_impl`), and is
/// removed from the table again once that task finishes. Clients are driven
/// concurrently with no concurrency limit. The returned future completes once
/// `client_receiver` has finished and every client's future has completed.
async fn connect_new_clients<F, S, R>(
    client_table: ClientTable<F, S>,
    client_receiver: UnboundedReceiver<ClientWithReceiver<F, S, R>>,
    request_handler_impl: F::RequestHandler<S>,
) where
    F: ProtocolFamily,
    S: Sender<F::InnerMessage>,
    R: Receiver<F::InnerMessage>,
{
    client_receiver
        .for_each_concurrent(None, |new_client| {
            let ClientWithReceiver { client, receiver } = new_client;

            // Register the client before its requests start being handled.
            client_table.add_client(client.clone());

            let handler = request_handler_impl.clone();
            let request_task = spawn_client_request_handler::<F, S, R>(client, receiver, handler);

            // The task yields the client back when its request stream ends;
            // use that to unregister it.
            request_task.then(|finished_client| {
                futures::future::ready(client_table.remove_client(finished_client))
            })
        })
        .await
}
/// Spawns a task that handles every request arriving on `receiver` for
/// `client`, one at a time and in order, using `handler`.
///
/// The task completes when `receiver` finishes, yielding the client back to
/// the caller.
fn spawn_client_request_handler<F, S, R>(
    client: InternalClient<F, S>,
    receiver: R,
    handler: F::RequestHandler<S>,
) -> fasync::Task<InternalClient<F, S>>
where
    F: ProtocolFamily,
    S: Sender<F::InnerMessage>,
    R: Receiver<F::InnerMessage>,
{
    // The state threaded through `fold`: it is moved into the handling of
    // each request and handed back out for the next one.
    struct Session<C, H> {
        client: C,
        handler: H,
    }

    // `fold` awaits each step before taking the next item, so requests are
    // processed serially.
    let serve_all_requests = receiver.fold(
        Session { client, handler },
        |Session { mut client, mut handler }, request| async {
            log_debug!("{} Received request: {:?}", client, request);
            handler.handle_request(request, &mut client).await;
            Session { client, handler }
        },
    );

    // Once the request stream ends only the client is of interest.
    fasync::Task::spawn(serve_all_requests.map(|session: Session<_, _>| session.client))
}
// Unit tests for the client-handling helpers defined in this file.
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use std::pin::pin;
use crate::{
messaging::testutil::SentMessage,
protocol_family::testutil::{
new_fake_netlink_message, FakeNetlinkRequestHandler, FakeProtocolFamily,
},
};
// Verifies that the task returned by `spawn_client_request_handler` handles
// requests while the request channel is open and finishes once it is closed.
#[fasync::run_singlethreaded(test)]
async fn test_spawn_client_request_handler() {
let (mut req_sender, req_receiver) = mpsc::channel(0);
let (mut client_sink, client) = crate::client::testutil::new_fake_client::<
FakeProtocolFamily,
>(crate::client::testutil::CLIENT_ID_1, &[]);
let mut client_task = pin!(spawn_client_request_handler::<FakeProtocolFamily, _, _>(
client,
req_receiver,
FakeNetlinkRequestHandler,
)
.fuse());
// Before any request is sent the task is pending and the client has been
// sent nothing.
assert_matches!((&mut client_task).now_or_never(), None);
assert_eq!(&client_sink.take_messages()[..], &[]);
// Send one request, then wait until the sender is ready to send again.
// NOTE(review): presumably the sender only becomes ready again once the
// handler task has taken the request off the channel - confirm.
req_sender.try_send(new_fake_netlink_message()).expect("should send without error");
let mut could_send_fut =
pin!(futures::future::poll_fn(|ctx| req_sender.poll_ready(ctx)).fuse());
futures::select!(
res = could_send_fut => res.expect("should be able to send without error"),
_client = client_task => panic!("client task unexpectedly finished"),
);
// The client is expected to have been sent the fake message as a unicast.
// NOTE(review): this relies on how `FakeNetlinkRequestHandler` responds to
// requests, which is defined outside this file.
assert_eq!(
&client_sink.take_messages()[..],
&[SentMessage::unicast(new_fake_netlink_message())]
);
// Closing the request channel lets the task run to completion without
// any further messages being sent to the client.
req_sender.close_channel();
let _client = client_task.await;
assert_eq!(&client_sink.take_messages()[..], &[]);
}
// Verifies that `connect_new_clients` registers clients in the client table,
// serves a request for one client while another client is still connected,
// and removes the clients from the table once everything is closed.
#[fasync::run_singlethreaded(test)]
async fn test_connect_new_clients() {
let client_table = ClientTable::default();
let (client_sender, client_receiver) = futures::channel::mpsc::unbounded();
let mut client_acceptor_fut = Box::pin(
connect_new_clients::<FakeProtocolFamily, _, _>(
client_table.clone(),
client_receiver,
FakeNetlinkRequestHandler,
)
.fuse(),
);
// With no clients connected the acceptor is pending.
assert_eq!((&mut client_acceptor_fut).now_or_never(), None);
// Connect client 1.
let (mut _client_sink1, client1) = crate::client::testutil::new_fake_client::<
FakeProtocolFamily,
>(crate::client::testutil::CLIENT_ID_1, &[]);
let (mut req_sender1, req_receiver1) = mpsc::channel(0);
client_sender
.unbounded_send(ClientWithReceiver { client: client1, receiver: req_receiver1 })
.expect("should send without error");
// Connect client 2.
let (mut client_sink2, client2) = crate::client::testutil::new_fake_client::<
FakeProtocolFamily,
>(crate::client::testutil::CLIENT_ID_2, &[]);
let (mut req_sender2, req_receiver2) = mpsc::channel(0);
client_sender
.unbounded_send(ClientWithReceiver { client: client2, receiver: req_receiver2 })
.expect("should send without error");
// Send a request on client 2's channel while client 1 remains open, driving
// the acceptor until client 2's sender is ready again.
req_sender2.try_send(new_fake_netlink_message()).expect("should send without error");
let mut could_send_fut =
pin!(futures::future::poll_fn(|ctx| req_sender2.poll_ready(ctx)).fuse());
futures::select!(
res = could_send_fut => res.expect("should be able to send without error"),
() = client_acceptor_fut => panic!("client acceptor unexpectedly finished"),
);
// Both clients are registered, and client 2 was sent the fake message.
assert_eq!(
&client_table.client_ids()[..],
[client::testutil::CLIENT_ID_1, client::testutil::CLIENT_ID_2]
);
assert_eq!(
&client_sink2.take_messages()[..],
&[SentMessage::unicast(new_fake_netlink_message())]
);
// Closing both request channels does not finish the acceptor, because the
// channel of new clients is still open.
req_sender1.close_channel();
req_sender2.close_channel();
assert_eq!((&mut client_acceptor_fut).now_or_never(), None);
// Closing the channel of new clients lets the acceptor finish.
client_sender.close_channel();
client_acceptor_fut.await;
// Both clients have been removed from the client table.
assert_eq!(&client_table.client_ids()[..], []);
}
}