1use fidl::AsHandleRef;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_fdomain as proto;
8use fidl_fuchsia_io as fio;
9use fuchsia_async as fasync;
10use futures::prelude::*;
11use replace_with::replace_with;
12use std::collections::hash_map::Entry;
13use std::collections::{HashMap, VecDeque};
14use std::num::NonZeroU32;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::task::{Context, Poll, Waker};
19
20mod handles;
21pub mod wire;
22
23#[cfg(test)]
24mod test;
25
26pub type Result<T, E = proto::Error> = std::result::Result<T, E>;
27
28use handles::{AnyHandle, HandleType as _, IsDatagramSocket};
29
/// A FIFO of pending items paired with at most one stored [`Waker`].
///
/// Field `0` holds the queued items. Field `1` holds the waker captured the
/// last time a poll-style accessor ([`Queue::pop_front`] or
/// [`Queue::front_mut`]) found the queue empty; [`Queue::push_back`] consumes
/// and wakes it.
struct Queue<T>(VecDeque<T>, Option<Waker>);

impl<T> Queue<T> {
    /// Creates an empty queue with no waker stored.
    fn new() -> Self {
        Self(VecDeque::new(), None)
    }

    /// Returns `true` when no items are queued.
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Removes and drops the front item.
    ///
    /// # Panics
    ///
    /// Panics if the queue is empty.
    fn destroy_front(&mut self) {
        let removed = self.0.pop_front();
        assert!(removed.is_some(), "Expected to find a value!");
    }

