fidl_next_protocol/endpoints/
server.rs1use core::future::Future;
8use core::num::NonZeroU32;
9
10use fidl_next_codec::{Encode, EncodeError, EncoderExt as _};
11
12use crate::concurrency::sync::Arc;
13use crate::concurrency::sync::atomic::{AtomicI64, Ordering};
14
15use crate::{ProtocolError, Transport, decode_header, encode_header};
16
17use super::connection::{Connection, SendFuture};
18
19#[must_use]
21pub struct Responder {
22 txid: NonZeroU32,
23}
24
25struct ServerSenderInner<T: Transport> {
26 connection: Connection<T>,
27 epitaph: AtomicI64,
28}
29
30impl<T: Transport> ServerSenderInner<T> {
31 const EPITAPH_NONE: i64 = i64::MAX;
32
33 fn new(shared: T::Shared) -> Self {
34 Self { connection: Connection::new(shared), epitaph: AtomicI64::new(Self::EPITAPH_NONE) }
35 }
36
37 fn close_with_epitaph(&self, epitaph: Option<i32>) {
38 if let Some(epitaph) = epitaph {
39 self.epitaph.store(epitaph as i64, Ordering::Relaxed);
40 }
41 self.connection.stop();
42 }
43
44 fn epitaph(&self) -> Option<i32> {
45 let epitaph = self.epitaph.load(Ordering::Relaxed);
46 if epitaph != Self::EPITAPH_NONE { Some(epitaph as i32) } else { None }
47 }
48}
49
50pub struct ServerSender<T: Transport> {
52 inner: Arc<ServerSenderInner<T>>,
53}
54
55impl<T: Transport> ServerSender<T> {
56 pub fn close(&self) {
58 self.inner.close_with_epitaph(None);
59 }
60
61 pub fn close_with_epitaph(&self, epitaph: i32) {
63 self.inner.close_with_epitaph(Some(epitaph));
64 }
65
66 pub fn send_event<M>(&self, ordinal: u64, event: M) -> Result<SendFuture<'_, T>, EncodeError>
68 where
69 M: Encode<T::SendBuffer>,
70 {
71 self.inner.connection.send_with(|buffer| {
72 encode_header::<T>(buffer, 0, ordinal)?;
73 buffer.encode_next(event)
74 })
75 }
76
77 pub fn send_response<M>(
79 &self,
80 responder: Responder,
81 ordinal: u64,
82 response: M,
83 ) -> Result<SendFuture<'_, T>, EncodeError>
84 where
85 M: Encode<T::SendBuffer>,
86 {
87 self.inner.connection.send_with(|buffer| {
88 encode_header::<T>(buffer, responder.txid.get(), ordinal)?;
89 buffer.encode_next(response)
90 })
91 }
92}
93
94impl<T: Transport> Clone for ServerSender<T> {
95 fn clone(&self) -> Self {
96 Self { inner: self.inner.clone() }
97 }
98}
99
100pub trait ServerHandler<T: Transport> {
106 fn on_one_way(
112 &mut self,
113 sender: &ServerSender<T>,
114 ordinal: u64,
115 buffer: T::RecvBuffer,
116 ) -> impl Future<Output = ()> + Send;
117
118 fn on_two_way(
124 &mut self,
125 sender: &ServerSender<T>,
126 ordinal: u64,
127 buffer: T::RecvBuffer,
128 responder: Responder,
129 ) -> impl Future<Output = ()> + Send;
130}
131
132pub struct Server<T: Transport> {
134 sender: ServerSender<T>,
135 exclusive: T::Exclusive,
136 is_terminated: bool,
137}
138
139impl<T: Transport> Drop for Server<T> {
140 fn drop(&mut self) {
141 if !self.is_terminated {
142 unsafe {
144 self.sender.inner.connection.terminate(ProtocolError::Stopped);
145 }
146 }
147 }
148}
149
150impl<T: Transport> Server<T> {
151 pub fn new(transport: T) -> Self {
153 let (shared, exclusive) = transport.split();
154 Self {
155 sender: ServerSender { inner: Arc::new(ServerSenderInner::new(shared)) },
156 exclusive,
157 is_terminated: false,
158 }
159 }
160
161 pub fn sender(&self) -> &ServerSender<T> {
163 &self.sender
164 }
165
166 pub async fn run<H>(mut self, mut handler: H) -> Result<H, ProtocolError<T::Error>>
168 where
169 H: ServerHandler<T>,
170 {
171 let error = loop {
177 let result = unsafe { self.run_one(&mut handler).await };
179 if let Err(error) = result {
180 break error;
181 }
182 };
183
184 if matches!(error, ProtocolError::Stopped) {
187 if let Some(epitaph) = self.sender.inner.epitaph() {
188 let _ = unsafe { self.sender.inner.connection.send_epitaph(epitaph).await };
193 }
194 }
195
196 unsafe {
198 self.sender.inner.connection.terminate(error.clone());
199 }
200 self.is_terminated = true;
201
202 match error {
203 ProtocolError::Stopped | ProtocolError::PeerClosed => Ok(handler),
206
207 _ => Err(error),
209 }
210 }
211
212 async unsafe fn run_one<H>(&mut self, handler: &mut H) -> Result<(), ProtocolError<T::Error>>
216 where
217 H: ServerHandler<T>,
218 {
219 let mut buffer = unsafe { self.sender.inner.connection.recv(&mut self.exclusive).await? };
221
222 let (txid, ordinal) =
223 decode_header::<T>(&mut buffer).map_err(ProtocolError::InvalidMessageHeader)?;
224 if let Some(txid) = NonZeroU32::new(txid) {
225 handler.on_two_way(&self.sender, ordinal, buffer, Responder { txid }).await;
226 } else {
227 handler.on_one_way(&self.sender, ordinal, buffer).await;
228 }
229
230 Ok(())
231 }
232}