Skip to main content

fidl_next_util/
multiserver.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use core::cell::RefCell;
6use core::marker::PhantomData;
7
8use futures::channel::mpsc;
9use futures::stream::FuturesUnordered;
10use futures::{StreamExt as _, select_biased};
11use thiserror::Error;
12
13use fidl_next_bind::{
14    DispatchLocalServerMessage, DispatchServerMessage, HasTransport, ServerEnd,
15    ServerHandlerToProtocolAdapter,
16};
17use fidl_next_protocol::{
18    LocalServerHandler, Message, ProtocolError, Responder, ServerDispatcher,
19    ServerHandlerToLocalAdapter, Transport,
20};
21
22/// An error that can occur while using a multiserver.
23#[derive(Debug, Error)]
24pub enum MultiserverError {
25    /// Failed to forward a transport to the multiserver dispatcher.
26    #[error("failed to forward transport to multiserver dispatcher")]
27    ForwardError,
28}
29
30/// A server that handles incoming messages from multiple transports.
31///
32/// Multiservers are useful when you want to use a single server handler to
33/// handle messages from multiple served transports. They coalesce all of the
34/// incoming requests into a single conceptual stream, and invoke the handler on
35/// each one serially.
36///
37/// [`Multiserver`] is a handle that can forward server ends to its
38/// [`MultiserverDispatcher`]. The dispatcher is what handles each of the
39/// incoming messages. Call [`multiserver`] to create a handle and dispatcher
40/// pair.
41///
42/// # Example
43///
44/// ```ignore
45/// let (server, dispatcher) = multiserver();
46///
47/// spawn(dispatcher.run(my_handler));
48///
49/// while let Some(server_end) = incoming.next().await {
50///     server.forward(server_end);
51/// }
52/// ```
53pub struct Multiserver<P, T: Transport = <P as HasTransport>::Transport> {
54    sender: mpsc::UnboundedSender<ServerEnd<P, T>>,
55    _protocol: PhantomData<P>,
56}
57
58impl<P, T: Transport> Clone for Multiserver<P, T> {
59    fn clone(&self) -> Self {
60        Self { sender: self.sender.clone(), _protocol: PhantomData }
61    }
62}
63
64impl<P, T: Transport> Multiserver<P, T> {
65    /// Forwards a new transport to the multiserver dispatcher.
66    pub fn forward(&self, server_end: ServerEnd<P, T>) -> Result<(), MultiserverError> {
67        self.sender.unbounded_send(server_end).map_err(|_| MultiserverError::ForwardError)
68    }
69}
70
71struct RefCellServerHandler<'a, H> {
72    handler: &'a RefCell<H>,
73}
74
75impl<H, T: Transport> LocalServerHandler<T> for RefCellServerHandler<'_, H>
76where
77    H: LocalServerHandler<T>,
78{
79    async fn on_one_way(&mut self, message: Message<T>) -> Result<(), ProtocolError<T::Error>> {
80        self.handler.borrow_mut().on_one_way(message).await
81    }
82
83    async fn on_two_way(
84        &mut self,
85        message: Message<T>,
86        responder: Responder<T>,
87    ) -> Result<(), ProtocolError<T::Error>> {
88        self.handler.borrow_mut().on_two_way(message, responder).await
89    }
90}
91
92/// A dispatcher for a multiserver.
93///
94/// A multiserver runs servers for multiple transmports using the same handler.
95/// See [`Multiserver`] for usage details.
96pub struct MultiserverDispatcher<P, T: Transport = <P as HasTransport>::Transport> {
97    receiver: mpsc::UnboundedReceiver<ServerEnd<P, T>>,
98    _protocol: PhantomData<P>,
99}
100
101/// Creates a new multiserver and dispatcher pair.
102///
103/// See [`Multiserver`] for usage details.
104pub fn multiserver<P, T: Transport>() -> (Multiserver<P, T>, MultiserverDispatcher<P, T>) {
105    let (sender, receiver) = mpsc::unbounded();
106    (
107        Multiserver { sender, _protocol: PhantomData },
108        MultiserverDispatcher { receiver, _protocol: PhantomData },
109    )
110}
111
112impl<P, T: Transport> MultiserverDispatcher<P, T> {
113    /// Runs the multiserver with the provided handler.
114    ///
115    /// The handler will be called with messages from multiple transports.
116    pub async fn run<H>(self, handler: H)
117    where
118        P: DispatchServerMessage<H, T>,
119        H: Send,
120    {
121        self.run_inner(ServerHandlerToLocalAdapter(
122            ServerHandlerToProtocolAdapter::<P, H>::from_untyped(handler),
123        ))
124        .await
125    }
126
127    /// Runs the multiserver with the provided local handler.
128    ///
129    /// The handler will be called with messages from multiple transports.
130    pub async fn run_local<H>(self, handler: H)
131    where
132        P: DispatchLocalServerMessage<H, T>,
133    {
134        self.run_inner(ServerHandlerToProtocolAdapter::<P, H>::from_untyped(handler)).await
135    }
136
137    async fn run_inner<H>(mut self, handler: H)
138    where
139        H: LocalServerHandler<T>,
140    {
141        let handler = RefCell::new(handler);
142        let mut futures = FuturesUnordered::new();
143
144        let mut is_closed = false;
145        loop {
146            select_biased! {
147                transport = self.receiver.next() => {
148                    if let Some(transport) = transport {
149                        let dispatcher = ServerDispatcher::new(transport.into_untyped());
150                        futures.push(dispatcher.run_local(RefCellServerHandler {
151                            handler: &handler,
152                        }));
153                    } else {
154                        is_closed = true;
155                        if futures.is_empty() {
156                            break;
157                        }
158                    }
159                }
160                output = futures.next() => {
161                    if output.is_none() && is_closed {
162                        break;
163                    }
164                }
165            }
166        }
167    }
168}