    /// Removes and returns the front item, or stores the waker from `ctx` and
    /// returns `Poll::Pending` when the queue is empty.
    fn pop_front(&mut self, ctx: &mut Context<'_>) -> Poll<T> {
        match self.0.pop_front() {
            Some(item) => Poll::Ready(item),
            None => {
                self.1 = Some(ctx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Puts `t` back at the front of the queue without touching the stored
    /// waker.
    fn push_front_no_wake(&mut self, t: T) {
        self.0.push_front(t);
    }

    /// Appends `t` to the back of the queue and wakes the stored waker, if
    /// there is one. The waker is consumed by this call.
    fn push_back(&mut self, t: T) {
        self.0.push_back(t);
        if let Some(waker) = self.1.take() {
            waker.wake();
        }
    }

    /// Returns a mutable reference to the front item, or stores the waker
    /// from `ctx` and returns `Poll::Pending` when the queue is empty.
    fn front_mut(&mut self, ctx: &mut Context<'_>) -> Poll<&mut T> {
        match self.0.front_mut() {
            Some(item) => Poll::Ready(item),
            None => {
                self.1 = Some(ctx.waker().clone());
                Poll::Pending
            }
        }
    }
}
89
/// Byte count passed to each `read_socket` call made while servicing a
/// streaming read on a socket.
const ASYNC_READ_BUFSIZE: u64 = 40960;
92
/// An event emitted by an [`FDomain`] through its `Stream` implementation.
///
/// Variants whose first field is a `NonZeroU32` carry the transaction ID
/// (`tid`) that was passed to the method which started the operation, followed
/// by that operation's outcome. The two `*StreamingData` variants carry no
/// transaction ID; their payload names the handle they came from.
#[derive(Debug)]
pub enum FDomainEvent {
    /// Outcome of a request to start a streaming read on a channel.
    ChannelStreamingReadStart(NonZeroU32, Result<()>),
    /// Outcome of a request to stop a streaming read on a channel.
    ChannelStreamingReadStop(NonZeroU32, Result<()>),
    /// Outcome of a request to start a streaming read on a socket.
    SocketStreamingReadStart(NonZeroU32, Result<()>),
    /// Outcome of a request to stop a streaming read on a socket.
    SocketStreamingReadStop(NonZeroU32, Result<()>),
    /// Outcome of a `wait_for_signals` request.
    WaitForSignals(NonZeroU32, Result<proto::FDomainWaitForSignalsResponse>),
    /// Outcome of a one-shot socket read.
    SocketData(NonZeroU32, Result<proto::SocketData>),
    /// Data, or a stop notification, from a streaming socket read.
    SocketStreamingData(proto::SocketOnSocketStreamingDataRequest),
    /// Outcome of a request to set socket dispositions.
    SocketDispositionSet(NonZeroU32, Result<()>),
    /// Outcome of a socket write.
    WroteSocket(NonZeroU32, Result<proto::SocketWriteSocketResponse, proto::WriteSocketError>),
    /// Outcome of a one-shot channel read.
    ChannelData(NonZeroU32, Result<proto::ChannelMessage>),
    /// A message, or a stop notification, from a streaming channel read.
    ChannelStreamingData(proto::ChannelOnChannelStreamingDataRequest),
    /// Outcome of a channel write.
    WroteChannel(NonZeroU32, Result<(), proto::WriteChannelError>),
    /// Outcome of a close request.
    ClosedHandle(NonZeroU32, Result<()>),
    /// Outcome of a replace request.
    ReplacedHandle(NonZeroU32, Result<()>),
}
111
/// An event as it is stored in the internal event queue, before the `Stream`
/// implementation hands it out.
///
/// Channel messages are queued in raw form and are only converted into a
/// `proto::ChannelMessage` (via `FDomain::process_message`) when they are
/// popped from the queue in `poll_next`.
enum UnprocessedFDomainEvent {
    /// An event that is already complete and is yielded unchanged.
    Ready(FDomainEvent),
    /// A raw message produced by the one-shot channel read with this
    /// transaction ID.
    ChannelData(NonZeroU32, fidl::MessageBufEtc),
    /// A raw message produced by a streaming read on the given handle.
    ChannelStreamingData(proto::HandleId, fidl::MessageBufEtc),
}
120
121impl From<FDomainEvent> for UnprocessedFDomainEvent {
122 fn from(other: FDomainEvent) -> UnprocessedFDomainEvent {
123 UnprocessedFDomainEvent::Ready(other)
124 }
125}
126
/// An operation waiting in a handle's read queue. The `NonZeroU32` in every
/// variant is the transaction ID reported in the resulting event.
enum ReadOp {
    /// Start (`true`) or stop (`false`) a streaming read on a channel.
    StreamingChannel(NonZeroU32, bool),
    /// Start (`true`) or stop (`false`) a streaming read on a socket.
    StreamingSocket(NonZeroU32, bool),
    /// One-shot socket read; the `u64` is the caller-requested maximum number
    /// of bytes.
    Socket(NonZeroU32, u64),
    /// One-shot channel read.
    Channel(NonZeroU32),
}
136
/// Progress of a socket write that may need several attempts to complete.
struct SocketWrite {
    /// Transaction ID reported in the resulting `WroteSocket` event.
    tid: NonZeroU32,
    /// Running total of bytes written so far.
    wrote: usize,
    /// Bytes still to be written; written bytes are drained from the front.
    to_write: Vec<u8>,
}
145
/// An operation waiting in a handle's write queue.
enum WriteOp {
    /// Write bytes to a socket.
    Socket(SocketWrite),
    /// Write a message (transaction ID, data bytes, handles to transfer) to a
    /// channel.
    Channel(NonZeroU32, Vec<u8>, HandlesToWrite),
    /// Set socket dispositions: transaction ID, then the two values passed in
    /// order to `socket_disposition` (the request's `disposition` and
    /// `disposition_peer`).
    SetDisposition(NonZeroU32, proto::SocketDisposition, proto::SocketDisposition),
}
152
/// A handle that has been taken out of the handle table and is being wound
/// down so that the bare handle can be extracted.
enum ShuttingDownHandle {
    /// The handle still has its `HandleState` attached and may have queued
    /// operations. Carries the ID the handle had in the table.
    InUse(proto::HandleId, HandleState),
    /// Winding down has finished; only the bare handle remains.
    Ready(AnyHandle),
}
161
impl ShuttingDownHandle {
    /// Drives this handle toward the `Ready` state.
    ///
    /// Returns `Poll::Ready(())` once `self` is `ShuttingDownHandle::Ready`,
    /// and `Poll::Pending` otherwise. While the handle is `InUse`, each call
    /// polls the underlying `HandleState`. Once the write queue has drained,
    /// every queued read is failed, a "stopped" streaming event is emitted if
    /// a streaming read was active, all waiters are dropped, and the handle is
    /// extracted from its `Arc`.
    ///
    /// # Panics
    ///
    /// Panics if another reference to the handle's `Arc` is still alive when
    /// the handle is extracted.
    fn poll_ready(
        &mut self,
        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
        ctx: &mut Context<'_>,
    ) -> Poll<()> {
        replace_with(self, |this| match this {
            this @ ShuttingDownHandle::Ready(_) => this,
            ShuttingDownHandle::InUse(hid, mut state) => {
                // Give queued operations a chance to make progress first.
                state.poll(event_queue, ctx);

                // Pending writes must finish before the handle is torn down.
                if state.write_queue.is_empty() {
                    // Fail every read that is still queued.
                    while let Poll::Ready(op) = state.read_queue.pop_front(ctx) {
                        match op {
                            ReadOp::StreamingChannel(tid, start) => {
                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
                                    id: hid.id,
                                }));
                                let event = if start {
                                    FDomainEvent::ChannelStreamingReadStart(tid, err)
                                } else {
                                    FDomainEvent::ChannelStreamingReadStop(tid, err)
                                };
                                event_queue.push_back(event.into());
                            }
                            ReadOp::StreamingSocket(tid, start) => {
                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
                                    id: hid.id,
                                }));
                                let event = if start {
                                    FDomainEvent::SocketStreamingReadStart(tid, err)
                                } else {
                                    FDomainEvent::SocketStreamingReadStop(tid, err)
                                };
                                event_queue.push_back(event.into());
                            }
                            ReadOp::Channel(tid) => {
                                // Report the type-check error if there is one,
                                // otherwise report that the handle was closed
                                // during the read.
                                let err = state
                                    .handle
                                    .expected_type(fidl::ObjectType::CHANNEL)
                                    .err()
                                    .unwrap_or(proto::Error::ClosedDuringRead(
                                        proto::ClosedDuringRead,
                                    ));
                                event_queue
                                    .push_back(FDomainEvent::ChannelData(tid, Err(err)).into());
                            }
                            ReadOp::Socket(tid, _max_bytes) => {
                                // Same error selection as for channel reads.
                                let err = state
                                    .handle
                                    .expected_type(fidl::ObjectType::SOCKET)
                                    .err()
                                    .unwrap_or(proto::Error::ClosedDuringRead(
                                        proto::ClosedDuringRead,
                                    ));
                                event_queue
                                    .push_back(FDomainEvent::SocketData(tid, Err(err)).into());
                            }
                        }
                    }

                    // An active streaming read ends with a "stopped" event
                    // that carries no error.
                    if state.async_read_in_progress {
                        match &*state.handle {
                            AnyHandle::Channel(_) => event_queue.push_back(
                                FDomainEvent::ChannelStreamingData(
                                    proto::ChannelOnChannelStreamingDataRequest {
                                        handle: hid,
                                        channel_sent: proto::ChannelSent::Stopped(
                                            proto::AioStopped { error: None },
                                        ),
                                    },
                                )
                                .into(),
                            ),
                            AnyHandle::Socket(_) => event_queue.push_back(
                                FDomainEvent::SocketStreamingData(
                                    proto::SocketOnSocketStreamingDataRequest {
                                        handle: hid,
                                        socket_message: proto::SocketMessage::Stopped(
                                            proto::AioStopped { error: None },
                                        ),
                                    },
                                )
                                .into(),
                            ),
                            AnyHandle::EventPair(_)
                            | AnyHandle::Event(_)
                            | AnyHandle::Unknown(_) => unreachable!(),
                        }
                    }

                    // Waiters are built from clones of `state.handle`; drop
                    // them before unwrapping the `Arc` below.
                    state.signal_waiters.clear();
                    state.io_waiter = None;

                    ShuttingDownHandle::Ready(
                        Arc::into_inner(state.handle).expect("Unaccounted-for handle reference!"),
                    )
                } else {
                    ShuttingDownHandle::InUse(hid, state)
                }
            }
        });

        if matches!(self, ShuttingDownHandle::Ready(_)) { Poll::Ready(()) } else { Poll::Pending }
    }
}
268
/// The handles attached to a pending channel write.
enum HandlesToWrite {
    /// Handles (with the rights to send them with) that may still be winding
    /// down.
    SomeInUse(Vec<(ShuttingDownHandle, fidl::Rights)>),
    /// Every handle has finished winding down and has been converted to a
    /// handle disposition.
    AllReady(Vec<fidl::HandleDisposition<'static>>),
}
276
277impl HandlesToWrite {
278 fn poll_ready(
279 &mut self,
280 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
281 ctx: &mut Context<'_>,
282 ) -> Poll<&mut Vec<fidl::HandleDisposition<'static>>> {
283 match self {
284 HandlesToWrite::AllReady(s) => Poll::Ready(s),
285 HandlesToWrite::SomeInUse(handles) => {
286 let mut ready = true;
287 for (handle, _) in handles.iter_mut() {
288 ready = ready && handle.poll_ready(event_queue, ctx).is_ready();
289 }
290
291 if !ready {
292 return Poll::Pending;
293 }
294
295 *self = HandlesToWrite::AllReady(
296 handles
297 .drain(..)
298 .map(|(handle, rights)| {
299 let ShuttingDownHandle::Ready(handle) = handle else { unreachable!() };
300
301 fidl::HandleDisposition::new(
302 fidl::HandleOp::Move(handle.into()),
303 fidl::ObjectType::NONE,
304 rights,
305 fidl::Status::OK,
306 )
307 })
308 .collect(),
309 );
310
311 let HandlesToWrite::AllReady(s) = self else { unreachable!() };
312 Poll::Ready(s)
313 }
314 }
315 }
316}
317
/// Owns a shared reference to an `AnyHandle` and exposes it through
/// `AsHandleRef`, so it can be passed where that trait is required (see the
/// `OnSignals::new` call sites).
struct AnyHandleRef(Arc<AnyHandle>);

