Skip to main content

fidl_next_protocol/endpoints/
server.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! FIDL protocol servers.
6
7use core::future::Future;
8use core::mem::ManuallyDrop;
9use core::num::NonZeroU32;
10use core::pin::Pin;
11use core::ptr;
12use core::task::{Context, Poll};
13
14use fidl_next_codec::{
15    AsDecoder as _, DecoderExt as _, Encode, EncodeError, EncoderExt as _, Wire,
16};
17use pin_project::pin_project;
18
19use crate::concurrency::sync::Arc;
20use crate::concurrency::sync::atomic::{AtomicI64, Ordering};
21use crate::endpoints::connection::{Connection, SendFutureOutput, SendFutureState};
22use crate::wire::MessageHeader;
23use crate::{Body, Flexibility, ProtocolError, SendFuture, Transport};
24
25struct ServerInner<T: Transport> {
26    connection: Connection<T>,
27    epitaph: AtomicI64,
28}
29
30impl<T: Transport> ServerInner<T> {
31    const EPITAPH_NONE: i64 = i64::MAX;
32
33    fn new(shared: T::Shared) -> Self {
34        Self { connection: Connection::new(shared), epitaph: AtomicI64::new(Self::EPITAPH_NONE) }
35    }
36
37    fn close_with_epitaph(&self, epitaph: Option<i32>) {
38        if let Some(epitaph) = epitaph {
39            self.epitaph.store(epitaph as i64, Ordering::Relaxed);
40        }
41        self.connection.stop();
42    }
43
44    fn epitaph(&self) -> Option<i32> {
45        let epitaph = self.epitaph.load(Ordering::Relaxed);
46        if epitaph != Self::EPITAPH_NONE { Some(epitaph as i32) } else { None }
47    }
48}
49
50/// A server endpoint.
51pub struct Server<T: Transport> {
52    inner: Arc<ServerInner<T>>,
53}
54
55impl<T: Transport> Server<T> {
56    /// Closes the channel from the server end.
57    pub fn close(&self) {
58        self.inner.close_with_epitaph(None);
59    }
60
61    /// Closes the channel from the server end after sending an epitaph message.
62    pub fn close_with_epitaph(&self, epitaph: i32) {
63        self.inner.close_with_epitaph(Some(epitaph));
64    }
65
66    /// Send an event.
67    pub fn send_event<W>(
68        &self,
69        ordinal: u64,
70        flexibility: Flexibility,
71        event: impl Encode<W, T::SendBuffer>,
72    ) -> Result<SendFuture<'_, T>, EncodeError>
73    where
74        W: Wire<Constraint = ()>,
75    {
76        self.inner.connection.send_message(|buffer| {
77            buffer.encode_next(MessageHeader::new(0, ordinal, flexibility))?;
78            buffer.encode_next(event)
79        })
80    }
81}
82
83impl<T: Transport> Clone for Server<T> {
84    fn clone(&self) -> Self {
85        Self { inner: self.inner.clone() }
86    }
87}
88
89/// A type which handles incoming events for a server.
90///
91/// The futures returned by `on_one_way` and `on_two_way` are required to be `Send`. See
92/// `LocalServerHandler` for a version of this trait which does not require the returned futures to
93/// be `Send`.
94pub trait ServerHandler<T: Transport> {
95    /// Handles a received one-way server message.
96    ///
97    /// The server cannot handle more messages until `on_one_way` completes. If `on_one_way` may
98    /// block, perform asynchronous work, or take a long time to process a message, it should
99    /// offload work to an async task.
100    fn on_one_way(
101        &mut self,
102        ordinal: u64,
103        flexibility: Flexibility,
104        body: Body<T>,
105    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
106
107    /// Handles a received two-way server message.
108    ///
109    /// The server cannot handle more messages until `on_two_way` completes. If `on_two_way` may
110    /// block, perform asynchronous work, or take a long time to process a message, it should
111    /// offload work to an async task.
112    fn on_two_way(
113        &mut self,
114        ordinal: u64,
115        flexibility: Flexibility,
116        body: Body<T>,
117        responder: Responder<T>,
118    ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
119}
120
121/// A dispatcher for a server endpoint.
122///
123/// A server dispatcher receives all of the incoming requests and dispatches them to the server
124/// handler. It acts as the message pump for the server.
125///
126/// The dispatcher must be actively polled to receive requests. If the dispatcher is not
127/// [`run`](ServerDispatcher::run), then requests will not be received.
128pub struct ServerDispatcher<T: Transport> {
129    inner: Arc<ServerInner<T>>,
130    exclusive: T::Exclusive,
131    is_terminated: bool,
132}
133
134impl<T: Transport> Drop for ServerDispatcher<T> {
135    fn drop(&mut self) {
136        if !self.is_terminated {
137            // SAFETY: We checked that the connection has not been terminated.
138            unsafe {
139                self.inner.connection.terminate(ProtocolError::Stopped);
140            }
141        }
142    }
143}
144
145impl<T: Transport> ServerDispatcher<T> {
146    /// Creates a new server from a transport.
147    pub fn new(transport: T) -> Self {
148        let (shared, exclusive) = transport.split();
149        Self { inner: Arc::new(ServerInner::new(shared)), exclusive, is_terminated: false }
150    }
151
152    /// Returns the dispatcher's server.
153    pub fn server(&self) -> Server<T> {
154        Server { inner: self.inner.clone() }
155    }
156
157    /// Runs the server with the provided handler.
158    pub async fn run<H>(mut self, mut handler: H) -> Result<H, ProtocolError<T::Error>>
159    where
160        H: ServerHandler<T>,
161    {
162        // We may assume that the connection has not been terminated because
163        // connections are only terminated by `run` and `drop`. Neither of those
164        // could have been called before this method because `run` consumes
165        // `self` and `drop` is only ever called once.
166
167        let error = loop {
168            // SAFETY: The connection has not been terminated.
169            let result = unsafe { self.run_one(&mut handler).await };
170            if let Err(error) = result {
171                break error;
172            }
173        };
174
175        // If we closed locally, we may have an epitaph to send before
176        // terminating the connection.
177        if matches!(error, ProtocolError::Stopped)
178            && let Some(epitaph) = self.inner.epitaph()
179        {
180            // Note that we don't care whether sending the epitaph succeeds
181            // or fails; it's best-effort.
182
183            // SAFETY: The connection has not been terminated.
184            let _ = unsafe { self.inner.connection.send_epitaph(epitaph).await };
185        }
186
187        // SAFETY: The connection has not been terminated.
188        unsafe {
189            self.inner.connection.terminate(error.clone());
190        }
191        self.is_terminated = true;
192
193        match error {
194            // We consider servers to have finished successfully if they stop
195            // themselves manually, or if the client disconnects.
196            ProtocolError::Stopped | ProtocolError::PeerClosed => Ok(handler),
197
198            // Otherwise, the server finished with an error.
199            _ => Err(error),
200        }
201    }
202
203    /// # Safety
204    ///
205    /// The connection must not be terminated.
206    async unsafe fn run_one<H>(&mut self, handler: &mut H) -> Result<(), ProtocolError<T::Error>>
207    where
208        H: ServerHandler<T>,
209    {
210        // SAFETY: The caller guaranteed that the connection is not terminated.
211        let mut buffer = unsafe { self.inner.connection.recv(&mut self.exclusive).await? };
212
213        let header = buffer
214            .as_decoder()
215            .decode_prefix::<MessageHeader>()
216            .map_err(ProtocolError::InvalidMessageHeader)?;
217
218        if let Some(txid) = NonZeroU32::new(*header.txid) {
219            let responder = Responder { server: self.server(), txid };
220            handler
221                .on_two_way(*header.ordinal, header.flexibility(), Body::new(buffer), responder)
222                .await?;
223        } else {
224            handler.on_one_way(*header.ordinal, header.flexibility(), Body::new(buffer)).await?;
225        }
226
227        Ok(())
228    }
229}
230
231/// A responder for a two-way message.
232#[must_use = "responders close the underlying FIDL connection when dropped"]
233pub struct Responder<T: Transport> {
234    server: Server<T>,
235    txid: NonZeroU32,
236}
237
238impl<T: Transport> Drop for Responder<T> {
239    fn drop(&mut self) {
240        self.server.close();
241    }
242}
243
244impl<T: Transport> Responder<T> {
245    /// Send a response to a two-way message.
246    pub fn respond<W>(
247        self,
248        ordinal: u64,
249        flexibility: Flexibility,
250        response: impl Encode<W, T::SendBuffer>,
251    ) -> Result<RespondFuture<T>, EncodeError>
252    where
253        W: Wire<Constraint = ()>,
254    {
255        let state = self.server.inner.connection.send_message_raw(|buffer| {
256            buffer.encode_next(MessageHeader::new(self.txid.get(), ordinal, flexibility))?;
257            buffer.encode_next(response)
258        })?;
259
260        let this = ManuallyDrop::new(self);
261        // SAFETY: `this` is a `ManuallyDrop` and so `server` won't be dropped
262        // twice.
263        let server = unsafe { ptr::read(&this.server) };
264
265        Ok(RespondFuture { server, state })
266    }
267}
268
269/// A future which responds to a request over a connection.
270#[must_use = "futures do nothing unless polled"]
271#[pin_project]
272pub struct RespondFuture<T: Transport> {
273    server: Server<T>,
274    #[pin]
275    state: SendFutureState<T>,
276}
277
278impl<T: Transport> Future for RespondFuture<T> {
279    type Output = SendFutureOutput<T>;
280
281    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
282        let this = self.project();
283
284        this.state.poll_send(cx, &this.server.inner.connection)
285    }
286}