fidl_next_protocol/endpoints/
server.rs1use core::future::Future;
8use core::mem::ManuallyDrop;
9use core::num::NonZeroU32;
10use core::pin::Pin;
11use core::ptr;
12use core::task::{Context, Poll};
13
14use fidl_next_codec::{Constrained, Encode, EncodeError, EncoderExt as _, Wire};
15use pin_project::pin_project;
16
17use crate::concurrency::sync::Arc;
18use crate::concurrency::sync::atomic::{AtomicI64, Ordering};
19use crate::endpoints::connection::{Connection, SendFutureOutput, SendFutureState};
20use crate::{Flexibility, ProtocolError, SendFuture, Transport, decode_header, encode_header};
21
22struct ServerInner<T: Transport> {
23    connection: Connection<T>,
24    epitaph: AtomicI64,
25}
26
27impl<T: Transport> ServerInner<T> {
28    const EPITAPH_NONE: i64 = i64::MAX;
29
30    fn new(shared: T::Shared) -> Self {
31        Self { connection: Connection::new(shared), epitaph: AtomicI64::new(Self::EPITAPH_NONE) }
32    }
33
34    fn close_with_epitaph(&self, epitaph: Option<i32>) {
35        if let Some(epitaph) = epitaph {
36            self.epitaph.store(epitaph as i64, Ordering::Relaxed);
37        }
38        self.connection.stop();
39    }
40
41    fn epitaph(&self) -> Option<i32> {
42        let epitaph = self.epitaph.load(Ordering::Relaxed);
43        if epitaph != Self::EPITAPH_NONE { Some(epitaph as i32) } else { None }
44    }
45}
46
47pub struct Server<T: Transport> {
49    inner: Arc<ServerInner<T>>,
50}
51
52impl<T: Transport> Server<T> {
53    pub fn close(&self) {
55        self.inner.close_with_epitaph(None);
56    }
57
58    pub fn close_with_epitaph(&self, epitaph: i32) {
60        self.inner.close_with_epitaph(Some(epitaph));
61    }
62
63    pub fn send_event<W>(
65        &self,
66        ordinal: u64,
67        flexibility: Flexibility,
68        event: impl Encode<W, T::SendBuffer>,
69    ) -> Result<SendFuture<'_, T>, EncodeError>
70    where
71        W: Constrained<Constraint = ()> + Wire,
72    {
73        self.inner.connection.send_message(|buffer| {
74            encode_header::<T>(buffer, 0, ordinal, flexibility)?;
75            buffer.encode_next(event, ())
76        })
77    }
78}
79
80impl<T: Transport> Clone for Server<T> {
81    fn clone(&self) -> Self {
82        Self { inner: self.inner.clone() }
83    }
84}
85
86pub trait ServerHandler<T: Transport> {
92    fn on_one_way(
98        &mut self,
99        ordinal: u64,
100        flexibility: Flexibility,
101        buffer: T::RecvBuffer,
102    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
103
104    fn on_two_way(
110        &mut self,
111        ordinal: u64,
112        flexibility: Flexibility,
113        buffer: T::RecvBuffer,
114        responder: Responder<T>,
115    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
116}
117
118pub struct ServerDispatcher<T: Transport> {
126    inner: Arc<ServerInner<T>>,
127    exclusive: T::Exclusive,
128    is_terminated: bool,
129}
130
131impl<T: Transport> Drop for ServerDispatcher<T> {
132    fn drop(&mut self) {
133        if !self.is_terminated {
134            unsafe {
136                self.inner.connection.terminate(ProtocolError::Stopped);
137            }
138        }
139    }
140}
141
142impl<T: Transport> ServerDispatcher<T> {
143    pub fn new(transport: T) -> Self {
145        let (shared, exclusive) = transport.split();
146        Self { inner: Arc::new(ServerInner::new(shared)), exclusive, is_terminated: false }
147    }
148
149    pub fn server(&self) -> Server<T> {
151        Server { inner: self.inner.clone() }
152    }
153
154    pub async fn run<H>(mut self, mut handler: H) -> Result<H, ProtocolError<T::Error>>
156    where
157        H: ServerHandler<T>,
158    {
159        let error = loop {
165            let result = unsafe { self.run_one(&mut handler).await };
167            if let Err(error) = result {
168                break error;
169            }
170        };
171
172        if matches!(error, ProtocolError::Stopped) {
175            if let Some(epitaph) = self.inner.epitaph() {
176                let _ = unsafe { self.inner.connection.send_epitaph(epitaph).await };
181            }
182        }
183
184        unsafe {
186            self.inner.connection.terminate(error.clone());
187        }
188        self.is_terminated = true;
189
190        match error {
191            ProtocolError::Stopped | ProtocolError::PeerClosed => Ok(handler),
194
195            _ => Err(error),
197        }
198    }
199
200    async unsafe fn run_one<H>(&mut self, handler: &mut H) -> Result<(), ProtocolError<T::Error>>
204    where
205        H: ServerHandler<T>,
206    {
207        let mut buffer = unsafe { self.inner.connection.recv(&mut self.exclusive).await? };
209
210        let (txid, ordinal, flexibility) =
211            decode_header::<T>(&mut buffer).map_err(ProtocolError::InvalidMessageHeader)?;
212        if let Some(txid) = NonZeroU32::new(txid) {
213            let responder = Responder { server: self.server(), txid };
214            handler.on_two_way(ordinal, flexibility, buffer, responder).await?;
215        } else {
216            handler.on_one_way(ordinal, flexibility, buffer).await?;
217        }
218
219        Ok(())
220    }
221}
222
223#[must_use = "responders close the underlying FIDL connection when dropped"]
225pub struct Responder<T: Transport> {
226    server: Server<T>,
227    txid: NonZeroU32,
228}
229
230impl<T: Transport> Drop for Responder<T> {
231    fn drop(&mut self) {
232        self.server.close();
233    }
234}
235
236impl<T: Transport> Responder<T> {
237    pub fn respond<W>(
239        self,
240        ordinal: u64,
241        flexibility: Flexibility,
242        response: impl Encode<W, T::SendBuffer>,
243    ) -> Result<RespondFuture<T>, EncodeError>
244    where
245        W: Constrained<Constraint = ()> + Wire,
246    {
247        let state = self.server.inner.connection.send_message_raw(|buffer| {
248            encode_header::<T>(buffer, self.txid.get(), ordinal, flexibility)?;
249            buffer.encode_next(response, ())
250        })?;
251
252        let this = ManuallyDrop::new(self);
253        let server = unsafe { ptr::read(&this.server) };
256
257        Ok(RespondFuture { server, state })
258    }
259}
260
261#[must_use = "futures do nothing unless polled"]
263#[pin_project]
264pub struct RespondFuture<T: Transport> {
265    server: Server<T>,
266    #[pin]
267    state: SendFutureState<T>,
268}
269
270impl<T: Transport> Future for RespondFuture<T> {
271    type Output = SendFutureOutput<T>;
272
273    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
274        let this = self.project();
275
276        this.state.poll_send(cx, &this.server.inner.connection)
277    }
278}