impl AsHandleRef for AnyHandleRef {
    /// Delegates to the wrapped handle.
    fn as_handle_ref(&self) -> fidl::HandleRef<'_> {
        self.0.as_handle_ref()
    }
}
325
/// Signal-waiting future type used when building for Fuchsia.
#[cfg(target_os = "fuchsia")]
type OnSignals = fasync::OnSignals<'static, AnyHandleRef>;

/// Signal-waiting future type used when building for any other target.
#[cfg(not(target_os = "fuchsia"))]
type OnSignals = fasync::OnSignalsRef<'static>;
331
/// An outstanding `wait_for_signals` request on a handle.
struct SignalWaiter {
    /// Transaction ID reported in the resulting `WaitForSignals` event.
    tid: NonZeroU32,
    /// Future that resolves when the awaited signals are observed.
    waiter: Pin<Box<OnSignals>>,
}
338
/// Bookkeeping kept for each handle in the FDomain's handle table.
struct HandleState {
    /// The handle itself. Shared through an `Arc` so signal waiters can hold
    /// their own reference.
    handle: Arc<AnyHandle>,
    /// ID under which this handle is known; used when emitting streaming
    /// events.
    hid: proto::HandleId,
    /// Result of the datagram check done at construction. Reported in
    /// `SocketData::is_datagram` and used to size one-shot socket reads.
    is_datagram_socket: bool,
    /// Whether a streaming read is currently active on this handle.
    async_read_in_progress: bool,
    /// Pending read-side operations, handled front to back.
    read_queue: Queue<ReadOp>,
    /// Pending write-side operations, handled front to back.
    write_queue: Queue<WriteOp>,
    /// Outstanding `wait_for_signals` requests.
    signal_waiters: Vec<SignalWaiter>,
    /// Waiter for the read/write signals that drive the two queues; `None`
    /// when nothing is currently subscribed.
    io_waiter: Option<Pin<Box<OnSignals>>>,
}
373
374impl HandleState {
375 fn new(handle: AnyHandle, hid: proto::HandleId) -> Result<Self, proto::Error> {
376 let is_datagram_socket = match handle.is_datagram_socket() {
377 IsDatagramSocket::Unknown => {
378 return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown {
379 type_: proto::SocketType::unknown(),
380 }));
381 }
382 other => other.is_datagram(),
383 };
384 Ok(HandleState {
385 handle: Arc::new(handle),
386 hid,
387 async_read_in_progress: false,
388 is_datagram_socket,
389 read_queue: Queue::new(),
390 write_queue: Queue::new(),
391 signal_waiters: Vec::new(),
392 io_waiter: None,
393 })
394 }
395
    /// Makes as much progress as possible on this handle's pending work and
    /// appends any resulting events to `event_queue`.
    ///
    /// First, finished signal waiters are turned into `WaitForSignals` events.
    /// Then the read and write queues are driven, and `io_waiter` is
    /// (re)created for whichever read/write signals are still needed.
    fn poll(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>, ctx: &mut Context<'_>) {
        // Report and drop every signal waiter that has completed; keep the
        // rest.
        self.signal_waiters.retain_mut(|x| {
            let Poll::Ready(result) = x.waiter.poll_unpin(ctx) else {
                return true;
            };

            event_queue.push_back(
                FDomainEvent::WaitForSignals(
                    x.tid,
                    result
                        .map(|x| proto::FDomainWaitForSignalsResponse { signals: x.bits() })
                        .map_err(|e| proto::Error::TargetError(e.into_raw())),
                )
                .into(),
            );

            false
        });

        let read_signals = self.handle.read_signals();
        let write_signals = self.handle.write_signals();

        loop {
            if let Some(signal_waiter) = self.io_waiter.as_mut() {
                if let Poll::Ready(sigs) = signal_waiter.poll_unpin(ctx) {
                    // The waiter fired: service whichever side was signalled.
                    // A waiter error is ignored here; the waiter is simply
                    // replaced below.
                    if let Ok(sigs) = sigs {
                        if sigs.intersects(read_signals) {
                            self.process_read_queue(event_queue, ctx);
                        }
                        if sigs.intersects(write_signals) {
                            self.process_write_queue(event_queue, ctx);
                        }
                    }
                } else {
                    // The waiter is still pending. Note whether the front of
                    // either queue is a streaming start/stop or a disposition
                    // change before processing, then process both queues.
                    let need_read = matches!(
                        self.read_queue.front_mut(ctx),
                        Poll::Ready(ReadOp::StreamingChannel(_, _) | ReadOp::StreamingSocket(_, _))
                    );
                    let need_write = matches!(
                        self.write_queue.front_mut(ctx),
                        Poll::Ready(WriteOp::SetDisposition(_, _, _))
                    );

                    self.process_read_queue(event_queue, ctx);
                    self.process_write_queue(event_queue, ctx);

                    // If neither front was such an op, keep the existing
                    // (pending) waiter and stop.
                    if !(need_read || need_write) {
                        break;
                    }
                }
            }

            // Subscribe to read signals while a streaming read is active or
            // reads are queued, and to write signals while writes are queued.
            let subscribed_signals =
                if self.async_read_in_progress || !self.read_queue.is_empty() {
                    read_signals
                } else {
                    fidl::Signals::NONE
                } | if !self.write_queue.is_empty() { write_signals } else { fidl::Signals::NONE };

            if !subscribed_signals.is_empty() {
                // Install a fresh waiter; the next loop iteration polls it.
                self.io_waiter = Some(Box::pin(OnSignals::new(
                    AnyHandleRef(Arc::clone(&self.handle)),
                    subscribed_signals,
                )));
            } else {
                self.io_waiter = None;
                break;
            }
        }
    }
