Skip to main content

fidl_next_protocol/endpoints/
server.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! FIDL protocol servers.
6
7use core::future::Future;
8use core::mem::ManuallyDrop;
9use core::num::NonZeroU32;
10use core::pin::Pin;
11use core::ptr;
12use core::task::{Context, Poll};
13
14use fidl_next_codec::{
15    AsDecoder as _, DecoderExt as _, Encode, EncodeError, EncoderExt as _, Wire,
16};
17use pin_project::pin_project;
18
19use crate::concurrency::sync::Arc;
20use crate::concurrency::sync::atomic::{AtomicI64, Ordering};
21use crate::endpoints::connection::{Connection, SendFutureOutput, SendFutureState};
22use crate::wire::MessageHeader;
23use crate::{Body, Flexibility, ProtocolError, SendFuture, Transport};
24
25struct ServerInner<T: Transport> {
26    connection: Connection<T>,
27    epitaph: AtomicI64,
28}
29
30impl<T: Transport> ServerInner<T> {
31    const EPITAPH_NONE: i64 = i64::MAX;
32
33    fn new(shared: T::Shared) -> Self {
34        Self { connection: Connection::new(shared), epitaph: AtomicI64::new(Self::EPITAPH_NONE) }
35    }
36
37    fn close_with_epitaph(&self, epitaph: Option<i32>) {
38        if let Some(epitaph) = epitaph {
39            self.epitaph.store(epitaph as i64, Ordering::Relaxed);
40        }
41        self.connection.stop();
42    }
43
44    fn epitaph(&self) -> Option<i32> {
45        let epitaph = self.epitaph.load(Ordering::Relaxed);
46        if epitaph != Self::EPITAPH_NONE { Some(epitaph as i32) } else { None }
47    }
48}
49
50/// A server endpoint.
51pub struct Server<T: Transport> {
52    inner: Arc<ServerInner<T>>,
53}
54
55impl<T: Transport> Server<T> {
56    /// Closes the channel from the server end.
57    pub fn close(&self) {
58        self.inner.close_with_epitaph(None);
59    }
60
61    /// Closes the channel from the server end after sending an epitaph message.
62    pub fn close_with_epitaph(&self, epitaph: i32) {
63        self.inner.close_with_epitaph(Some(epitaph));
64    }
65
66    /// Send an event.
67    pub fn send_event<W>(
68        &self,
69        ordinal: u64,
70        flexibility: Flexibility,
71        event: impl Encode<W, T::SendBuffer>,
72    ) -> Result<SendFuture<'_, T>, EncodeError>
73    where
74        W: Wire<Constraint = ()>,
75    {
76        self.inner.connection.send_message(|buffer| {
77            buffer.encode_next(MessageHeader::new(0, ordinal, flexibility))?;
78            buffer.encode_next(event)
79        })
80    }
81}
82
83impl<T: Transport> Clone for Server<T> {
84    fn clone(&self) -> Self {
85        Self { inner: self.inner.clone() }
86    }
87}
88
89/// A type which handles incoming events for a server.
90///
91/// The futures returned by `on_one_way` and `on_two_way` are required to be `Send`. See
92/// `LocalServerHandler` for a version of this trait which does not require the returned futures to
93/// be `Send`.
94pub trait ServerHandler<T: Transport>: Send {
95    /// Handles a received one-way server message.
96    ///
97    /// The client cannot handle more messages until `on_one_way` completes. If
98    /// `on_one_way` may block, or would perform asynchronous work that takes a
99    /// long time, it should offload work to an async task and return.
100    fn on_one_way(
101        &mut self,
102        ordinal: u64,
103        flexibility: Flexibility,
104        body: Body<T>,
105    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
106
107    /// Handles a received two-way server message.
108    ///
109    /// The client cannot handle more messages until `on_two_way` completes. If
110    /// `on_two_way` may block, or would perform asynchronous work that takes a
111    /// long time, it should offload work to an async task and return.
112    fn on_two_way(
113        &mut self,
114        ordinal: u64,
115        flexibility: Flexibility,
116        body: Body<T>,
117        responder: Responder<T>,
118    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
119}
120
121/// A type which handles incoming events for a local server.
122///
123/// This is a variant of [`ServerHandler`] that does not require implementing
124/// `Send` and only supports local-thread executors.
125pub trait LocalServerHandler<T: Transport> {
126    /// Handles a received one-way server message.
127    ///
128    /// See [`ServerHandler::on_one_way`] for more information.
129    fn on_one_way(
130        &mut self,
131        ordinal: u64,
132        flexibility: Flexibility,
133        body: Body<T>,
134    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>>;
135
136    /// Handles a received two-way server message.
137    ///
138    /// See [`ServerHandler::on_two_way`] for more information.
139    fn on_two_way(
140        &mut self,
141        ordinal: u64,
142        flexibility: Flexibility,
143        body: Body<T>,
144        responder: Responder<T>,
145    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>>;
146}
147
148/// An adapter for a [`ServerHandler`] which implements [`LocalServerHandler`].
149#[repr(transparent)]
150pub struct ServerHandlerToLocalAdapter<H>(H);
151
152impl<T, H> LocalServerHandler<T> for ServerHandlerToLocalAdapter<H>
153where
154    T: Transport,
155    H: ServerHandler<T>,
156{
157    #[inline]
158    fn on_one_way(
159        &mut self,
160        ordinal: u64,
161        flexibility: Flexibility,
162        body: Body<T>,
163    ) -> impl Future<Output = Result<(), ProtocolError<<T as Transport>::Error>>> {
164        self.0.on_one_way(ordinal, flexibility, body)
165    }
166
167    #[inline]
168    fn on_two_way(
169        &mut self,
170        ordinal: u64,
171        flexibility: Flexibility,
172        body: Body<T>,
173        responder: Responder<T>,
174    ) -> impl Future<Output = Result<(), ProtocolError<<T as Transport>::Error>>> {
175        self.0.on_two_way(ordinal, flexibility, body, responder)
176    }
177}
178
179/// A dispatcher for a server endpoint.
180///
181/// A server dispatcher receives all of the incoming requests and dispatches them to the server
182/// handler. It acts as the message pump for the server.
183///
184/// The dispatcher must be actively polled to receive requests. If the dispatcher is not
185/// [`run`](ServerDispatcher::run), then requests will not be received.
186pub struct ServerDispatcher<T: Transport> {
187    inner: Arc<ServerInner<T>>,
188    exclusive: T::Exclusive,
189    is_terminated: bool,
190}
191
192impl<T: Transport> Drop for ServerDispatcher<T> {
193    fn drop(&mut self) {
194        if !self.is_terminated {
195            // SAFETY: We checked that the connection has not been terminated.
196            unsafe {
197                self.inner.connection.terminate(ProtocolError::Stopped);
198            }
199        }
200    }
201}
202
203impl<T: Transport> ServerDispatcher<T> {
204    /// Creates a new server from a transport.
205    pub fn new(transport: T) -> Self {
206        let (shared, exclusive) = transport.split();
207        Self { inner: Arc::new(ServerInner::new(shared)), exclusive, is_terminated: false }
208    }
209
210    /// Returns the dispatcher's server.
211    pub fn server(&self) -> Server<T> {
212        Server { inner: self.inner.clone() }
213    }
214
215    /// Runs the server with the provided handler.
216    pub async fn run<H>(self, handler: H) -> Result<H, ProtocolError<T::Error>>
217    where
218        H: ServerHandler<T>,
219    {
220        // The bounds on `H` prove that the future returned by `run_local` is
221        // `Send`.
222        self.run_local(ServerHandlerToLocalAdapter(handler)).await.map(|adapter| adapter.0)
223    }
224
225    /// Runs the server with the provided local handler.
226    pub async fn run_local<H>(mut self, mut handler: H) -> Result<H, ProtocolError<T::Error>>
227    where
228        H: LocalServerHandler<T>,
229    {
230        // We may assume that the connection has not been terminated because
231        // connections are only terminated by `run` and `drop`. Neither of those
232        // could have been called before this method because `run` consumes
233        // `self` and `drop` is only ever called once.
234
235        let error = loop {
236            // SAFETY: The connection has not been terminated.
237            let result = unsafe { self.run_one(&mut handler).await };
238            if let Err(error) = result {
239                break error;
240            }
241        };
242
243        // If we closed locally, we may have an epitaph to send before
244        // terminating the connection.
245        if matches!(error, ProtocolError::Stopped)
246            && let Some(epitaph) = self.inner.epitaph()
247        {
248            // Note that we don't care whether sending the epitaph succeeds
249            // or fails; it's best-effort.
250
251            // SAFETY: The connection has not been terminated.
252            let _ = unsafe { self.inner.connection.send_epitaph(epitaph).await };
253        }
254
255        // SAFETY: The connection has not been terminated.
256        unsafe {
257            self.inner.connection.terminate(error.clone());
258        }
259        self.is_terminated = true;
260
261        match error {
262            // We consider servers to have finished successfully if they stop
263            // themselves manually, or if the client disconnects.
264            ProtocolError::Stopped | ProtocolError::PeerClosed => Ok(handler),
265
266            // Otherwise, the server finished with an error.
267            _ => Err(error),
268        }
269    }
270
271    /// # Safety
272    ///
273    /// The connection must not be terminated.
274    async unsafe fn run_one<H>(&mut self, handler: &mut H) -> Result<(), ProtocolError<T::Error>>
275    where
276        H: LocalServerHandler<T>,
277    {
278        // SAFETY: The caller guaranteed that the connection is not terminated.
279        let mut buffer = unsafe { self.inner.connection.recv(&mut self.exclusive).await? };
280
281        let header = buffer
282            .as_decoder()
283            .decode_prefix::<MessageHeader>()
284            .map_err(ProtocolError::InvalidMessageHeader)?;
285
286        if let Some(txid) = NonZeroU32::new(*header.txid) {
287            let responder = Responder { server: self.server(), txid };
288            handler
289                .on_two_way(*header.ordinal, header.flexibility(), Body::new(buffer), responder)
290                .await?;
291        } else {
292            handler.on_one_way(*header.ordinal, header.flexibility(), Body::new(buffer)).await?;
293        }
294
295        Ok(())
296    }
297}
298
299/// A responder for a two-way message.
300#[must_use = "responders close the underlying FIDL connection when dropped"]
301pub struct Responder<T: Transport> {
302    server: Server<T>,
303    txid: NonZeroU32,
304}
305
306impl<T: Transport> Drop for Responder<T> {
307    fn drop(&mut self) {
308        self.server.close();
309    }
310}
311
312impl<T: Transport> Responder<T> {
313    /// Send a response to a two-way message.
314    pub fn respond<W>(
315        self,
316        ordinal: u64,
317        flexibility: Flexibility,
318        response: impl Encode<W, T::SendBuffer>,
319    ) -> Result<RespondFuture<T>, EncodeError>
320    where
321        W: Wire<Constraint = ()>,
322    {
323        let state = self.server.inner.connection.send_message_raw(|buffer| {
324            buffer.encode_next(MessageHeader::new(self.txid.get(), ordinal, flexibility))?;
325            buffer.encode_next(response)
326        })?;
327
328        let this = ManuallyDrop::new(self);
329        // SAFETY: `this` is a `ManuallyDrop` and so `server` won't be dropped
330        // twice.
331        let server = unsafe { ptr::read(&this.server) };
332
333        Ok(RespondFuture { server, state })
334    }
335}
336
337/// A future which responds to a request over a connection.
338#[must_use = "futures do nothing unless polled"]
339#[pin_project]
340pub struct RespondFuture<T: Transport> {
341    server: Server<T>,
342    #[pin]
343    state: SendFutureState<T>,
344}
345
346impl<T: Transport> Future for RespondFuture<T> {
347    type Output = SendFutureOutput<T>;
348
349    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
350        let this = self.project();
351
352        this.state.poll_send(cx, &this.server.inner.connection)
353    }
354}