fidl_next_protocol/endpoints/
server.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! FIDL protocol servers.
6
7use core::future::Future;
8use core::num::NonZeroU32;
9
10use fidl_next_codec::{Encode, EncodeError, EncoderExt as _};
11
12use crate::concurrency::sync::Arc;
13use crate::concurrency::sync::atomic::{AtomicI64, Ordering};
14
15use crate::{ProtocolError, Transport, decode_header, encode_header};
16
17use super::connection::{Connection, SendFuture};
18
19/// A responder for a two-way message.
20#[must_use]
21pub struct Responder {
22    txid: NonZeroU32,
23}
24
25struct ServerSenderInner<T: Transport> {
26    connection: Connection<T>,
27    epitaph: AtomicI64,
28}
29
30impl<T: Transport> ServerSenderInner<T> {
31    const EPITAPH_NONE: i64 = i64::MAX;
32
33    fn new(shared: T::Shared) -> Self {
34        Self { connection: Connection::new(shared), epitaph: AtomicI64::new(Self::EPITAPH_NONE) }
35    }
36
37    fn close_with_epitaph(&self, epitaph: Option<i32>) {
38        if let Some(epitaph) = epitaph {
39            self.epitaph.store(epitaph as i64, Ordering::Relaxed);
40        }
41        self.connection.stop();
42    }
43
44    fn epitaph(&self) -> Option<i32> {
45        let epitaph = self.epitaph.load(Ordering::Relaxed);
46        if epitaph != Self::EPITAPH_NONE { Some(epitaph as i32) } else { None }
47    }
48}
49
50/// A sender for a server endpoint.
51pub struct ServerSender<T: Transport> {
52    inner: Arc<ServerSenderInner<T>>,
53}
54
55impl<T: Transport> ServerSender<T> {
56    /// Closes the channel from the server end.
57    pub fn close(&self) {
58        self.inner.close_with_epitaph(None);
59    }
60
61    /// Closes the channel from the server end after sending an epitaph message.
62    pub fn close_with_epitaph(&self, epitaph: i32) {
63        self.inner.close_with_epitaph(Some(epitaph));
64    }
65
66    /// Send an event.
67    pub fn send_event<M>(&self, ordinal: u64, event: M) -> Result<SendFuture<'_, T>, EncodeError>
68    where
69        M: Encode<T::SendBuffer>,
70    {
71        self.inner.connection.send_with(|buffer| {
72            encode_header::<T>(buffer, 0, ordinal)?;
73            buffer.encode_next(event)
74        })
75    }
76
77    /// Send a response to a two-way message.
78    pub fn send_response<M>(
79        &self,
80        responder: Responder,
81        ordinal: u64,
82        response: M,
83    ) -> Result<SendFuture<'_, T>, EncodeError>
84    where
85        M: Encode<T::SendBuffer>,
86    {
87        self.inner.connection.send_with(|buffer| {
88            encode_header::<T>(buffer, responder.txid.get(), ordinal)?;
89            buffer.encode_next(response)
90        })
91    }
92}
93
94impl<T: Transport> Clone for ServerSender<T> {
95    fn clone(&self) -> Self {
96        Self { inner: self.inner.clone() }
97    }
98}
99
100/// A type which handles incoming events for a server.
101///
102/// The futures returned by `on_one_way` and `on_two_way` are required to be `Send`. See
103/// `LocalServerHandler` for a version of this trait which does not require the returned futures to
104/// be `Send`.
105pub trait ServerHandler<T: Transport> {
106    /// Handles a received one-way server message.
107    ///
108    /// The server cannot handle more messages until `on_one_way` completes. If `on_one_way` may
109    /// block, perform asynchronous work, or take a long time to process a message, it should
110    /// offload work to an async task.
111    fn on_one_way(
112        &mut self,
113        sender: &ServerSender<T>,
114        ordinal: u64,
115        buffer: T::RecvBuffer,
116    ) -> impl Future<Output = ()> + Send;
117
118    /// Handles a received two-way server message.
119    ///
120    /// The server cannot handle more messages until `on_two_way` completes. If `on_two_way` may
121    /// block, perform asynchronous work, or take a long time to process a message, it should
122    /// offload work to an async task.
123    fn on_two_way(
124        &mut self,
125        sender: &ServerSender<T>,
126        ordinal: u64,
127        buffer: T::RecvBuffer,
128        responder: Responder,
129    ) -> impl Future<Output = ()> + Send;
130}
131
132/// A server for an endpoint.
133pub struct Server<T: Transport> {
134    sender: ServerSender<T>,
135    exclusive: T::Exclusive,
136    is_terminated: bool,
137}
138
139impl<T: Transport> Drop for Server<T> {
140    fn drop(&mut self) {
141        if !self.is_terminated {
142            // SAFETY: We checked that the connection has not been terminated.
143            unsafe {
144                self.sender.inner.connection.terminate(ProtocolError::Stopped);
145            }
146        }
147    }
148}
149
150impl<T: Transport> Server<T> {
151    /// Creates a new server from a transport.
152    pub fn new(transport: T) -> Self {
153        let (shared, exclusive) = transport.split();
154        Self {
155            sender: ServerSender { inner: Arc::new(ServerSenderInner::new(shared)) },
156            exclusive,
157            is_terminated: false,
158        }
159    }
160
161    /// Returns the sender for the server.
162    pub fn sender(&self) -> &ServerSender<T> {
163        &self.sender
164    }
165
166    /// Runs the server with the provided handler.
167    pub async fn run<H>(mut self, mut handler: H) -> Result<H, ProtocolError<T::Error>>
168    where
169        H: ServerHandler<T>,
170    {
171        // We may assume that the connection has not been terminated because
172        // connections are only terminated by `run` and `drop`. Neither of those
173        // could have been called before this method because `run` consumes
174        // `self` and `drop` is only ever called once.
175
176        let error = loop {
177            // SAFETY: The connection has not been terminated.
178            let result = unsafe { self.run_one(&mut handler).await };
179            if let Err(error) = result {
180                break error;
181            }
182        };
183
184        // If we closed locally, we may have an epitaph to send before
185        // terminating the connection.
186        if matches!(error, ProtocolError::Stopped) {
187            if let Some(epitaph) = self.sender.inner.epitaph() {
188                // Note that we don't care whether sending the epitaph succeeds
189                // or fails; it's best-effort.
190
191                // SAFETY: The connection has not been terminated.
192                let _ = unsafe { self.sender.inner.connection.send_epitaph(epitaph).await };
193            }
194        }
195
196        // SAFETY: The connection has not been terminated.
197        unsafe {
198            self.sender.inner.connection.terminate(error.clone());
199        }
200        self.is_terminated = true;
201
202        match error {
203            // We consider servers to have finished successfully if they stop
204            // themselves manually, or if the client disconnects.
205            ProtocolError::Stopped | ProtocolError::PeerClosed => Ok(handler),
206
207            // Otherwise, the server finished with an error.
208            _ => Err(error),
209        }
210    }
211
212    /// # Safety
213    ///
214    /// The connection must not be terminated.
215    async unsafe fn run_one<H>(&mut self, handler: &mut H) -> Result<(), ProtocolError<T::Error>>
216    where
217        H: ServerHandler<T>,
218    {
219        // SAFETY: The caller guaranteed that the connection is not terminated.
220        let mut buffer = unsafe { self.sender.inner.connection.recv(&mut self.exclusive).await? };
221
222        let (txid, ordinal) =
223            decode_header::<T>(&mut buffer).map_err(ProtocolError::InvalidMessageHeader)?;
224        if let Some(txid) = NonZeroU32::new(txid) {
225            handler.on_two_way(&self.sender, ordinal, buffer, Responder { txid }).await;
226        } else {
227            handler.on_one_way(&self.sender, ordinal, buffer).await;
228        }
229
230        Ok(())
231    }
232}