468
469 fn try_enable_async_read(&mut self) -> Result<()> {
471 if self.async_read_in_progress {
472 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress))
473 } else {
474 self.async_read_in_progress = true;
475 Ok(())
476 }
477 }
478
479 fn try_disable_async_read(&mut self) -> Result<()> {
481 if !self.async_read_in_progress {
482 Err(proto::Error::NoReadInProgress(proto::NoReadInProgress))
483 } else {
484 self.async_read_in_progress = false;
485 Ok(())
486 }
487 }
488
489 fn process_read_queue(
491 &mut self,
492 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
493 ctx: &mut Context<'_>,
494 ) {
495 while let Poll::Ready(op) = self.read_queue.front_mut(ctx) {
496 match op {
497 ReadOp::StreamingChannel(tid, true) => {
498 let tid = *tid;
499 let result = self.try_enable_async_read();
500 event_queue
501 .push_back(FDomainEvent::ChannelStreamingReadStart(tid, result).into());
502 self.read_queue.destroy_front();
503 }
504 ReadOp::StreamingChannel(tid, false) => {
505 let tid = *tid;
506 let result = self.try_disable_async_read();
507 event_queue
508 .push_back(FDomainEvent::ChannelStreamingReadStop(tid, result).into());
509 self.read_queue.destroy_front();
510 }
511 ReadOp::StreamingSocket(tid, true) => {
512 let tid = *tid;
513 let result = self.try_enable_async_read();
514 event_queue
515 .push_back(FDomainEvent::SocketStreamingReadStart(tid, result).into());
516 self.read_queue.destroy_front();
517 }
518 ReadOp::StreamingSocket(tid, false) => {
519 let tid = *tid;
520 let result = self.try_disable_async_read();
521 event_queue
522 .push_back(FDomainEvent::SocketStreamingReadStop(tid, result).into());
523 self.read_queue.destroy_front();
524 }
525 ReadOp::Socket(tid, max_bytes) => {
526 let (tid, max_bytes) = (*tid, *max_bytes);
527 if let Some(event) = self.do_read_socket(tid, max_bytes) {
528 let _ = self.read_queue.pop_front(ctx);
529 event_queue.push_back(event.into());
530 } else {
531 break;
532 }
533 }
534 ReadOp::Channel(tid) => {
535 let tid = *tid;
536 if let Some(event) = self.do_read_channel(tid) {
537 let _ = self.read_queue.pop_front(ctx);
538 event_queue.push_back(event.into());
539 } else {
540 break;
541 }
542 }
543 }
544 }
545
546 if self.async_read_in_progress {
547 assert!(self.read_queue.is_empty());
550 self.process_async_read(event_queue);
551 }
552 }
553
    /// Services an active streaming read by reading repeatedly from the handle
    /// and queuing one streaming event per successful read.
    ///
    /// Reading continues until a read returns `Ok(None)` (which `transpose`
    /// turns into `None`, ending the loop) or an error. On error, a "stopped"
    /// event carrying the error is queued and the streaming read is marked
    /// inactive.
    ///
    /// # Panics
    ///
    /// Panics if no streaming read is active, or if the handle is neither a
    /// channel nor a socket.
    fn process_async_read(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>) {
        assert!(self.async_read_in_progress);

        match &*self.handle {
            AnyHandle::Channel(_) => {
                'read_loop: while let Some(result) = self.handle.read_channel().transpose() {
                    match result {
                        // Channel messages are queued raw and converted when
                        // they are popped from the event queue.
                        Ok(msg) => event_queue.push_back(
                            UnprocessedFDomainEvent::ChannelStreamingData(self.hid, msg),
                        ),
                        Err(e) => {
                            event_queue.push_back(
                                FDomainEvent::ChannelStreamingData(
                                    proto::ChannelOnChannelStreamingDataRequest {
                                        handle: self.hid,
                                        channel_sent: proto::ChannelSent::Stopped(
                                            proto::AioStopped { error: Some(Box::new(e)) },
                                        ),
                                    },
                                )
                                .into(),
                            );
                            // A read error ends the streaming read.
                            self.async_read_in_progress = false;
                            break 'read_loop;
                        }
                    }
                }
            }

            AnyHandle::Socket(_) => {
                // Each read requests `ASYNC_READ_BUFSIZE` bytes.
                'read_loop: while let Some(result) =
                    self.handle.read_socket(ASYNC_READ_BUFSIZE).transpose()
                {
                    match result {
                        Ok(data) => {
                            event_queue.push_back(
                                FDomainEvent::SocketStreamingData(
                                    proto::SocketOnSocketStreamingDataRequest {
                                        handle: self.hid,
                                        socket_message: proto::SocketMessage::Data(
                                            proto::SocketData {
                                                data,
                                                is_datagram: self.is_datagram_socket,
                                            },
                                        ),
                                    },
                                )
                                .into(),
                            );
                        }
                        Err(e) => {
                            event_queue.push_back(
                                FDomainEvent::SocketStreamingData(
                                    proto::SocketOnSocketStreamingDataRequest {
                                        handle: self.hid,
                                        socket_message: proto::SocketMessage::Stopped(
                                            proto::AioStopped { error: Some(Box::new(e)) },
                                        ),
                                    },
                                )
                                .into(),
                            );
                            // A read error ends the streaming read.
                            self.async_read_in_progress = false;
                            break 'read_loop;
                        }
                    }
                }
            }

            _ => unreachable!("Processed async read for unreadable handle type!"),
        }
    }
