fidl_next_util/
multiserver.rs1use core::cell::RefCell;
6use core::marker::PhantomData;
7
8use futures::channel::mpsc;
9use futures::stream::FuturesUnordered;
10use futures::{StreamExt as _, select_biased};
11use thiserror::Error;
12
13use fidl_next_bind::{
14 DispatchLocalServerMessage, DispatchServerMessage, HasTransport, ServerEnd,
15 ServerHandlerToProtocolAdapter,
16};
17use fidl_next_protocol::{
18 LocalServerHandler, Message, ProtocolError, Responder, ServerDispatcher,
19 ServerHandlerToLocalAdapter, Transport,
20};
21
22#[derive(Debug, Error)]
24pub enum MultiserverError {
25 #[error("failed to forward transport to multiserver dispatcher")]
27 ForwardError,
28}
29
30pub struct Multiserver<P, T: Transport = <P as HasTransport>::Transport> {
54 sender: mpsc::UnboundedSender<ServerEnd<P, T>>,
55 _protocol: PhantomData<P>,
56}
57
58impl<P, T: Transport> Clone for Multiserver<P, T> {
59 fn clone(&self) -> Self {
60 Self { sender: self.sender.clone(), _protocol: PhantomData }
61 }
62}
63
64impl<P, T: Transport> Multiserver<P, T> {
65 pub fn forward(&self, server_end: ServerEnd<P, T>) -> Result<(), MultiserverError> {
67 self.sender.unbounded_send(server_end).map_err(|_| MultiserverError::ForwardError)
68 }
69}
70
/// Adapter that lets several server dispatchers share one handler.
///
/// Each instance borrows the same `RefCell<H>`; handler calls are forwarded to
/// the inner `H` after taking a mutable borrow of the cell.
struct RefCellServerHandler<'a, H> {
    // Shared cell holding the real handler.
    handler: &'a RefCell<H>,
}
74
75impl<H, T: Transport> LocalServerHandler<T> for RefCellServerHandler<'_, H>
76where
77 H: LocalServerHandler<T>,
78{
79 async fn on_one_way(&mut self, message: Message<T>) -> Result<(), ProtocolError<T::Error>> {
80 self.handler.borrow_mut().on_one_way(message).await
81 }
82
83 async fn on_two_way(
84 &mut self,
85 message: Message<T>,
86 responder: Responder<T>,
87 ) -> Result<(), ProtocolError<T::Error>> {
88 self.handler.borrow_mut().on_two_way(message, responder).await
89 }
90}
91
92pub struct MultiserverDispatcher<P, T: Transport = <P as HasTransport>::Transport> {
97 receiver: mpsc::UnboundedReceiver<ServerEnd<P, T>>,
98 _protocol: PhantomData<P>,
99}
100
101pub fn multiserver<P, T: Transport>() -> (Multiserver<P, T>, MultiserverDispatcher<P, T>) {
105 let (sender, receiver) = mpsc::unbounded();
106 (
107 Multiserver { sender, _protocol: PhantomData },
108 MultiserverDispatcher { receiver, _protocol: PhantomData },
109 )
110}
111
112impl<P, T: Transport> MultiserverDispatcher<P, T> {
113 pub async fn run<H>(self, handler: H)
117 where
118 P: DispatchServerMessage<H, T>,
119 H: Send,
120 {
121 self.run_inner(ServerHandlerToLocalAdapter(
122 ServerHandlerToProtocolAdapter::<P, H>::from_untyped(handler),
123 ))
124 .await
125 }
126
127 pub async fn run_local<H>(self, handler: H)
131 where
132 P: DispatchLocalServerMessage<H, T>,
133 {
134 self.run_inner(ServerHandlerToProtocolAdapter::<P, H>::from_untyped(handler)).await
135 }
136
137 async fn run_inner<H>(mut self, handler: H)
138 where
139 H: LocalServerHandler<T>,
140 {
141 let handler = RefCell::new(handler);
142 let mut futures = FuturesUnordered::new();
143
144 let mut is_closed = false;
145 loop {
146 select_biased! {
147 transport = self.receiver.next() => {
148 if let Some(transport) = transport {
149 let dispatcher = ServerDispatcher::new(transport.into_untyped());
150 futures.push(dispatcher.run_local(RefCellServerHandler {
151 handler: &handler,
152 }));
153 } else {
154 is_closed = true;
155 if futures.is_empty() {
156 break;
157 }
158 }
159 }
160 output = futures.next() => {
161 if output.is_none() && is_closed {
162 break;
163 }
164 }
165 }
166 }
167 }
168}