1use fidl::AsHandleRef;
6use fidl::endpoints::ClientEnd;
7use futures::prelude::*;
8use replace_with::replace_with;
9use std::collections::hash_map::Entry;
10use std::collections::{HashMap, VecDeque};
11use std::num::NonZeroU32;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicU32, Ordering};
14use std::task::{Context, Poll, Waker};
15use {fidl_fuchsia_fdomain as proto, fidl_fuchsia_io as fio, fuchsia_async as fasync};
16
17mod handles;
18pub mod wire;
19
20#[cfg(test)]
21mod test;
22
23pub type Result<T, E = proto::Error> = std::result::Result<T, E>;
24
25use handles::{AnyHandle, HandleType as _, IsDatagramSocket};
26
27struct Queue<T>(VecDeque<T>, Option<Waker>);
30
31impl<T> Queue<T> {
32 fn new() -> Self {
34 Queue(VecDeque::new(), None)
35 }
36
37 fn is_empty(&self) -> bool {
39 self.0.is_empty()
40 }
41
42 fn destroy_front(&mut self) {
47 assert!(self.0.pop_front().is_some(), "Expected to find a value!");
48 }
49
50 fn pop_front(&mut self, ctx: &mut Context<'_>) -> Poll<T> {
52 if let Some(t) = self.0.pop_front() {
53 Poll::Ready(t)
54 } else {
55 self.1 = Some(ctx.waker().clone());
56 Poll::Pending
57 }
58 }
59
60 fn push_front_no_wake(&mut self, t: T) {
67 self.0.push_front(t)
68 }
69
70 fn push_back(&mut self, t: T) {
72 self.0.push_back(t);
73 self.1.take().map(Waker::wake);
74 }
75
76 fn front_mut(&mut self, ctx: &mut Context<'_>) -> Poll<&mut T> {
78 if let Some(t) = self.0.front_mut() {
79 Poll::Ready(t)
80 } else {
81 self.1 = Some(ctx.waker().clone());
82 Poll::Pending
83 }
84 }
85}
86
87const ASYNC_READ_BUFSIZE: u64 = 40960;
89
90#[derive(Debug)]
92pub enum FDomainEvent {
93 ChannelStreamingReadStart(NonZeroU32, Result<()>),
94 ChannelStreamingReadStop(NonZeroU32, Result<()>),
95 SocketStreamingReadStart(NonZeroU32, Result<()>),
96 SocketStreamingReadStop(NonZeroU32, Result<()>),
97 WaitForSignals(NonZeroU32, Result<proto::FDomainWaitForSignalsResponse>),
98 SocketData(NonZeroU32, Result<proto::SocketData>),
99 SocketStreamingData(proto::SocketOnSocketStreamingDataRequest),
100 SocketDispositionSet(NonZeroU32, Result<()>),
101 WroteSocket(NonZeroU32, Result<proto::SocketWriteSocketResponse, proto::WriteSocketError>),
102 ChannelData(NonZeroU32, Result<proto::ChannelMessage>),
103 ChannelStreamingData(proto::ChannelOnChannelStreamingDataRequest),
104 WroteChannel(NonZeroU32, Result<(), proto::WriteChannelError>),
105 ClosedHandle(NonZeroU32, Result<()>),
106 ReplacedHandle(NonZeroU32, Result<()>),
107}
108
109enum UnprocessedFDomainEvent {
113 Ready(FDomainEvent),
114 ChannelData(NonZeroU32, fidl::MessageBufEtc),
115 ChannelStreamingData(proto::HandleId, fidl::MessageBufEtc),
116}
117
118impl From<FDomainEvent> for UnprocessedFDomainEvent {
119 fn from(other: FDomainEvent) -> UnprocessedFDomainEvent {
120 UnprocessedFDomainEvent::Ready(other)
121 }
122}
123
124enum ReadOp {
126 StreamingChannel(NonZeroU32, bool),
128 StreamingSocket(NonZeroU32, bool),
130 Socket(NonZeroU32, u64),
131 Channel(NonZeroU32),
132}
133
134struct SocketWrite {
138 tid: NonZeroU32,
139 wrote: usize,
140 to_write: Vec<u8>,
141}
142
143enum WriteOp {
145 Socket(SocketWrite),
146 Channel(NonZeroU32, Vec<u8>, HandlesToWrite),
147 SetDisposition(NonZeroU32, proto::SocketDisposition, proto::SocketDisposition),
148}
149
150enum ShuttingDownHandle {
155 InUse(proto::HandleId, HandleState),
156 Ready(AnyHandle),
157}
158
159impl ShuttingDownHandle {
160 fn poll_ready(
161 &mut self,
162 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
163 ctx: &mut Context<'_>,
164 ) -> Poll<()> {
165 replace_with(self, |this| match this {
166 this @ ShuttingDownHandle::Ready(_) => this,
167 ShuttingDownHandle::InUse(hid, mut state) => {
168 state.poll(event_queue, ctx);
169
170 if state.write_queue.is_empty() {
171 while let Poll::Ready(op) = state.read_queue.pop_front(ctx) {
172 match op {
173 ReadOp::StreamingChannel(tid, start) => {
174 let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
175 id: hid.id,
176 }));
177 let event = if start {
178 FDomainEvent::ChannelStreamingReadStart(tid, err)
179 } else {
180 FDomainEvent::ChannelStreamingReadStop(tid, err)
181 };
182 event_queue.push_back(event.into());
183 }
184 ReadOp::StreamingSocket(tid, start) => {
185 let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
186 id: hid.id,
187 }));
188 let event = if start {
189 FDomainEvent::SocketStreamingReadStart(tid, err)
190 } else {
191 FDomainEvent::SocketStreamingReadStop(tid, err)
192 };
193 event_queue.push_back(event.into());
194 }
195 ReadOp::Channel(tid) => {
196 let err = state
197 .handle
198 .expected_type(fidl::ObjectType::CHANNEL)
199 .err()
200 .unwrap_or(proto::Error::ClosedDuringRead(
201 proto::ClosedDuringRead,
202 ));
203 event_queue
204 .push_back(FDomainEvent::ChannelData(tid, Err(err)).into());
205 }
206 ReadOp::Socket(tid, _max_bytes) => {
207 let err = state
208 .handle
209 .expected_type(fidl::ObjectType::SOCKET)
210 .err()
211 .unwrap_or(proto::Error::ClosedDuringRead(
212 proto::ClosedDuringRead,
213 ));
214 event_queue
215 .push_back(FDomainEvent::SocketData(tid, Err(err)).into());
216 }
217 }
218 }
219
220 if state.async_read_in_progress {
221 match &*state.handle {
222 AnyHandle::Channel(_) => event_queue.push_back(
223 FDomainEvent::ChannelStreamingData(
224 proto::ChannelOnChannelStreamingDataRequest {
225 handle: hid,
226 channel_sent: proto::ChannelSent::Stopped(
227 proto::AioStopped { error: None },
228 ),
229 },
230 )
231 .into(),
232 ),
233 AnyHandle::Socket(_) => event_queue.push_back(
234 FDomainEvent::SocketStreamingData(
235 proto::SocketOnSocketStreamingDataRequest {
236 handle: hid,
237 socket_message: proto::SocketMessage::Stopped(
238 proto::AioStopped { error: None },
239 ),
240 },
241 )
242 .into(),
243 ),
244 AnyHandle::EventPair(_)
245 | AnyHandle::Event(_)
246 | AnyHandle::Unknown(_) => unreachable!(),
247 }
248 }
249
250 state.signal_waiters.clear();
251 state.io_waiter = None;
252
253 ShuttingDownHandle::Ready(
254 Arc::into_inner(state.handle).expect("Unaccounted-for handle reference!"),
255 )
256 } else {
257 ShuttingDownHandle::InUse(hid, state)
258 }
259 }
260 });
261
262 if matches!(self, ShuttingDownHandle::Ready(_)) { Poll::Ready(()) } else { Poll::Pending }
263 }
264}
265
266enum HandlesToWrite {
270 SomeInUse(Vec<(ShuttingDownHandle, fidl::Rights)>),
271 AllReady(Vec<fidl::HandleDisposition<'static>>),
272}
273
274impl HandlesToWrite {
275 fn poll_ready(
276 &mut self,
277 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
278 ctx: &mut Context<'_>,
279 ) -> Poll<&mut Vec<fidl::HandleDisposition<'static>>> {
280 match self {
281 HandlesToWrite::AllReady(s) => Poll::Ready(s),
282 HandlesToWrite::SomeInUse(handles) => {
283 let mut ready = true;
284 for (handle, _) in handles.iter_mut() {
285 ready = ready && handle.poll_ready(event_queue, ctx).is_ready();
286 }
287
288 if !ready {
289 return Poll::Pending;
290 }
291
292 *self = HandlesToWrite::AllReady(
293 handles
294 .drain(..)
295 .map(|(handle, rights)| {
296 let ShuttingDownHandle::Ready(handle) = handle else { unreachable!() };
297
298 fidl::HandleDisposition::new(
299 fidl::HandleOp::Move(handle.into()),
300 fidl::ObjectType::NONE,
301 rights,
302 fidl::Status::OK,
303 )
304 })
305 .collect(),
306 );
307
308 let HandlesToWrite::AllReady(s) = self else { unreachable!() };
309 Poll::Ready(s)
310 }
311 }
312 }
313}
314
315struct AnyHandleRef(Arc<AnyHandle>);
316
317impl AsHandleRef for AnyHandleRef {
318 fn as_handle_ref(&self) -> fidl::HandleRef<'_> {
319 self.0.as_handle_ref()
320 }
321}
322
323#[cfg(target_os = "fuchsia")]
324type OnSignals = fasync::OnSignals<'static, AnyHandleRef>;
325
326#[cfg(not(target_os = "fuchsia"))]
327type OnSignals = fasync::OnSignalsRef<'static>;
328
329struct SignalWaiter {
332 tid: NonZeroU32,
333 waiter: OnSignals,
334}
335
336struct HandleState {
338 handle: Arc<AnyHandle>,
340 hid: proto::HandleId,
342 is_datagram_socket: bool,
346 async_read_in_progress: bool,
351 read_queue: Queue<ReadOp>,
357 write_queue: Queue<WriteOp>,
364 signal_waiters: Vec<SignalWaiter>,
366 io_waiter: Option<OnSignals>,
369}
370
371impl HandleState {
372 fn new(handle: AnyHandle, hid: proto::HandleId) -> Result<Self, proto::Error> {
373 let is_datagram_socket = match handle.is_datagram_socket() {
374 IsDatagramSocket::Unknown => {
375 return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown {
376 type_: proto::SocketType::unknown(),
377 }));
378 }
379 other => other.is_datagram(),
380 };
381 Ok(HandleState {
382 handle: Arc::new(handle),
383 hid,
384 async_read_in_progress: false,
385 is_datagram_socket,
386 read_queue: Queue::new(),
387 write_queue: Queue::new(),
388 signal_waiters: Vec::new(),
389 io_waiter: None,
390 })
391 }
392
393 fn poll(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>, ctx: &mut Context<'_>) {
396 self.signal_waiters.retain_mut(|x| {
397 let Poll::Ready(result) = x.waiter.poll_unpin(ctx) else {
398 return true;
399 };
400
401 event_queue.push_back(
402 FDomainEvent::WaitForSignals(
403 x.tid,
404 result
405 .map(|x| proto::FDomainWaitForSignalsResponse { signals: x.bits() })
406 .map_err(|e| proto::Error::TargetError(e.into_raw())),
407 )
408 .into(),
409 );
410
411 false
412 });
413
414 let read_signals = self.handle.read_signals();
415 let write_signals = self.handle.write_signals();
416
417 loop {
418 if let Some(signal_waiter) = self.io_waiter.as_mut() {
419 if let Poll::Ready(sigs) = signal_waiter.poll_unpin(ctx) {
420 if let Ok(sigs) = sigs {
421 if sigs.intersects(read_signals) {
422 self.process_read_queue(event_queue, ctx);
423 }
424 if sigs.intersects(write_signals) {
425 self.process_write_queue(event_queue, ctx);
426 }
427 }
428 } else {
429 let need_read = matches!(
430 self.read_queue.front_mut(ctx),
431 Poll::Ready(ReadOp::StreamingChannel(_, _) | ReadOp::StreamingSocket(_, _))
432 );
433 let need_write = matches!(
434 self.write_queue.front_mut(ctx),
435 Poll::Ready(WriteOp::SetDisposition(_, _, _))
436 );
437
438 self.process_read_queue(event_queue, ctx);
439 self.process_write_queue(event_queue, ctx);
440
441 if !(need_read || need_write) {
442 break;
443 }
444 }
445 }
446
447 let subscribed_signals =
448 if self.async_read_in_progress || !self.read_queue.is_empty() {
449 read_signals
450 } else {
451 fidl::Signals::NONE
452 } | if !self.write_queue.is_empty() { write_signals } else { fidl::Signals::NONE };
453
454 if !subscribed_signals.is_empty() {
455 self.io_waiter = Some(OnSignals::new(
456 AnyHandleRef(Arc::clone(&self.handle)),
457 subscribed_signals,
458 ));
459 } else {
460 self.io_waiter = None;
461 break;
462 }
463 }
464 }
465
466 fn try_enable_async_read(&mut self) -> Result<()> {
468 if self.async_read_in_progress {
469 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress))
470 } else {
471 self.async_read_in_progress = true;
472 Ok(())
473 }
474 }
475
476 fn try_disable_async_read(&mut self) -> Result<()> {
478 if !self.async_read_in_progress {
479 Err(proto::Error::NoReadInProgress(proto::NoReadInProgress))
480 } else {
481 self.async_read_in_progress = false;
482 Ok(())
483 }
484 }
485
486 fn process_read_queue(
488 &mut self,
489 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
490 ctx: &mut Context<'_>,
491 ) {
492 while let Poll::Ready(op) = self.read_queue.front_mut(ctx) {
493 match op {
494 ReadOp::StreamingChannel(tid, true) => {
495 let tid = *tid;
496 let result = self.try_enable_async_read();
497 event_queue
498 .push_back(FDomainEvent::ChannelStreamingReadStart(tid, result).into());
499 self.read_queue.destroy_front();
500 }
501 ReadOp::StreamingChannel(tid, false) => {
502 let tid = *tid;
503 let result = self.try_disable_async_read();
504 event_queue
505 .push_back(FDomainEvent::ChannelStreamingReadStop(tid, result).into());
506 self.read_queue.destroy_front();
507 }
508 ReadOp::StreamingSocket(tid, true) => {
509 let tid = *tid;
510 let result = self.try_enable_async_read();
511 event_queue
512 .push_back(FDomainEvent::SocketStreamingReadStart(tid, result).into());
513 self.read_queue.destroy_front();
514 }
515 ReadOp::StreamingSocket(tid, false) => {
516 let tid = *tid;
517 let result = self.try_disable_async_read();
518 event_queue
519 .push_back(FDomainEvent::SocketStreamingReadStop(tid, result).into());
520 self.read_queue.destroy_front();
521 }
522 ReadOp::Socket(tid, max_bytes) => {
523 let (tid, max_bytes) = (*tid, *max_bytes);
524 if let Some(event) = self.do_read_socket(tid, max_bytes) {
525 let _ = self.read_queue.pop_front(ctx);
526 event_queue.push_back(event.into());
527 } else {
528 break;
529 }
530 }
531 ReadOp::Channel(tid) => {
532 let tid = *tid;
533 if let Some(event) = self.do_read_channel(tid) {
534 let _ = self.read_queue.pop_front(ctx);
535 event_queue.push_back(event.into());
536 } else {
537 break;
538 }
539 }
540 }
541 }
542
543 if self.async_read_in_progress {
544 assert!(self.read_queue.is_empty());
547 self.process_async_read(event_queue);
548 }
549 }
550
551 fn process_async_read(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>) {
552 assert!(self.async_read_in_progress);
553
554 match &*self.handle {
555 AnyHandle::Channel(_) => {
556 'read_loop: while let Some(result) = self.handle.read_channel().transpose() {
557 match result {
558 Ok(msg) => event_queue.push_back(
559 UnprocessedFDomainEvent::ChannelStreamingData(self.hid, msg),
560 ),
561 Err(e) => {
562 event_queue.push_back(
563 FDomainEvent::ChannelStreamingData(
564 proto::ChannelOnChannelStreamingDataRequest {
565 handle: self.hid,
566 channel_sent: proto::ChannelSent::Stopped(
567 proto::AioStopped { error: Some(Box::new(e)) },
568 ),
569 },
570 )
571 .into(),
572 );
573 self.async_read_in_progress = false;
574 break 'read_loop;
575 }
576 }
577 }
578 }
579
580 AnyHandle::Socket(_) => {
581 'read_loop: while let Some(result) =
582 self.handle.read_socket(ASYNC_READ_BUFSIZE).transpose()
583 {
584 match result {
585 Ok(data) => {
586 event_queue.push_back(
587 FDomainEvent::SocketStreamingData(
588 proto::SocketOnSocketStreamingDataRequest {
589 handle: self.hid,
590 socket_message: proto::SocketMessage::Data(
591 proto::SocketData {
592 data,
593 is_datagram: self.is_datagram_socket,
594 },
595 ),
596 },
597 )
598 .into(),
599 );
600 }
601 Err(e) => {
602 event_queue.push_back(
603 FDomainEvent::SocketStreamingData(
604 proto::SocketOnSocketStreamingDataRequest {
605 handle: self.hid,
606 socket_message: proto::SocketMessage::Stopped(
607 proto::AioStopped { error: Some(Box::new(e)) },
608 ),
609 },
610 )
611 .into(),
612 );
613 self.async_read_in_progress = false;
614 break 'read_loop;
615 }
616 }
617 }
618 }
619
620 _ => unreachable!("Processed async read for unreadable handle type!"),
621 }
622 }
623
624 fn process_write_queue(
626 &mut self,
627 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
628 ctx: &mut Context<'_>,
629 ) {
630 while let Poll::Ready(op) = self.write_queue.pop_front(ctx) {
635 match op {
636 WriteOp::Socket(mut op) => {
637 if let Some(event) = self.do_write_socket(&mut op) {
638 event_queue.push_back(event.into());
639 } else {
640 self.write_queue.push_front_no_wake(WriteOp::Socket(op));
641 break;
642 }
643 }
644 WriteOp::SetDisposition(tid, disposition, disposition_peer) => {
645 let result = { self.handle.socket_disposition(disposition, disposition_peer) };
646 event_queue.push_back(FDomainEvent::SocketDispositionSet(tid, result).into())
647 }
648 WriteOp::Channel(tid, data, mut handles) => {
649 if self
650 .do_write_channel(tid, &data, &mut handles, event_queue, ctx)
651 .is_pending()
652 {
653 self.write_queue.push_front_no_wake(WriteOp::Channel(tid, data, handles));
654 break;
655 }
656 }
657 }
658 }
659 }
660
661 fn do_read_socket(&mut self, tid: NonZeroU32, max_bytes: u64) -> Option<FDomainEvent> {
665 if self.async_read_in_progress {
666 return Some(
667 FDomainEvent::SocketData(
668 tid,
669 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
670 )
671 .into(),
672 );
673 }
674
675 let max_bytes = if self.is_datagram_socket {
676 let AnyHandle::Socket(s) = &*self.handle else {
677 unreachable!("Read socket from state that wasn't for a socket!");
678 };
679 match s.info() {
680 Ok(x) => x.rx_buf_available as u64,
681 Err(e) => {
687 return Some(FDomainEvent::SocketData(
688 tid,
689 Err(proto::Error::TargetError(e.into_raw())),
690 ));
691 }
692 }
693 } else {
694 max_bytes
695 };
696 self.handle.read_socket(max_bytes).transpose().map(|x| {
697 FDomainEvent::SocketData(
698 tid,
699 x.map(|data| proto::SocketData { data, is_datagram: self.is_datagram_socket }),
700 )
701 })
702 }
703
704 fn do_write_socket(&mut self, op: &mut SocketWrite) -> Option<FDomainEvent> {
708 match self.handle.write_socket(&op.to_write) {
709 Ok(wrote) => {
710 op.wrote += wrote;
711 op.to_write.drain(..wrote);
712
713 if op.to_write.is_empty() {
714 Some(FDomainEvent::WroteSocket(
715 op.tid,
716 Ok(proto::SocketWriteSocketResponse {
717 wrote: op.wrote.try_into().unwrap(),
718 }),
719 ))
720 } else {
721 None
722 }
723 }
724 Err(error) => Some(FDomainEvent::WroteSocket(
725 op.tid,
726 Err(proto::WriteSocketError { error, wrote: op.wrote.try_into().unwrap() }),
727 )),
728 }
729 }
730
731 fn do_write_channel(
735 &mut self,
736 tid: NonZeroU32,
737 data: &[u8],
738 handles: &mut HandlesToWrite,
739 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
740 ctx: &mut Context<'_>,
741 ) -> Poll<()> {
742 let Poll::Ready(handles) = handles.poll_ready(event_queue, ctx) else {
743 return Poll::Pending;
744 };
745
746 let ret = self.handle.write_channel(data, handles);
747 if let Some(ret) = ret {
748 event_queue.push_back(FDomainEvent::WroteChannel(tid, ret).into())
749 }
750 Poll::Ready(())
751 }
752
753 fn do_read_channel(&mut self, tid: NonZeroU32) -> Option<UnprocessedFDomainEvent> {
757 if self.async_read_in_progress {
758 return Some(
759 FDomainEvent::ChannelData(
760 tid,
761 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
762 )
763 .into(),
764 );
765 }
766 match self.handle.read_channel() {
767 Ok(x) => x.map(|x| UnprocessedFDomainEvent::ChannelData(tid, x)),
768 Err(e) => Some(FDomainEvent::ChannelData(tid, Err(e)).into()),
769 }
770 }
771}
772
773struct ClosingHandle {
776 action: Arc<CloseAction>,
777 state: Option<ShuttingDownHandle>,
778}
779
780impl ClosingHandle {
781 fn poll_ready(&mut self, fdomain: &mut FDomain, ctx: &mut Context<'_>) -> Poll<()> {
782 if let Some(state) = self.state.as_mut() {
783 if state.poll_ready(&mut fdomain.event_queue, ctx).is_ready() {
784 let state = self.state.take().unwrap();
785 let ShuttingDownHandle::Ready(handle) = state else {
786 unreachable!();
787 };
788 self.action.perform(fdomain, handle);
789 Poll::Ready(())
790 } else {
791 Poll::Pending
792 }
793 } else {
794 Poll::Ready(())
795 }
796 }
797}
798
799enum CloseAction {
804 Close { tid: NonZeroU32, count: AtomicU32, result: Result<()> },
805 Replace { tid: NonZeroU32, new_hid: proto::NewHandleId, rights: fidl::Rights },
806}
807
808impl CloseAction {
809 fn perform(&self, fdomain: &mut FDomain, handle: AnyHandle) {
810 match self {
811 CloseAction::Close { tid, count, result } => {
812 if count.fetch_sub(1, Ordering::Relaxed) == 1 {
813 fdomain
814 .event_queue
815 .push_back(FDomainEvent::ClosedHandle(*tid, result.clone()).into());
816 }
817 }
818 CloseAction::Replace { tid, new_hid, rights } => {
819 let result = handle
820 .replace(*rights)
821 .and_then(|handle| fdomain.alloc_client_handles([*new_hid], [handle]));
822 fdomain.event_queue.push_back(FDomainEvent::ReplacedHandle(*tid, result).into());
823 }
824 }
825 }
826}
827
828#[pin_project::pin_project]
833pub struct FDomain {
834 namespace: Box<dyn Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send>,
835 handles: HashMap<proto::HandleId, HandleState>,
836 closing_handles: Vec<ClosingHandle>,
837 event_queue: VecDeque<UnprocessedFDomainEvent>,
838 waker: Option<Waker>,
839}
840
841impl FDomain {
842 pub fn new_empty() -> Self {
845 Self::new(|| Err(fidl::Status::NOT_FOUND))
846 }
847
848 pub fn new(
850 namespace: impl Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send + 'static,
851 ) -> Self {
852 FDomain {
853 namespace: Box::new(namespace),
854 handles: HashMap::new(),
855 closing_handles: Vec::new(),
856 event_queue: VecDeque::new(),
857 waker: None,
858 }
859 }
860
861 fn push_event(&mut self, event: impl Into<UnprocessedFDomainEvent>) {
863 self.event_queue.push_back(event.into());
864 self.waker.take().map(Waker::wake);
865 }
866
867 fn process_message(
871 &mut self,
872 message: fidl::MessageBufEtc,
873 ) -> Result<proto::ChannelMessage, proto::Error> {
874 let (data, handles) = message.split();
875 let handles = handles
876 .into_iter()
877 .map(|info| {
878 let type_ = info.object_type;
879
880 let handle = match info.object_type {
881 fidl::ObjectType::CHANNEL => {
882 AnyHandle::Channel(fidl::Channel::from(info.handle))
883 }
884 fidl::ObjectType::SOCKET => AnyHandle::Socket(fidl::Socket::from(info.handle)),
885 fidl::ObjectType::EVENTPAIR => {
886 AnyHandle::EventPair(fidl::EventPair::from(info.handle))
887 }
888 fidl::ObjectType::EVENT => AnyHandle::Event(fidl::Event::from(info.handle)),
889 _ => AnyHandle::Unknown(handles::Unknown(info.handle, info.object_type)),
890 };
891
892 Ok(proto::HandleInfo {
893 rights: info.rights,
894 handle: self.alloc_fdomain_handle(handle)?,
895 type_,
896 })
897 })
898 .collect::<Result<Vec<_>, proto::Error>>()?;
899
900 Ok(proto::ChannelMessage { data, handles })
901 }
902
903 fn alloc_client_handles<const N: usize>(
910 &mut self,
911 ids: [proto::NewHandleId; N],
912 handles: [AnyHandle; N],
913 ) -> Result<(), proto::Error> {
914 for id in ids {
915 if id.id & (1 << 31) != 0 {
916 return Err(proto::Error::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange {
917 id: id.id,
918 }));
919 }
920
921 if self.handles.contains_key(&proto::HandleId { id: id.id }) {
922 return Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
923 id: id.id,
924 same_call: false,
925 }));
926 }
927 }
928
929 let mut sorted_ids = ids;
930 sorted_ids.sort();
931
932 if let Some(a) = sorted_ids.windows(2).find(|x| x[0] == x[1]) {
933 Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
934 id: a[0].id,
935 same_call: true,
936 }))
937 } else {
938 let ids = ids.into_iter().map(|id| proto::HandleId { id: id.id });
939 let handles = ids
940 .zip(handles.into_iter())
941 .map(|(id, h)| HandleState::new(h, id).map(|x| (id, x)))
942 .collect::<Result<Vec<_>, proto::Error>>()?;
943
944 self.handles.extend(handles);
945
946 Ok(())
947 }
948 }
949
950 fn alloc_fdomain_handle(&mut self, handle: AnyHandle) -> Result<proto::HandleId, proto::Error> {
953 loop {
954 let id = proto::HandleId { id: rand::random::<u32>() | (1u32 << 31) };
955 if let Entry::Vacant(v) = self.handles.entry(id) {
956 v.insert(HandleState::new(handle, id)?);
957 break Ok(id);
958 }
959 }
960 }
961
962 fn take_handle(&mut self, handle: proto::HandleId) -> Result<HandleState, proto::Error> {
964 self.handles
965 .remove(&handle)
966 .ok_or(proto::Error::BadHandleId(proto::BadHandleId { id: handle.id }))
967 }
968
969 fn using_handle<T>(
971 &mut self,
972 id: proto::HandleId,
973 f: impl FnOnce(&mut HandleState) -> Result<T, proto::Error>,
974 ) -> Result<T, proto::Error> {
975 if let Some(s) = self.handles.get_mut(&id) {
976 f(s)
977 } else {
978 Err(proto::Error::BadHandleId(proto::BadHandleId { id: id.id }))
979 }
980 }
981
982 pub fn get_namespace(&mut self, request: proto::FDomainGetNamespaceRequest) -> Result<()> {
983 match (self.namespace)() {
984 Ok(endpoint) => self.alloc_client_handles(
985 [request.new_handle],
986 [AnyHandle::Channel(endpoint.into_channel())],
987 ),
988 Err(e) => Err(proto::Error::TargetError(e.into_raw())),
989 }
990 }
991
992 pub fn create_channel(&mut self, request: proto::ChannelCreateChannelRequest) -> Result<()> {
993 let (a, b) = fidl::Channel::create();
994 self.alloc_client_handles(request.handles, [AnyHandle::Channel(a), AnyHandle::Channel(b)])
995 }
996
997 pub fn create_socket(&mut self, request: proto::SocketCreateSocketRequest) -> Result<()> {
998 let (a, b) = match request.options {
999 proto::SocketType::Stream => fidl::Socket::create_stream(),
1000 proto::SocketType::Datagram => fidl::Socket::create_datagram(),
1001 type_ => {
1002 return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown { type_ }));
1003 }
1004 };
1005
1006 self.alloc_client_handles(request.handles, [AnyHandle::Socket(a), AnyHandle::Socket(b)])
1007 }
1008
1009 pub fn create_event_pair(
1010 &mut self,
1011 request: proto::EventPairCreateEventPairRequest,
1012 ) -> Result<()> {
1013 let (a, b) = fidl::EventPair::create();
1014 self.alloc_client_handles(
1015 request.handles,
1016 [AnyHandle::EventPair(a), AnyHandle::EventPair(b)],
1017 )
1018 }
1019
1020 pub fn create_event(&mut self, request: proto::EventCreateEventRequest) -> Result<()> {
1021 let a = fidl::Event::create();
1022 self.alloc_client_handles([request.handle], [AnyHandle::Event(a)])
1023 }
1024
1025 pub fn set_socket_disposition(
1026 &mut self,
1027 tid: NonZeroU32,
1028 request: proto::SocketSetSocketDispositionRequest,
1029 ) {
1030 if let Err(err) = self.using_handle(request.handle, |h| {
1031 h.write_queue.push_back(WriteOp::SetDisposition(
1032 tid,
1033 request.disposition,
1034 request.disposition_peer,
1035 ));
1036 Ok(())
1037 }) {
1038 self.push_event(FDomainEvent::SocketDispositionSet(tid, Err(err)));
1039 }
1040 }
1041
1042 pub fn read_socket(&mut self, tid: NonZeroU32, request: proto::SocketReadSocketRequest) {
1043 if let Err(e) = self.using_handle(request.handle, |h| {
1044 h.read_queue.push_back(ReadOp::Socket(tid, request.max_bytes));
1045 Ok(())
1046 }) {
1047 self.push_event(FDomainEvent::SocketData(tid, Err(e)));
1048 }
1049 }
1050
1051 pub fn read_channel(&mut self, tid: NonZeroU32, request: proto::ChannelReadChannelRequest) {
1052 if let Err(e) = self.using_handle(request.handle, |h| {
1053 h.read_queue.push_back(ReadOp::Channel(tid));
1054 Ok(())
1055 }) {
1056 self.push_event(FDomainEvent::ChannelData(tid, Err(e)));
1057 }
1058 }
1059
1060 pub fn write_socket(&mut self, tid: NonZeroU32, request: proto::SocketWriteSocketRequest) {
1061 if let Err(error) = self.using_handle(request.handle, |h| {
1062 h.write_queue.push_back(WriteOp::Socket(SocketWrite {
1063 tid,
1064 wrote: 0,
1065 to_write: request.data,
1066 }));
1067 Ok(())
1068 }) {
1069 self.push_event(FDomainEvent::WroteSocket(
1070 tid,
1071 Err(proto::WriteSocketError { error, wrote: 0 }),
1072 ));
1073 }
1074 }
1075
1076 pub fn write_channel(&mut self, tid: NonZeroU32, request: proto::ChannelWriteChannelRequest) {
1077 let handles: Vec<Result<(ShuttingDownHandle, fidl::Rights)>> = match request.handles {
1085 proto::Handles::Handles(h) => h
1086 .into_iter()
1087 .map(|h| {
1088 if h != request.handle {
1089 self.take_handle(h).map(|handle_state| {
1090 (ShuttingDownHandle::InUse(h, handle_state), fidl::Rights::SAME_RIGHTS)
1091 })
1092 } else {
1093 Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1094 }
1095 })
1096 .collect(),
1097 proto::Handles::Dispositions(d) => d
1098 .into_iter()
1099 .map(|d| {
1100 let res = match d.handle {
1101 proto::HandleOp::Move_(h) => {
1102 if h != request.handle {
1103 self.take_handle(h).map(|x| ShuttingDownHandle::InUse(h, x))
1104 } else {
1105 Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1106 }
1107 }
1108 proto::HandleOp::Duplicate(h) => {
1109 if h != request.handle {
1110 self.using_handle(h, |h| {
1116 h.handle.duplicate(fidl::Rights::SAME_RIGHTS)
1117 })
1118 .map(ShuttingDownHandle::Ready)
1119 } else {
1120 Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1121 }
1122 }
1123 };
1124
1125 res.and_then(|x| Ok((x, d.rights)))
1126 })
1127 .collect(),
1128 };
1129
1130 if handles.iter().any(|x| x.is_err()) {
1131 let e = handles.into_iter().map(|x| x.err().map(Box::new)).collect();
1132
1133 self.push_event(FDomainEvent::WroteChannel(
1134 tid,
1135 Err(proto::WriteChannelError::OpErrors(e)),
1136 ));
1137 return;
1138 }
1139
1140 let handles = handles.into_iter().map(|x| x.unwrap()).collect::<Vec<_>>();
1141
1142 if let Err(e) = self.using_handle(request.handle, |h| {
1143 h.write_queue.push_back(WriteOp::Channel(
1144 tid,
1145 request.data,
1146 HandlesToWrite::SomeInUse(handles),
1147 ));
1148 Ok(())
1149 }) {
1150 self.push_event(FDomainEvent::WroteChannel(
1151 tid,
1152 Err(proto::WriteChannelError::Error(e)),
1153 ));
1154 }
1155 }
1156
1157 pub fn wait_for_signals(
1158 &mut self,
1159 tid: NonZeroU32,
1160 request: proto::FDomainWaitForSignalsRequest,
1161 ) {
1162 let result = self.using_handle(request.handle, |h| {
1163 let signals = fidl::Signals::from_bits_retain(request.signals);
1164 h.signal_waiters.push(SignalWaiter {
1165 tid,
1166 waiter: OnSignals::new(AnyHandleRef(Arc::clone(&h.handle)), signals),
1167 });
1168 Ok(())
1169 });
1170
1171 if let Err(e) = result {
1172 self.push_event(FDomainEvent::WaitForSignals(tid, Err(e)));
1173 } else {
1174 self.waker.take().map(Waker::wake);
1175 }
1176 }
1177
1178 pub fn close(&mut self, tid: NonZeroU32, request: proto::FDomainCloseRequest) {
1179 let mut states = Vec::with_capacity(request.handles.len());
1180 let mut result = Ok(());
1181 for hid in request.handles {
1182 match self.take_handle(hid) {
1183 Ok(state) => states.push((hid, state)),
1184
1185 Err(e) => {
1186 result = result.and(Err(e));
1187 }
1188 }
1189 }
1190
1191 let action = Arc::new(CloseAction::Close {
1192 tid,
1193 count: AtomicU32::new(states.len().try_into().unwrap()),
1194 result,
1195 });
1196
1197 for (hid, state) in states {
1198 self.closing_handles.push(ClosingHandle {
1199 action: Arc::clone(&action),
1200 state: Some(ShuttingDownHandle::InUse(hid, state)),
1201 });
1202 }
1203 }
1204
1205 pub fn duplicate(&mut self, request: proto::FDomainDuplicateRequest) -> Result<()> {
1206 let rights = request.rights;
1207 let handle = self.using_handle(request.handle, |h| h.handle.duplicate(rights));
1208 handle.and_then(|h| self.alloc_client_handles([request.new_handle], [h]))
1209 }
1210
1211 pub fn replace(
1212 &mut self,
1213 tid: NonZeroU32,
1214 request: proto::FDomainReplaceRequest,
1215 ) -> Result<()> {
1216 let rights = request.rights;
1217 let new_hid = request.new_handle;
1218 match self.take_handle(request.handle) {
1219 Ok(state) => self.closing_handles.push(ClosingHandle {
1220 action: Arc::new(CloseAction::Replace { tid, new_hid, rights }),
1221 state: Some(ShuttingDownHandle::InUse(request.handle, state)),
1222 }),
1223 Err(e) => self.event_queue.push_back(UnprocessedFDomainEvent::Ready(
1224 FDomainEvent::ReplacedHandle(tid, Err(e)),
1225 )),
1226 }
1227
1228 Ok(())
1229 }
1230
1231 pub fn signal(&mut self, request: proto::FDomainSignalRequest) -> Result<()> {
1232 let set = fidl::Signals::from_bits_retain(request.set);
1233 let clear = fidl::Signals::from_bits_retain(request.clear);
1234
1235 self.using_handle(request.handle, |h| h.handle.signal(clear, set))
1236 }
1237
1238 pub fn signal_peer(&mut self, request: proto::FDomainSignalPeerRequest) -> Result<()> {
1239 let set = fidl::Signals::from_bits_retain(request.set);
1240 let clear = fidl::Signals::from_bits_retain(request.clear);
1241
1242 self.using_handle(request.handle, |h| h.handle.signal_peer(clear, set))
1243 }
1244
1245 pub fn get_koid(
1246 &mut self,
1247 request: proto::FDomainGetKoidRequest,
1248 ) -> Result<proto::FDomainGetKoidResponse> {
1249 self.using_handle(request.handle, |h| {
1250 h.handle
1251 .as_handle_ref()
1252 .koid()
1253 .map(|k| proto::FDomainGetKoidResponse { koid: k.raw_koid() })
1254 .map_err(|e| proto::Error::TargetError(e.into_raw()))
1255 })
1256 }
1257
1258 pub fn read_channel_streaming_start(
1259 &mut self,
1260 tid: NonZeroU32,
1261 request: proto::ChannelReadChannelStreamingStartRequest,
1262 ) {
1263 if let Err(err) = self.using_handle(request.handle, |h| {
1264 h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1265 h.read_queue.push_back(ReadOp::StreamingChannel(tid, true));
1266 Ok(())
1267 }) {
1268 self.event_queue
1269 .push_back(FDomainEvent::ChannelStreamingReadStart(tid, Err(err)).into())
1270 }
1271 }
1272
1273 pub fn read_channel_streaming_stop(
1274 &mut self,
1275 tid: NonZeroU32,
1276 request: proto::ChannelReadChannelStreamingStopRequest,
1277 ) {
1278 if let Err(err) = self.using_handle(request.handle, |h| {
1279 h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1280 h.read_queue.push_back(ReadOp::StreamingChannel(tid, false));
1281 Ok(())
1282 }) {
1283 self.event_queue.push_back(FDomainEvent::ChannelStreamingReadStop(tid, Err(err)).into())
1284 }
1285 }
1286
1287 pub fn read_socket_streaming_start(
1288 &mut self,
1289 tid: NonZeroU32,
1290 request: proto::SocketReadSocketStreamingStartRequest,
1291 ) {
1292 if let Err(err) = self.using_handle(request.handle, |h| {
1293 h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1294 h.read_queue.push_back(ReadOp::StreamingSocket(tid, true));
1295 Ok(())
1296 }) {
1297 self.event_queue.push_back(FDomainEvent::SocketStreamingReadStart(tid, Err(err)).into())
1298 }
1299 }
1300
1301 pub fn read_socket_streaming_stop(
1302 &mut self,
1303 tid: NonZeroU32,
1304 request: proto::SocketReadSocketStreamingStopRequest,
1305 ) {
1306 if let Err(err) = self.using_handle(request.handle, |h| {
1307 h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1308 h.read_queue.push_back(ReadOp::StreamingSocket(tid, false));
1309 Ok(())
1310 }) {
1311 self.event_queue.push_back(FDomainEvent::SocketStreamingReadStop(tid, Err(err)).into())
1312 }
1313 }
1314}
1315
1316impl futures::Stream for FDomain {
1319 type Item = FDomainEvent;
1320
1321 fn poll_next(
1322 mut self: std::pin::Pin<&mut Self>,
1323 ctx: &mut Context<'_>,
1324 ) -> Poll<Option<Self::Item>> {
1325 let this = &mut *self;
1326
1327 let mut closing_handles = std::mem::replace(&mut this.closing_handles, Vec::new());
1328 closing_handles.retain_mut(|x| x.poll_ready(this, ctx).is_pending());
1329 this.closing_handles = closing_handles;
1330
1331 let handles = &mut this.handles;
1332 let event_queue = &mut this.event_queue;
1333 for state in handles.values_mut() {
1334 state.poll(event_queue, ctx);
1335 }
1336
1337 if let Some(event) = self.event_queue.pop_front() {
1338 match event {
1339 UnprocessedFDomainEvent::Ready(event) => Poll::Ready(Some(event)),
1340 UnprocessedFDomainEvent::ChannelData(tid, message) => {
1341 Poll::Ready(Some(FDomainEvent::ChannelData(tid, self.process_message(message))))
1342 }
1343 UnprocessedFDomainEvent::ChannelStreamingData(hid, message) => {
1344 match self.process_message(message) {
1345 Ok(message) => Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1346 proto::ChannelOnChannelStreamingDataRequest {
1347 handle: hid,
1348 channel_sent: proto::ChannelSent::Message(message),
1349 },
1350 ))),
1351 Err(e) => {
1352 self.handles.get_mut(&hid).unwrap().async_read_in_progress = false;
1353 Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1354 proto::ChannelOnChannelStreamingDataRequest {
1355 handle: hid,
1356 channel_sent: proto::ChannelSent::Stopped(proto::AioStopped {
1357 error: Some(Box::new(e)),
1358 }),
1359 },
1360 )))
1361 }
1362 }
1363 }
1364 }
1365 } else {
1366 self.waker = Some(ctx.waker().clone());
1367 Poll::Pending
1368 }
1369 }
1370}