fidl_next_protocol/endpoints/
server.rs1use core::future::Future;
8use core::mem::ManuallyDrop;
9use core::num::NonZeroU32;
10use core::pin::Pin;
11use core::ptr;
12use core::task::{Context, Poll};
13
14use fidl_next_codec::{
15 AsDecoder as _, DecoderExt as _, Encode, EncodeError, EncoderExt as _, Wire,
16};
17use pin_project::pin_project;
18
19use crate::concurrency::sync::Arc;
20use crate::concurrency::sync::atomic::{AtomicI64, Ordering};
21use crate::endpoints::connection::{Connection, SendFutureOutput, SendFutureState};
22use crate::wire::MessageHeader;
23use crate::{Body, Flexibility, ProtocolError, SendFuture, Transport};
24
25struct ServerInner<T: Transport> {
26 connection: Connection<T>,
27 epitaph: AtomicI64,
28}
29
30impl<T: Transport> ServerInner<T> {
31 const EPITAPH_NONE: i64 = i64::MAX;
32
33 fn new(shared: T::Shared) -> Self {
34 Self { connection: Connection::new(shared), epitaph: AtomicI64::new(Self::EPITAPH_NONE) }
35 }
36
37 fn close_with_epitaph(&self, epitaph: Option<i32>) {
38 if let Some(epitaph) = epitaph {
39 self.epitaph.store(epitaph as i64, Ordering::Relaxed);
40 }
41 self.connection.stop();
42 }
43
44 fn epitaph(&self) -> Option<i32> {
45 let epitaph = self.epitaph.load(Ordering::Relaxed);
46 if epitaph != Self::EPITAPH_NONE { Some(epitaph as i32) } else { None }
47 }
48}
49
50pub struct Server<T: Transport> {
52 inner: Arc<ServerInner<T>>,
53}
54
55impl<T: Transport> Server<T> {
56 pub fn close(&self) {
58 self.inner.close_with_epitaph(None);
59 }
60
61 pub fn close_with_epitaph(&self, epitaph: i32) {
63 self.inner.close_with_epitaph(Some(epitaph));
64 }
65
66 pub fn send_event<W>(
68 &self,
69 ordinal: u64,
70 flexibility: Flexibility,
71 event: impl Encode<W, T::SendBuffer>,
72 ) -> Result<SendFuture<'_, T>, EncodeError>
73 where
74 W: Wire<Constraint = ()>,
75 {
76 self.inner.connection.send_message(|buffer| {
77 buffer.encode_next(MessageHeader::new(0, ordinal, flexibility))?;
78 buffer.encode_next(event)
79 })
80 }
81}
82
83impl<T: Transport> Clone for Server<T> {
84 fn clone(&self) -> Self {
85 Self { inner: self.inner.clone() }
86 }
87}
88
89pub trait ServerHandler<T: Transport> {
95 fn on_one_way(
101 &mut self,
102 ordinal: u64,
103 flexibility: Flexibility,
104 body: Body<T>,
105 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
106
107 fn on_two_way(
113 &mut self,
114 ordinal: u64,
115 flexibility: Flexibility,
116 body: Body<T>,
117 responder: Responder<T>,
118 ) -> impl Future<Output = Result<(), ProtocolError<T::Error>>> + Send;
119}
120
121pub struct ServerDispatcher<T: Transport> {
129 inner: Arc<ServerInner<T>>,
130 exclusive: T::Exclusive,
131 is_terminated: bool,
132}
133
134impl<T: Transport> Drop for ServerDispatcher<T> {
135 fn drop(&mut self) {
136 if !self.is_terminated {
137 unsafe {
139 self.inner.connection.terminate(ProtocolError::Stopped);
140 }
141 }
142 }
143}
144
145impl<T: Transport> ServerDispatcher<T> {
146 pub fn new(transport: T) -> Self {
148 let (shared, exclusive) = transport.split();
149 Self { inner: Arc::new(ServerInner::new(shared)), exclusive, is_terminated: false }
150 }
151
152 pub fn server(&self) -> Server<T> {
154 Server { inner: self.inner.clone() }
155 }
156
157 pub async fn run<H>(mut self, mut handler: H) -> Result<H, ProtocolError<T::Error>>
159 where
160 H: ServerHandler<T>,
161 {
162 let error = loop {
168 let result = unsafe { self.run_one(&mut handler).await };
170 if let Err(error) = result {
171 break error;
172 }
173 };
174
175 if matches!(error, ProtocolError::Stopped)
178 && let Some(epitaph) = self.inner.epitaph()
179 {
180 let _ = unsafe { self.inner.connection.send_epitaph(epitaph).await };
185 }
186
187 unsafe {
189 self.inner.connection.terminate(error.clone());
190 }
191 self.is_terminated = true;
192
193 match error {
194 ProtocolError::Stopped | ProtocolError::PeerClosed => Ok(handler),
197
198 _ => Err(error),
200 }
201 }
202
203 async unsafe fn run_one<H>(&mut self, handler: &mut H) -> Result<(), ProtocolError<T::Error>>
207 where
208 H: ServerHandler<T>,
209 {
210 let mut buffer = unsafe { self.inner.connection.recv(&mut self.exclusive).await? };
212
213 let header = buffer
214 .as_decoder()
215 .decode_prefix::<MessageHeader>()
216 .map_err(ProtocolError::InvalidMessageHeader)?;
217
218 if let Some(txid) = NonZeroU32::new(*header.txid) {
219 let responder = Responder { server: self.server(), txid };
220 handler
221 .on_two_way(*header.ordinal, header.flexibility(), Body::new(buffer), responder)
222 .await?;
223 } else {
224 handler.on_one_way(*header.ordinal, header.flexibility(), Body::new(buffer)).await?;
225 }
226
227 Ok(())
228 }
229}
230
231#[must_use = "responders close the underlying FIDL connection when dropped"]
233pub struct Responder<T: Transport> {
234 server: Server<T>,
235 txid: NonZeroU32,
236}
237
238impl<T: Transport> Drop for Responder<T> {
239 fn drop(&mut self) {
240 self.server.close();
241 }
242}
243
244impl<T: Transport> Responder<T> {
245 pub fn respond<W>(
247 self,
248 ordinal: u64,
249 flexibility: Flexibility,
250 response: impl Encode<W, T::SendBuffer>,
251 ) -> Result<RespondFuture<T>, EncodeError>
252 where
253 W: Wire<Constraint = ()>,
254 {
255 let state = self.server.inner.connection.send_message_raw(|buffer| {
256 buffer.encode_next(MessageHeader::new(self.txid.get(), ordinal, flexibility))?;
257 buffer.encode_next(response)
258 })?;
259
260 let this = ManuallyDrop::new(self);
261 let server = unsafe { ptr::read(&this.server) };
264
265 Ok(RespondFuture { server, state })
266 }
267}
268
269#[must_use = "futures do nothing unless polled"]
271#[pin_project]
272pub struct RespondFuture<T: Transport> {
273 server: Server<T>,
274 #[pin]
275 state: SendFutureState<T>,
276}
277
278impl<T: Transport> Future for RespondFuture<T> {
279 type Output = SendFutureOutput<T>;
280
281 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
282 let this = self.project();
283
284 this.state.poll_send(cx, &this.server.inner.connection)
285 }
286}