fidl_next_protocol/endpoints/
server.rs1use core::future::Future;
8use core::mem::ManuallyDrop;
9use core::num::NonZeroU32;
10use core::pin::Pin;
11use core::ptr;
12use core::task::{Context, Poll};
13
14use fidl_next_codec::{
15 AsDecoder as _, DecoderExt as _, Encode, EncodeError, EncoderExt as _, Wire,
16};
17use pin_project::pin_project;
18
19use crate::concurrency::sync::Arc;
20use crate::concurrency::sync::atomic::{AtomicI64, Ordering};
21use crate::endpoints::connection::{Connection, SendFutureOutput, SendFutureState};
22use crate::wire::MessageHeader;
23use crate::{Body, Flexibility, ProtocolError, SendFuture, Transport};
24
25struct ServerInner<T: Transport> {
26 connection: Connection<T>,
27 epitaph: AtomicI64,
28}
29
30impl<T: Transport> ServerInner<T> {
31 const EPITAPH_NONE: i64 = i64::MAX;
32
33 fn new(shared: T::Shared) -> Self {
34 Self { connection: Connection::new(shared), epitaph: AtomicI64::new(Self::EPITAPH_NONE) }
35 }
36
37 fn close_with_epitaph(&self, epitaph: Option<i32>) {
38 if let Some(epitaph) = epitaph {
39 self.epitaph.store(epitaph as i64, Ordering::Relaxed);
40 }
41 self.connection.stop();
42 }
43
44 fn epitaph(&self) -> Option<i32> {
45 let epitaph = self.epitaph.load(Ordering::Relaxed);
46 if epitaph != Self::EPITAPH_NONE { Some(epitaph as i32) } else { None }
47 }
48}
49
50pub struct Server<T: Transport> {
52 inner: Arc<ServerInner<T>>,
53}
54
55impl<T: Transport> Server<T> {
56 pub fn close(&self) {
58 self.inner.close_with_epitaph(None);
59 }
60
61 pub fn close_with_epitaph(&self, epitaph: i32) {
63 self.inner.close_with_epitaph(Some(epitaph));
64 }
65
66 pub fn send_event<W>(
68 &self,
69 ordinal: u64,
70 flexibility: Flexibility,
71 event: impl Encode<W, T::SendBuffer>,
72 ) -> Result<SendFuture<'_, T>, EncodeError>
73 where
74 W: Wire<Constraint = ()>,
75 {
76 self.inner.connection.send_message(|buffer| {
77 buffer.encode_next(MessageHeader::new(0, ordinal, flexibility))?;
78 buffer.encode_next(event)
79 })
80 }
81}
82
83impl<T: Transport> Clone for Server<T> {
84 fn clone(&self) -> Self {
85 Self { inner: self.inner.clone() }
86 }
87}
88
89pub trait ServerHandler<T: Transport>: Send {
95 fn on_one_way(
101 &mut self,
102 ordinal: u64,
103 flexibility: Flexibility,
104 body: Body<T>,
105 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
106
107 fn on_two_way(
113 &mut self,
114 ordinal: u64,
115 flexibility: Flexibility,
116 body: Body<T>,
117 responder: Responder<T>,
118 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
119}
120
121pub trait LocalServerHandler<T: Transport> {
126 fn on_one_way(
130 &mut self,
131 ordinal: u64,
132 flexibility: Flexibility,
133 body: Body<T>,
134 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>>;
135
136 fn on_two_way(
140 &mut self,
141 ordinal: u64,
142 flexibility: Flexibility,
143 body: Body<T>,
144 responder: Responder<T>,
145 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>>;
146}
147
148#[repr(transparent)]
150pub struct ServerHandlerToLocalAdapter<H>(H);
151
152impl<T, H> LocalServerHandler<T> for ServerHandlerToLocalAdapter<H>
153where
154 T: Transport,
155 H: ServerHandler<T>,
156{
157 #[inline]
158 fn on_one_way(
159 &mut self,
160 ordinal: u64,
161 flexibility: Flexibility,
162 body: Body<T>,
163 ) -> impl Future<Output = Result<(), ProtocolError<<T as Transport>::Error>>> {
164 self.0.on_one_way(ordinal, flexibility, body)
165 }
166
167 #[inline]
168 fn on_two_way(
169 &mut self,
170 ordinal: u64,
171 flexibility: Flexibility,
172 body: Body<T>,
173 responder: Responder<T>,
174 ) -> impl Future<Output = Result<(), ProtocolError<<T as Transport>::Error>>> {
175 self.0.on_two_way(ordinal, flexibility, body, responder)
176 }
177}
178
179pub struct ServerDispatcher<T: Transport> {
187 inner: Arc<ServerInner<T>>,
188 exclusive: T::Exclusive,
189 is_terminated: bool,
190}
191
192impl<T: Transport> Drop for ServerDispatcher<T> {
193 fn drop(&mut self) {
194 if !self.is_terminated {
195 unsafe {
197 self.inner.connection.terminate(ProtocolError::Stopped);
198 }
199 }
200 }
201}
202
203impl<T: Transport> ServerDispatcher<T> {
204 pub fn new(transport: T) -> Self {
206 let (shared, exclusive) = transport.split();
207 Self { inner: Arc::new(ServerInner::new(shared)), exclusive, is_terminated: false }
208 }
209
210 pub fn server(&self) -> Server<T> {
212 Server { inner: self.inner.clone() }
213 }
214
215 pub async fn run<H>(self, handler: H) -> Result<H, ProtocolError<T::Error>>
217 where
218 H: ServerHandler<T>,
219 {
220 self.run_local(ServerHandlerToLocalAdapter(handler)).await.map(|adapter| adapter.0)
223 }
224
225 pub async fn run_local<H>(mut self, mut handler: H) -> Result<H, ProtocolError<T::Error>>
227 where
228 H: LocalServerHandler<T>,
229 {
230 let error = loop {
236 let result = unsafe { self.run_one(&mut handler).await };
238 if let Err(error) = result {
239 break error;
240 }
241 };
242
243 if matches!(error, ProtocolError::Stopped)
246 && let Some(epitaph) = self.inner.epitaph()
247 {
248 let _ = unsafe { self.inner.connection.send_epitaph(epitaph).await };
253 }
254
255 unsafe {
257 self.inner.connection.terminate(error.clone());
258 }
259 self.is_terminated = true;
260
261 match error {
262 ProtocolError::Stopped | ProtocolError::PeerClosed => Ok(handler),
265
266 _ => Err(error),
268 }
269 }
270
271 async unsafe fn run_one<H>(&mut self, handler: &mut H) -> Result<(), ProtocolError<T::Error>>
275 where
276 H: LocalServerHandler<T>,
277 {
278 let mut buffer = unsafe { self.inner.connection.recv(&mut self.exclusive).await? };
280
281 let header = buffer
282 .as_decoder()
283 .decode_prefix::<MessageHeader>()
284 .map_err(ProtocolError::InvalidMessageHeader)?;
285
286 if let Some(txid) = NonZeroU32::new(*header.txid) {
287 let responder = Responder { server: self.server(), txid };
288 handler
289 .on_two_way(*header.ordinal, header.flexibility(), Body::new(buffer), responder)
290 .await?;
291 } else {
292 handler.on_one_way(*header.ordinal, header.flexibility(), Body::new(buffer)).await?;
293 }
294
295 Ok(())
296 }
297}
298
299#[must_use = "responders close the underlying FIDL connection when dropped"]
301pub struct Responder<T: Transport> {
302 server: Server<T>,
303 txid: NonZeroU32,
304}
305
306impl<T: Transport> Drop for Responder<T> {
307 fn drop(&mut self) {
308 self.server.close();
309 }
310}
311
312impl<T: Transport> Responder<T> {
313 pub fn respond<W>(
315 self,
316 ordinal: u64,
317 flexibility: Flexibility,
318 response: impl Encode<W, T::SendBuffer>,
319 ) -> Result<RespondFuture<T>, EncodeError>
320 where
321 W: Wire<Constraint = ()>,
322 {
323 let state = self.server.inner.connection.send_message_raw(|buffer| {
324 buffer.encode_next(MessageHeader::new(self.txid.get(), ordinal, flexibility))?;
325 buffer.encode_next(response)
326 })?;
327
328 let this = ManuallyDrop::new(self);
329 let server = unsafe { ptr::read(&this.server) };
332
333 Ok(RespondFuture { server, state })
334 }
335}
336
337#[must_use = "futures do nothing unless polled"]
339#[pin_project]
340pub struct RespondFuture<T: Transport> {
341 server: Server<T>,
342 #[pin]
343 state: SendFutureState<T>,
344}
345
346impl<T: Transport> Future for RespondFuture<T> {
347 type Output = SendFutureOutput<T>;
348
349 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
350 let this = self.project();
351
352 this.state.poll_send(cx, &this.server.inner.connection)
353 }
354}