1use fidl_fuchsia_fdomain as proto;
6use std::collections::VecDeque;
7use std::num::NonZeroU32;
8use std::task::{Context, Poll, Waker};
9
10use proto::f_domain_ordinals as ordinals;
11
/// Adapter that drives a [`crate::FDomain`] with encoded FIDL messages.
///
/// Incoming bytes are handed to [`FDomainCodec::message`]; encoded replies and
/// events are buffered internally and handed back out through the
/// `futures::Stream` implementation on this type.
#[pin_project::pin_project]
pub struct FDomainCodec {
    /// The wrapped FDomain. Pinned so it can be polled for events.
    #[pin]
    fdomain: crate::FDomain,
    /// Encoded messages waiting to be yielded by the stream, oldest first.
    outgoing: VecDeque<Box<[u8]>>,
    /// Wakers of tasks that polled the stream while `outgoing` was empty; they
    /// are woken as soon as a message is queued.
    wakers: Vec<Waker>,
}
25
26impl FDomainCodec {
27 pub fn new(fdomain: crate::FDomain) -> FDomainCodec {
29 FDomainCodec { fdomain, outgoing: VecDeque::new(), wakers: Vec::new() }
30 }
31
32 pub fn message(&mut self, data: &[u8]) -> fidl::Result<()> {
34 let (header, rest) = fidl_message::decode_transaction_header(data)?;
35 let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
36 return Err(fidl::Error::UnknownOrdinal {
37 ordinal: header.ordinal,
38 protocol_name:
39 <proto::FDomainMarker as fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
40 });
41 };
42
43 match header.ordinal {
44 ordinals::GET_NAMESPACE => {
45 let request = fidl_message::decode_message::<proto::FDomainGetNamespaceRequest>(
46 header, rest,
47 )?;
48 let result = self.fdomain.get_namespace(request);
49 self.send_response(tx_id, header.ordinal, result)?;
50 }
51 ordinals::CREATE_CHANNEL => {
52 let request = fidl_message::decode_message::<proto::ChannelCreateChannelRequest>(
53 header, rest,
54 )?;
55 let result = self.fdomain.create_channel(request);
56 self.send_response(tx_id, header.ordinal, result)?;
57 }
58 ordinals::CREATE_SOCKET => {
59 let request =
60 fidl_message::decode_message::<proto::SocketCreateSocketRequest>(header, rest)?;
61 let result = self.fdomain.create_socket(request);
62 self.send_response(tx_id, header.ordinal, result)?;
63 }
64 ordinals::CREATE_EVENT_PAIR => {
65 let request = fidl_message::decode_message::<proto::EventPairCreateEventPairRequest>(
66 header, rest,
67 )?;
68 let result = self.fdomain.create_event_pair(request);
69 self.send_response(tx_id, header.ordinal, result)?;
70 }
71 ordinals::CREATE_EVENT => {
72 let request =
73 fidl_message::decode_message::<proto::EventCreateEventRequest>(header, rest)?;
74 let result = self.fdomain.create_event(request);
75 self.send_response(tx_id, header.ordinal, result)?;
76 }
77 ordinals::SET_SOCKET_DISPOSITION => {
78 let request = fidl_message::decode_message::<
79 proto::SocketSetSocketDispositionRequest,
80 >(header, rest)?;
81 self.fdomain.set_socket_disposition(tx_id, request);
82 }
83 ordinals::READ_SOCKET => {
84 let request =
85 fidl_message::decode_message::<proto::SocketReadSocketRequest>(header, rest)?;
86 self.fdomain.read_socket(tx_id, request);
87 }
88 ordinals::READ_CHANNEL => {
89 let request =
90 fidl_message::decode_message::<proto::ChannelReadChannelRequest>(header, rest)?;
91 self.fdomain.read_channel(tx_id, request);
92 }
93 ordinals::WRITE_SOCKET => {
94 let request =
95 fidl_message::decode_message::<proto::SocketWriteSocketRequest>(header, rest)?;
96 self.fdomain.write_socket(tx_id, request);
97 }
98 ordinals::WRITE_CHANNEL => {
99 let request = fidl_message::decode_message::<proto::ChannelWriteChannelRequest>(
100 header, rest,
101 )?;
102 self.fdomain.write_channel(tx_id, request);
103 }
104 ordinals::WAIT_FOR_SIGNALS => {
105 let request = fidl_message::decode_message::<proto::FDomainWaitForSignalsRequest>(
106 header, rest,
107 )?;
108 self.fdomain.wait_for_signals(tx_id, request);
109 }
110 ordinals::CLOSE => {
111 let request =
112 fidl_message::decode_message::<proto::FDomainCloseRequest>(header, rest)?;
113 self.fdomain.close(tx_id, request);
114 }
115 ordinals::DUPLICATE => {
116 let request =
117 fidl_message::decode_message::<proto::FDomainDuplicateRequest>(header, rest)?;
118 let result = self.fdomain.duplicate(request);
119 self.send_response(tx_id, header.ordinal, result)?;
120 }
121 ordinals::REPLACE => {
122 let request =
123 fidl_message::decode_message::<proto::FDomainReplaceRequest>(header, rest)?;
124 let result = self.fdomain.replace(tx_id, request);
125 self.send_response(tx_id, header.ordinal, result)?;
126 }
127 ordinals::SIGNAL => {
128 let request =
129 fidl_message::decode_message::<proto::FDomainSignalRequest>(header, rest)?;
130 let result = self.fdomain.signal(request);
131 self.send_response(tx_id, header.ordinal, result)?;
132 }
133 ordinals::SIGNAL_PEER => {
134 let request =
135 fidl_message::decode_message::<proto::FDomainSignalPeerRequest>(header, rest)?;
136 let result = self.fdomain.signal_peer(request);
137 self.send_response(tx_id, header.ordinal, result)?;
138 }
139 ordinals::READ_CHANNEL_STREAMING_START => {
140 let request = fidl_message::decode_message::<
141 proto::ChannelReadChannelStreamingStartRequest,
142 >(header, rest)?;
143 self.fdomain.read_channel_streaming_start(tx_id, request);
144 }
145 ordinals::READ_CHANNEL_STREAMING_STOP => {
146 let request = fidl_message::decode_message::<
147 proto::ChannelReadChannelStreamingStopRequest,
148 >(header, rest)?;
149 self.fdomain.read_channel_streaming_stop(tx_id, request);
150 }
151 ordinals::READ_SOCKET_STREAMING_START => {
152 let request = fidl_message::decode_message::<
153 proto::SocketReadSocketStreamingStartRequest,
154 >(header, rest)?;
155 self.fdomain.read_socket_streaming_start(tx_id, request);
156 }
157 ordinals::READ_SOCKET_STREAMING_STOP => {
158 let request = fidl_message::decode_message::<
159 proto::SocketReadSocketStreamingStopRequest,
160 >(header, rest)?;
161 self.fdomain.read_socket_streaming_stop(tx_id, request);
162 }
163 ordinals::GET_KOID => {
164 let request =
165 fidl_message::decode_message::<proto::FDomainGetKoidRequest>(header, rest)?;
166 let result = self.fdomain.get_koid(request);
167 self.send_response(tx_id, header.ordinal, result)?;
168 }
169 unknown if header.dynamic_flags().contains(fidl_message::DynamicFlags::FLEXIBLE) => {
170 if header.tx_id != 0 {
171 let header = fidl_message::TransactionHeader::new(
172 header.tx_id,
173 unknown,
174 fidl_message::DynamicFlags::FLEXIBLE,
175 );
176 self.enqueue_outgoing::<Vec<u8>>(
177 fidl_message::encode_response_flexible_unknown(header)?.into(),
178 );
179 }
180 }
181 _ => {
182 return Err(fidl::Error::UnknownOrdinal {
183 ordinal: header.ordinal,
184 protocol_name:
185 <proto::FDomainMarker as fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
186 });
187 }
188 }
189
190 Ok(())
191 }
192
193 fn enqueue_outgoing<T: Into<Box<[u8]>>>(&mut self, msg: T) {
196 self.outgoing.push_back(msg.into());
197 self.wakers.drain(..).for_each(Waker::wake);
198 }
199
200 fn send_response<T: fidl_message::Body, E: fidl_message::ErrorType>(
204 &mut self,
205 tx_id: NonZeroU32,
206 ordinal: u64,
207 body: Result<T, E>,
208 ) -> fidl::Result<()>
209where
210 for<'a> <<T as fidl_message::Body>::MarkerInResultUnion as fidl::encoding::ValueTypeMarker>::Borrowed<'a>:
211 fidl::encoding::Encode<T::MarkerInResultUnion, fidl::encoding::NoHandleResourceDialect>,
212 for<'a> <<E as fidl_message::ErrorType>::Marker as fidl::encoding::ValueTypeMarker>::Borrowed<'a>:
213 fidl::encoding::Encode<E::Marker, fidl::encoding::NoHandleResourceDialect>,
214 {
215 let header = fidl_message::TransactionHeader::new(
216 tx_id.into(),
217 ordinal,
218 fidl_message::DynamicFlags::FLEXIBLE,
219 );
220 self.enqueue_outgoing(fidl_message::encode_response_result::<T, E>(header, body)?);
221
222 Ok(())
223 }
224
225 fn send_event<T: fidl_message::Body>(&mut self, ordinal: u64, body: T) -> fidl::Result<()>
229 where
230 for<'a> <<T as fidl_message::Body>::MarkerAtTopLevel as fidl::encoding::ValueTypeMarker>::Borrowed<
231 'a,
232 >: fidl::encoding::Encode<T::MarkerAtTopLevel, fidl::encoding::NoHandleResourceDialect>,
233 {
234 let header =
235 fidl_message::TransactionHeader::new(0, ordinal, fidl_message::DynamicFlags::empty());
236 self.enqueue_outgoing(fidl_message::encode_message(header, body)?);
237
238 Ok(())
239 }
240}
241
242impl futures::Stream for FDomainCodec {
243 type Item = fidl::Result<Box<[u8]>>;
244
245 fn poll_next(
246 mut self: std::pin::Pin<&mut Self>,
247 ctx: &mut Context<'_>,
248 ) -> Poll<Option<Self::Item>> {
249 while let Poll::Ready(Some(event)) = self.as_mut().project().fdomain.poll_next(ctx) {
250 let result = match event {
251 crate::FDomainEvent::ChannelStreamingReadStart(tx_id, msg) => {
252 self.send_response(tx_id, ordinals::READ_CHANNEL_STREAMING_START, msg)
253 }
254 crate::FDomainEvent::ChannelStreamingReadStop(tx_id, msg) => {
255 self.send_response(tx_id, ordinals::READ_CHANNEL_STREAMING_STOP, msg)
256 }
257 crate::FDomainEvent::SocketStreamingReadStart(tx_id, msg) => {
258 self.send_response(tx_id, ordinals::READ_SOCKET_STREAMING_START, msg)
259 }
260 crate::FDomainEvent::SocketStreamingReadStop(tx_id, msg) => {
261 self.send_response(tx_id, ordinals::READ_SOCKET_STREAMING_STOP, msg)
262 }
263 crate::FDomainEvent::WaitForSignals(tx_id, msg) => {
264 self.send_response(tx_id, ordinals::WAIT_FOR_SIGNALS, msg)
265 }
266 crate::FDomainEvent::SocketData(tx_id, msg) => {
267 self.send_response(tx_id, ordinals::READ_SOCKET, msg)
268 }
269 crate::FDomainEvent::SocketStreamingData(msg) => {
270 self.send_event(ordinals::ON_SOCKET_STREAMING_DATA, msg)
271 }
272 crate::FDomainEvent::SocketDispositionSet(tx_id, msg) => {
273 self.send_response(tx_id, ordinals::SET_SOCKET_DISPOSITION, msg)
274 }
275 crate::FDomainEvent::WroteSocket(tx_id, msg) => {
276 self.send_response(tx_id, ordinals::WRITE_SOCKET, msg)
277 }
278 crate::FDomainEvent::ChannelData(tx_id, msg) => {
279 self.send_response(tx_id, ordinals::READ_CHANNEL, msg)
280 }
281 crate::FDomainEvent::ChannelStreamingData(msg) => {
282 self.send_event(ordinals::ON_CHANNEL_STREAMING_DATA, msg)
283 }
284 crate::FDomainEvent::WroteChannel(tx_id, msg) => {
285 self.send_response(tx_id, ordinals::WRITE_CHANNEL, msg)
286 }
287 crate::FDomainEvent::ClosedHandle(tx_id, msg) => {
288 self.send_response(tx_id, ordinals::CLOSE, msg)
289 }
290 crate::FDomainEvent::ReplacedHandle(tx_id, msg) => {
291 self.send_response(tx_id, ordinals::REPLACE, msg)
292 }
293 };
294
295 if let Err(e) = result {
296 return Poll::Ready(Some(Err(e)));
297 }
298 }
299
300 if let Some(got) = self.outgoing.pop_front() {
301 Poll::Ready(Some(Ok(got)))
302 } else {
303 self.wakers.push(ctx.waker().clone());
304 Poll::Pending
305 }
306 }
307}