626
627 fn process_write_queue(
629 &mut self,
630 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
631 ctx: &mut Context<'_>,
632 ) {
633 while let Poll::Ready(op) = self.write_queue.pop_front(ctx) {
638 match op {
639 WriteOp::Socket(mut op) => {
640 if let Some(event) = self.do_write_socket(&mut op) {
641 event_queue.push_back(event.into());
642 } else {
643 self.write_queue.push_front_no_wake(WriteOp::Socket(op));
644 break;
645 }
646 }
647 WriteOp::SetDisposition(tid, disposition, disposition_peer) => {
648 let result = { self.handle.socket_disposition(disposition, disposition_peer) };
649 event_queue.push_back(FDomainEvent::SocketDispositionSet(tid, result).into())
650 }
651 WriteOp::Channel(tid, data, mut handles) => {
652 if self
653 .do_write_channel(tid, &data, &mut handles, event_queue, ctx)
654 .is_pending()
655 {
656 self.write_queue.push_front_no_wake(WriteOp::Channel(tid, data, handles));
657 break;
658 }
659 }
660 }
661 }
662 }
663
664 fn do_read_socket(&mut self, tid: NonZeroU32, max_bytes: u64) -> Option<FDomainEvent> {
668 if self.async_read_in_progress {
669 return Some(
670 FDomainEvent::SocketData(
671 tid,
672 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
673 )
674 .into(),
675 );
676 }
677
678 let max_bytes = if self.is_datagram_socket {
679 let AnyHandle::Socket(s) = &*self.handle else {
680 unreachable!("Read socket from state that wasn't for a socket!");
681 };
682 match s.info() {
683 Ok(x) => x.rx_buf_available as u64,
684 Err(e) => {
690 return Some(FDomainEvent::SocketData(
691 tid,
692 Err(proto::Error::TargetError(e.into_raw())),
693 ));
694 }
695 }
696 } else {
697 max_bytes
698 };
699 self.handle.read_socket(max_bytes).transpose().map(|x| {
700 FDomainEvent::SocketData(
701 tid,
702 x.map(|data| proto::SocketData { data, is_datagram: self.is_datagram_socket }),
703 )
704 })
705 }
706
707 fn do_write_socket(&mut self, op: &mut SocketWrite) -> Option<FDomainEvent> {
711 match self.handle.write_socket(&op.to_write) {
712 Ok(wrote) => {
713 op.wrote += wrote;
714 op.to_write.drain(..wrote);
715
716 if op.to_write.is_empty() {
717 Some(FDomainEvent::WroteSocket(
718 op.tid,
719 Ok(proto::SocketWriteSocketResponse {
720 wrote: op.wrote.try_into().unwrap(),
721 }),
722 ))
723 } else {
724 None
725 }
726 }
727 Err(error) => Some(FDomainEvent::WroteSocket(
728 op.tid,
729 Err(proto::WriteSocketError { error, wrote: op.wrote.try_into().unwrap() }),
730 )),
731 }
732 }
733
734 fn do_write_channel(
738 &mut self,
739 tid: NonZeroU32,
740 data: &[u8],
741 handles: &mut HandlesToWrite,
742 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
743 ctx: &mut Context<'_>,
744 ) -> Poll<()> {
745 let Poll::Ready(handles) = handles.poll_ready(event_queue, ctx) else {
746 return Poll::Pending;
747 };
748
749 let ret = self.handle.write_channel(data, handles);
750 if let Some(ret) = ret {
751 event_queue.push_back(FDomainEvent::WroteChannel(tid, ret).into())
752 }
753 Poll::Ready(())
754 }
755
756 fn do_read_channel(&mut self, tid: NonZeroU32) -> Option<UnprocessedFDomainEvent> {
760 if self.async_read_in_progress {
761 return Some(
762 FDomainEvent::ChannelData(
763 tid,
764 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
765 )
766 .into(),
767 );
768 }
769 match self.handle.read_channel() {
770 Ok(x) => x.map(|x| UnprocessedFDomainEvent::ChannelData(tid, x)),
771 Err(e) => Some(FDomainEvent::ChannelData(tid, Err(e)).into()),
772 }
773 }
774}
775
/// A handle that is being wound down, together with the action to perform
/// once winding down has finished.
struct ClosingHandle {
    /// What to do with the handle once it is ready. Shared because one close
    /// request may cover several handles.
    action: Arc<CloseAction>,
    /// The handle being wound down; `None` after the action has been
    /// performed.
    state: Option<ShuttingDownHandle>,
}
782
783impl ClosingHandle {
784 fn poll_ready(&mut self, fdomain: &mut FDomain, ctx: &mut Context<'_>) -> Poll<()> {
785 if let Some(state) = self.state.as_mut() {
786 if state.poll_ready(&mut fdomain.event_queue, ctx).is_ready() {
787 let state = self.state.take().unwrap();
788 let ShuttingDownHandle::Ready(handle) = state else {
789 unreachable!();
790 };
791 self.action.perform(fdomain, handle);
792 Poll::Ready(())
793 } else {
794 Poll::Pending
795 }
796 } else {
797 Poll::Ready(())
798 }
799 }
800}
801
/// What to do with a handle once it has finished winding down.
enum CloseAction {
    /// Close the handle. `count` is decremented each time a handle sharing
    /// this action is processed; the handle that brings it from 1 to 0
    /// triggers a `ClosedHandle` event carrying `tid` and `result`.
    Close { tid: NonZeroU32, count: AtomicU32, result: Result<()> },
    /// Replace the handle with one that has `rights`, register the new handle
    /// under `new_hid`, and report the outcome in a `ReplacedHandle` event
    /// carrying `tid`.
    Replace { tid: NonZeroU32, new_hid: proto::NewHandleId, rights: fidl::Rights },
}
810
811impl CloseAction {
812 fn perform(&self, fdomain: &mut FDomain, handle: AnyHandle) {
813 match self {
814 CloseAction::Close { tid, count, result } => {
815 if count.fetch_sub(1, Ordering::Relaxed) == 1 {
816 fdomain
817 .event_queue
818 .push_back(FDomainEvent::ClosedHandle(*tid, result.clone()).into());
819 }
820 }
821 CloseAction::Replace { tid, new_hid, rights } => {
822 let result = handle
823 .replace(*rights)
824 .and_then(|handle| fdomain.alloc_client_handles([*new_hid], [handle]));
825 fdomain.event_queue.push_back(FDomainEvent::ReplacedHandle(*tid, result).into());
826 }
827 }
828 }
829}
830
/// A table of handles together with the machinery for running queued
/// operations on them. Completed operations are reported as `FDomainEvent`s
/// through the `Stream` implementation.
#[pin_project::pin_project]
pub struct FDomain {
    /// Callback that produces the namespace directory returned by
    /// `get_namespace`.
    namespace: Box<dyn Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send>,
    /// Live handles, keyed by handle ID.
    handles: HashMap<proto::HandleId, HandleState>,
    /// Handles that have been removed from `handles` and are winding down.
    closing_handles: Vec<ClosingHandle>,
    /// Events waiting to be yielded by the stream.
    event_queue: VecDeque<UnprocessedFDomainEvent>,
    /// Waker stored by `poll_next` when it had no event to return.
    waker: Option<Waker>,
}
843
844impl FDomain {
845 pub fn new_empty() -> Self {
848 Self::new(|| Err(fidl::Status::NOT_FOUND))
849 }
850
851 pub fn new(
853 namespace: impl Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send + 'static,
854 ) -> Self {
855 FDomain {
856 namespace: Box::new(namespace),
857 handles: HashMap::new(),
858 closing_handles: Vec::new(),
859 event_queue: VecDeque::new(),
860 waker: None,
861 }
862 }
863
864 fn push_event(&mut self, event: impl Into<UnprocessedFDomainEvent>) {
866 self.event_queue.push_back(event.into());
867 self.waker.take().map(Waker::wake);
868 }
869
870 fn process_message(
874 &mut self,
875 message: fidl::MessageBufEtc,
876 ) -> Result<proto::ChannelMessage, proto::Error> {
877 let (data, handles) = message.split();
878 let handles = handles
879 .into_iter()
880 .map(|info| {
881 let type_ = info.object_type;
882
883 let handle = match info.object_type {
884 fidl::ObjectType::CHANNEL => {
885 AnyHandle::Channel(fidl::Channel::from(info.handle))
886 }
887 fidl::ObjectType::SOCKET => AnyHandle::Socket(fidl::Socket::from(info.handle)),
888 fidl::ObjectType::EVENTPAIR => {
889 AnyHandle::EventPair(fidl::EventPair::from(info.handle))
890 }
891 fidl::ObjectType::EVENT => AnyHandle::Event(fidl::Event::from(info.handle)),
892 _ => AnyHandle::Unknown(handles::Unknown(info.handle, info.object_type)),
893 };
894
895 Ok(proto::HandleInfo {
896 rights: info.rights,
897 handle: self.alloc_fdomain_handle(handle)?,
898 type_,
899 })
900 })
901 .collect::<Result<Vec<_>, proto::Error>>()?;
902
903 Ok(proto::ChannelMessage { data, handles })
904 }
905
906 fn alloc_client_handles<const N: usize>(
913 &mut self,
914 ids: [proto::NewHandleId; N],
915 handles: [AnyHandle; N],
916 ) -> Result<(), proto::Error> {
917 for id in ids {
918 if id.id & (1 << 31) != 0 {
919 return Err(proto::Error::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange {
920 id: id.id,
921 }));
922 }
923
924 if self.handles.contains_key(&proto::HandleId { id: id.id }) {
925 return Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
926 id: id.id,
927 same_call: false,
928 }));
929 }
930 }
931
932 let mut sorted_ids = ids;
933 sorted_ids.sort();
934
935 if let Some([a, _]) = sorted_ids.array_windows().find(|&[a, b]| a == b) {
936 Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
937 id: a.id,
938 same_call: true,
939 }))
940 } else {
941 let ids = ids.into_iter().map(|id| proto::HandleId { id: id.id });
942 let handles = ids
943 .zip(handles.into_iter())
944 .map(|(id, h)| HandleState::new(h, id).map(|x| (id, x)))
945 .collect::<Result<Vec<_>, proto::Error>>()?;
946
947 self.handles.extend(handles);
948
949 Ok(())
950 }
951 }
952
953 fn alloc_fdomain_handle(&mut self, handle: AnyHandle) -> Result<proto::HandleId, proto::Error> {
956 loop {
957 let id = proto::HandleId { id: rand::random::<u32>() | (1u32 << 31) };
958 if let Entry::Vacant(v) = self.handles.entry(id) {
959 v.insert(HandleState::new(handle, id)?);
960 break Ok(id);
961 }
962 }
963 }
964
965 fn take_handle(&mut self, handle: proto::HandleId) -> Result<HandleState, proto::Error> {
967 self.handles
968 .remove(&handle)
969 .ok_or(proto::Error::BadHandleId(proto::BadHandleId { id: handle.id }))
970 }
971
972 fn using_handle<T>(
974 &mut self,
975 id: proto::HandleId,
976 f: impl FnOnce(&mut HandleState) -> Result<T, proto::Error>,
977 ) -> Result<T, proto::Error> {
978 if let Some(s) = self.handles.get_mut(&id) {
979 f(s)
980 } else {
981 Err(proto::Error::BadHandleId(proto::BadHandleId { id: id.id }))
982 }
983 }
984
985 pub fn get_namespace(&mut self, request: proto::FDomainGetNamespaceRequest) -> Result<()> {
986 match (self.namespace)() {
987 Ok(endpoint) => self.alloc_client_handles(
988 [request.new_handle],
989 [AnyHandle::Channel(endpoint.into_channel())],
990 ),
991 Err(e) => Err(proto::Error::TargetError(e.into_raw())),
992 }
993 }
994
995 pub fn create_channel(&mut self, request: proto::ChannelCreateChannelRequest) -> Result<()> {
996 let (a, b) = fidl::Channel::create();
997 self.alloc_client_handles(request.handles, [AnyHandle::Channel(a), AnyHandle::Channel(b)])
998 }
999
1000 pub fn create_socket(&mut self, request: proto::SocketCreateSocketRequest) -> Result<()> {
1001 let (a, b) = match request.options {
1002 proto::SocketType::Stream => fidl::Socket::create_stream(),
1003 proto::SocketType::Datagram => fidl::Socket::create_datagram(),
1004 type_ => {
1005 return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown { type_ }));
1006 }
1007 };
1008
1009 self.alloc_client_handles(request.handles, [AnyHandle::Socket(a), AnyHandle::Socket(b)])
1010 }
1011
1012 pub fn create_event_pair(
1013 &mut self,
1014 request: proto::EventPairCreateEventPairRequest,
1015 ) -> Result<()> {
1016 let (a, b) = fidl::EventPair::create();
1017 self.alloc_client_handles(
1018 request.handles,
1019 [AnyHandle::EventPair(a), AnyHandle::EventPair(b)],
1020 )
1021 }
1022
1023 pub fn create_event(&mut self, request: proto::EventCreateEventRequest) -> Result<()> {
1024 let a = fidl::Event::create();
1025 self.alloc_client_handles([request.handle], [AnyHandle::Event(a)])
1026 }
1027
1028 pub fn set_socket_disposition(
1029 &mut self,
1030 tid: NonZeroU32,
1031 request: proto::SocketSetSocketDispositionRequest,
1032 ) {
1033 if let Err(err) = self.using_handle(request.handle, |h| {
1034 h.write_queue.push_back(WriteOp::SetDisposition(
1035 tid,
1036 request.disposition,
1037 request.disposition_peer,
1038 ));
1039 Ok(())
1040 }) {
1041 self.push_event(FDomainEvent::SocketDispositionSet(tid, Err(err)));
1042 }
1043 }
1044
1045 pub fn read_socket(&mut self, tid: NonZeroU32, request: proto::SocketReadSocketRequest) {
1046 if let Err(e) = self.using_handle(request.handle, |h| {
1047 h.read_queue.push_back(ReadOp::Socket(tid, request.max_bytes));
1048 Ok(())
1049 }) {
1050 self.push_event(FDomainEvent::SocketData(tid, Err(e)));
1051 }
1052 }
1053
1054 pub fn read_channel(&mut self, tid: NonZeroU32, request: proto::ChannelReadChannelRequest) {
1055 if let Err(e) = self.using_handle(request.handle, |h| {
1056 h.read_queue.push_back(ReadOp::Channel(tid));
1057 Ok(())
1058 }) {
1059 self.push_event(FDomainEvent::ChannelData(tid, Err(e)));
1060 }
1061 }
1062
1063 pub fn write_socket(&mut self, tid: NonZeroU32, request: proto::SocketWriteSocketRequest) {
1064 if let Err(error) = self.using_handle(request.handle, |h| {
1065 h.write_queue.push_back(WriteOp::Socket(SocketWrite {
1066 tid,
1067 wrote: 0,
1068 to_write: request.data,
1069 }));
1070 Ok(())
1071 }) {
1072 self.push_event(FDomainEvent::WroteSocket(
1073 tid,
1074 Err(proto::WriteSocketError { error, wrote: 0 }),
1075 ));
1076 }
1077 }
1078
1079 pub fn write_channel(&mut self, tid: NonZeroU32, request: proto::ChannelWriteChannelRequest) {
1080 let handles: Vec<Result<(ShuttingDownHandle, fidl::Rights)>> = match request.handles {
1088 proto::Handles::Handles(h) => h
1089 .into_iter()
1090 .map(|h| {
1091 if h != request.handle {
1092 self.take_handle(h).map(|handle_state| {
1093 (ShuttingDownHandle::InUse(h, handle_state), fidl::Rights::SAME_RIGHTS)
1094 })
1095 } else {
1096 Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1097 }
1098 })
1099 .collect(),
1100 proto::Handles::Dispositions(d) => d
1101 .into_iter()
1102 .map(|d| {
1103 let res = match d.handle {
1104 proto::HandleOp::Move_(h) => {
1105 if h != request.handle {
1106 self.take_handle(h).map(|x| ShuttingDownHandle::InUse(h, x))
1107 } else {
1108 Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1109 }
1110 }
1111 proto::HandleOp::Duplicate(h) => {
1112 if h != request.handle {
1113 self.using_handle(h, |h| {
1119 h.handle.duplicate(fidl::Rights::SAME_RIGHTS)
1120 })
1121 .map(ShuttingDownHandle::Ready)
1122 } else {
1123 Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1124 }
1125 }
1126 };
1127
1128 res.and_then(|x| Ok((x, d.rights)))
1129 })
1130 .collect(),
1131 };
1132
1133 if handles.iter().any(|x| x.is_err()) {
1134 let e = handles.into_iter().map(|x| x.err().map(Box::new)).collect();
1135
1136 self.push_event(FDomainEvent::WroteChannel(
1137 tid,
1138 Err(proto::WriteChannelError::OpErrors(e)),
1139 ));
1140 return;
1141 }
1142
1143 let handles = handles.into_iter().map(|x| x.unwrap()).collect::<Vec<_>>();
1144
1145 if let Err(e) = self.using_handle(request.handle, |h| {
1146 h.write_queue.push_back(WriteOp::Channel(
1147 tid,
1148 request.data,
1149 HandlesToWrite::SomeInUse(handles),
1150 ));
1151 Ok(())
1152 }) {
1153 self.push_event(FDomainEvent::WroteChannel(
1154 tid,
1155 Err(proto::WriteChannelError::Error(e)),
1156 ));
1157 }
1158 }
1159
1160 pub fn wait_for_signals(
1161 &mut self,
1162 tid: NonZeroU32,
1163 request: proto::FDomainWaitForSignalsRequest,
1164 ) {
1165 let result = self.using_handle(request.handle, |h| {
1166 let signals = fidl::Signals::from_bits_retain(request.signals);
1167 h.signal_waiters.push(SignalWaiter {
1168 tid,
1169 waiter: Box::pin(OnSignals::new(AnyHandleRef(Arc::clone(&h.handle)), signals)),
1170 });
1171 Ok(())
1172 });
1173
1174 if let Err(e) = result {
1175 self.push_event(FDomainEvent::WaitForSignals(tid, Err(e)));
1176 } else {
1177 self.waker.take().map(Waker::wake);
1178 }
1179 }
1180
1181 pub fn close(&mut self, tid: NonZeroU32, request: proto::FDomainCloseRequest) {
1182 let mut states = Vec::with_capacity(request.handles.len());
1183 let mut result = Ok(());
1184 for hid in request.handles {
1185 match self.take_handle(hid) {
1186 Ok(state) => states.push((hid, state)),
1187
1188 Err(e) => {
1189 result = result.and(Err(e));
1190 }
1191 }
1192 }
1193
1194 let action = Arc::new(CloseAction::Close {
1195 tid,
1196 count: AtomicU32::new(states.len().try_into().unwrap()),
1197 result,
1198 });
1199
1200 for (hid, state) in states {
1201 self.closing_handles.push(ClosingHandle {
1202 action: Arc::clone(&action),
1203 state: Some(ShuttingDownHandle::InUse(hid, state)),
1204 });
1205 }
1206 }
1207
1208 pub fn duplicate(&mut self, request: proto::FDomainDuplicateRequest) -> Result<()> {
1209 let rights = request.rights;
1210 let handle = self.using_handle(request.handle, |h| h.handle.duplicate(rights));
1211 handle.and_then(|h| self.alloc_client_handles([request.new_handle], [h]))
1212 }
1213
1214 pub fn replace(
1215 &mut self,
1216 tid: NonZeroU32,
1217 request: proto::FDomainReplaceRequest,
1218 ) -> Result<()> {
1219 let rights = request.rights;
1220 let new_hid = request.new_handle;
1221 match self.take_handle(request.handle) {
1222 Ok(state) => self.closing_handles.push(ClosingHandle {
1223 action: Arc::new(CloseAction::Replace { tid, new_hid, rights }),
1224 state: Some(ShuttingDownHandle::InUse(request.handle, state)),
1225 }),
1226 Err(e) => self.event_queue.push_back(UnprocessedFDomainEvent::Ready(
1227 FDomainEvent::ReplacedHandle(tid, Err(e)),
1228 )),
1229 }
1230
1231 Ok(())
1232 }
1233
1234 pub fn signal(&mut self, request: proto::FDomainSignalRequest) -> Result<()> {
1235 let set = fidl::Signals::from_bits_retain(request.set);
1236 let clear = fidl::Signals::from_bits_retain(request.clear);
1237
1238 self.using_handle(request.handle, |h| h.handle.signal(clear, set))
1239 }
1240
1241 pub fn signal_peer(&mut self, request: proto::FDomainSignalPeerRequest) -> Result<()> {
1242 let set = fidl::Signals::from_bits_retain(request.set);
1243 let clear = fidl::Signals::from_bits_retain(request.clear);
1244
1245 self.using_handle(request.handle, |h| h.handle.signal_peer(clear, set))
1246 }
1247
1248 pub fn get_koid(
1249 &mut self,
1250 request: proto::FDomainGetKoidRequest,
1251 ) -> Result<proto::FDomainGetKoidResponse> {
1252 self.using_handle(request.handle, |h| {
1253 h.handle
1254 .as_handle_ref()
1255 .koid()
1256 .map(|k| proto::FDomainGetKoidResponse { koid: k.raw_koid() })
1257 .map_err(|e| proto::Error::TargetError(e.into_raw()))
1258 })
1259 }
1260
1261 pub fn read_channel_streaming_start(
1262 &mut self,
1263 tid: NonZeroU32,
1264 request: proto::ChannelReadChannelStreamingStartRequest,
1265 ) {
1266 if let Err(err) = self.using_handle(request.handle, |h| {
1267 h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1268 h.read_queue.push_back(ReadOp::StreamingChannel(tid, true));
1269 Ok(())
1270 }) {
1271 self.event_queue
1272 .push_back(FDomainEvent::ChannelStreamingReadStart(tid, Err(err)).into())
1273 }
1274 }
1275
1276 pub fn read_channel_streaming_stop(
1277 &mut self,
1278 tid: NonZeroU32,
1279 request: proto::ChannelReadChannelStreamingStopRequest,
1280 ) {
1281 if let Err(err) = self.using_handle(request.handle, |h| {
1282 h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1283 h.read_queue.push_back(ReadOp::StreamingChannel(tid, false));
1284 Ok(())
1285 }) {
1286 self.event_queue.push_back(FDomainEvent::ChannelStreamingReadStop(tid, Err(err)).into())
1287 }
1288 }
1289
1290 pub fn read_socket_streaming_start(
1291 &mut self,
1292 tid: NonZeroU32,
1293 request: proto::SocketReadSocketStreamingStartRequest,
1294 ) {
1295 if let Err(err) = self.using_handle(request.handle, |h| {
1296 h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1297 h.read_queue.push_back(ReadOp::StreamingSocket(tid, true));
1298 Ok(())
1299 }) {
1300 self.event_queue.push_back(FDomainEvent::SocketStreamingReadStart(tid, Err(err)).into())
1301 }
1302 }
1303
1304 pub fn read_socket_streaming_stop(
1305 &mut self,
1306 tid: NonZeroU32,
1307 request: proto::SocketReadSocketStreamingStopRequest,
1308 ) {
1309 if let Err(err) = self.using_handle(request.handle, |h| {
1310 h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1311 h.read_queue.push_back(ReadOp::StreamingSocket(tid, false));
1312 Ok(())
1313 }) {
1314 self.event_queue.push_back(FDomainEvent::SocketStreamingReadStop(tid, Err(err)).into())
1315 }
1316 }
1317}
1318
1319impl futures::Stream for FDomain {
1322 type Item = FDomainEvent;
1323
1324 fn poll_next(
1325 mut self: std::pin::Pin<&mut Self>,
1326 ctx: &mut Context<'_>,
1327 ) -> Poll<Option<Self::Item>> {
1328 let this = &mut *self;
1329
1330 let mut closing_handles = std::mem::replace(&mut this.closing_handles, Vec::new());
1331 closing_handles.retain_mut(|x| x.poll_ready(this, ctx).is_pending());
1332 this.closing_handles = closing_handles;
1333
1334 let handles = &mut this.handles;
1335 let event_queue = &mut this.event_queue;
1336 for state in handles.values_mut() {
1337 state.poll(event_queue, ctx);
1338 }
1339
1340 if let Some(event) = self.event_queue.pop_front() {
1341 match event {
1342 UnprocessedFDomainEvent::Ready(event) => Poll::Ready(Some(event)),
1343 UnprocessedFDomainEvent::ChannelData(tid, message) => {
1344 Poll::Ready(Some(FDomainEvent::ChannelData(tid, self.process_message(message))))
1345 }
1346 UnprocessedFDomainEvent::ChannelStreamingData(hid, message) => {
1347 match self.process_message(message) {
1348 Ok(message) => Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1349 proto::ChannelOnChannelStreamingDataRequest {
1350 handle: hid,
1351 channel_sent: proto::ChannelSent::Message(message),
1352 },
1353 ))),
1354 Err(e) => {
1355 self.handles.get_mut(&hid).unwrap().async_read_in_progress = false;
1356 Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1357 proto::ChannelOnChannelStreamingDataRequest {
1358 handle: hid,
1359 channel_sent: proto::ChannelSent::Stopped(proto::AioStopped {
1360 error: Some(Box::new(e)),
1361 }),
1362 },
1363 )))
1364 }
1365 }
1366 }
1367 }
1368 } else {
1369 self.waker = Some(ctx.waker().clone());
1370 Poll::Pending
1371 }
1372 }
1373}