1use fidl::AsHandleRef;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_fdomain as proto;
8use fidl_fuchsia_io as fio;
9use fuchsia_async as fasync;
10use futures::prelude::*;
11use replace_with::replace_with;
12use std::collections::hash_map::Entry;
13use std::collections::{HashMap, VecDeque};
14use std::num::NonZeroU32;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU32, Ordering};
17use std::task::{Context, Poll, Waker};
18
19mod handles;
20pub mod wire;
21
22#[cfg(test)]
23mod test;
24
/// Result alias used throughout this module; the error type defaults to [`proto::Error`].
pub type Result<T, E = proto::Error> = std::result::Result<T, E>;
26
27use handles::{AnyHandle, HandleType as _, IsDatagramSocket};
28
/// A FIFO of pending items together with the waker of the most recent poller that found it
/// empty.
///
/// Field `0` holds the queued items. Field `1` holds the waker recorded by the last call to
/// [`Queue::pop_front`] or [`Queue::front_mut`] that returned `Poll::Pending`.
struct Queue<T>(VecDeque<T>, Option<Waker>);

impl<T> Queue<T> {
    /// Creates a queue with no items and no recorded waker.
    fn new() -> Self {
        Self(VecDeque::new(), None)
    }

    /// Returns `true` when no items are queued.
    fn is_empty(&self) -> bool {
        let Self(items, _) = self;
        items.is_empty()
    }

    /// Removes and discards the front item.
    ///
    /// # Panics
    ///
    /// Panics if the queue is empty.
    fn destroy_front(&mut self) {
        let removed = self.0.pop_front();
        assert!(removed.is_some(), "Expected to find a value!");
    }

    /// Removes the front item if there is one; otherwise records the waker from `ctx` and
    /// returns `Poll::Pending`.
    fn pop_front(&mut self, ctx: &mut Context<'_>) -> Poll<T> {
        match self.0.pop_front() {
            Some(item) => Poll::Ready(item),
            None => {
                self.1 = Some(ctx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Puts `t` back at the front of the queue without waking the recorded waker.
    fn push_front_no_wake(&mut self, t: T) {
        self.0.push_front(t);
    }

    /// Appends `t` to the back of the queue and wakes (and forgets) the recorded waker, if any.
    fn push_back(&mut self, t: T) {
        self.0.push_back(t);
        if let Some(waker) = self.1.take() {
            waker.wake();
        }
    }

    /// Borrows the front item mutably if there is one; otherwise records the waker from `ctx`
    /// and returns `Poll::Pending`.
    fn front_mut(&mut self, ctx: &mut Context<'_>) -> Poll<&mut T> {
        match self.0.front_mut() {
            Some(item) => Poll::Ready(item),
            None => {
                self.1 = Some(ctx.waker().clone());
                Poll::Pending
            }
        }
    }
}
88
/// Size argument passed to `read_socket` for each read performed while servicing a streaming
/// socket read (see `HandleState::process_async_read`).
const ASYNC_READ_BUFSIZE: u64 = 40960;
91
/// Events yielded by the [`FDomain`] stream.
///
/// Variants that carry a `NonZeroU32` carry the `tid` that was supplied with the request the
/// event answers (presumably a transaction ID — confirm against the wire protocol). Variants
/// without one are emitted for streaming reads and identify the handle inside their payload.
#[derive(Debug)]
pub enum FDomainEvent {
    /// Outcome of a request to start a streaming read on a channel.
    ChannelStreamingReadStart(NonZeroU32, Result<()>),
    /// Outcome of a request to stop a streaming read on a channel.
    ChannelStreamingReadStop(NonZeroU32, Result<()>),
    /// Outcome of a request to start a streaming read on a socket.
    SocketStreamingReadStart(NonZeroU32, Result<()>),
    /// Outcome of a request to stop a streaming read on a socket.
    SocketStreamingReadStop(NonZeroU32, Result<()>),
    /// Outcome of a wait registered through `FDomain::wait_for_signals`.
    WaitForSignals(NonZeroU32, Result<proto::FDomainWaitForSignalsResponse>),
    /// Outcome of a one-shot socket read.
    SocketData(NonZeroU32, Result<proto::SocketData>),
    /// Data read by, or a stop notification for, a streaming socket read.
    SocketStreamingData(proto::SocketOnSocketStreamingDataRequest),
    /// Outcome of a request to set a socket's disposition.
    SocketDispositionSet(NonZeroU32, Result<()>),
    /// Outcome of a socket write.
    WroteSocket(NonZeroU32, Result<proto::SocketWriteSocketResponse, proto::WriteSocketError>),
    /// Outcome of a one-shot channel read.
    ChannelData(NonZeroU32, Result<proto::ChannelMessage>),
    /// A message read by, or a stop notification for, a streaming channel read.
    ChannelStreamingData(proto::ChannelOnChannelStreamingDataRequest),
    /// Outcome of a channel write.
    WroteChannel(NonZeroU32, Result<(), proto::WriteChannelError>),
    /// Outcome of a close request.
    ClosedHandle(NonZeroU32, Result<()>),
    /// Outcome of a replace request.
    ReplacedHandle(NonZeroU32, Result<()>),
}
110
/// An entry in the internal event queue.
///
/// Channel messages are queued in raw form and only converted to their final
/// [`FDomainEvent`] by `FDomain::process_message` when the stream pops them, because that
/// conversion needs mutable access to the handle table.
enum UnprocessedFDomainEvent {
    /// An event that can be yielded as-is.
    Ready(FDomainEvent),
    /// A raw message produced by a one-shot channel read for the given `tid`.
    ChannelData(NonZeroU32, fidl::MessageBufEtc),
    /// A raw message produced by a streaming read on the given channel handle.
    ChannelStreamingData(proto::HandleId, fidl::MessageBufEtc),
}
119
120impl From<FDomainEvent> for UnprocessedFDomainEvent {
121 fn from(other: FDomainEvent) -> UnprocessedFDomainEvent {
122 UnprocessedFDomainEvent::Ready(other)
123 }
124}
125
/// An operation waiting in a handle's read queue. The `NonZeroU32` is the request's `tid`.
enum ReadOp {
    /// Start (`true`) or stop (`false`) a streaming read on a channel.
    StreamingChannel(NonZeroU32, bool),
    /// Start (`true`) or stop (`false`) a streaming read on a socket.
    StreamingSocket(NonZeroU32, bool),
    /// One-shot socket read; the `u64` is the `max_bytes` value from the request.
    Socket(NonZeroU32, u64),
    /// One-shot channel read.
    Channel(NonZeroU32),
}
135
/// State of a socket write that may complete over several attempts.
struct SocketWrite {
    /// `tid` of the request this write belongs to.
    tid: NonZeroU32,
    /// Number of bytes written so far.
    wrote: usize,
    /// Bytes still to be written; written bytes are drained from the front.
    to_write: Vec<u8>,
}
144
/// An operation waiting in a handle's write queue.
enum WriteOp {
    /// Write bytes to a socket.
    Socket(SocketWrite),
    /// Write a message (`tid`, data bytes, handles to transfer) to a channel.
    Channel(NonZeroU32, Vec<u8>, HandlesToWrite),
    /// Set a socket's disposition (`tid`, disposition, peer disposition).
    SetDisposition(NonZeroU32, proto::SocketDisposition, proto::SocketDisposition),
}
151
/// A handle that has been removed from the handle table and is being wound down so that the
/// underlying [`AnyHandle`] can be taken out of its [`HandleState`].
enum ShuttingDownHandle {
    /// The state may still have queued operations that must be dealt with first.
    InUse(proto::HandleId, HandleState),
    /// The bare handle, either extracted from a wound-down state or created ready.
    Ready(AnyHandle),
}
160
impl ShuttingDownHandle {
    /// Drives this handle towards [`ShuttingDownHandle::Ready`].
    ///
    /// While the handle is `InUse`, its state is polled. Once the write queue has drained,
    /// every queued read is answered with an error event, a "stopped" event is emitted if a
    /// streaming read was in progress, all waiters are dropped, and the handle is taken out
    /// of its `Arc`.
    ///
    /// Returns `Poll::Ready(())` once the handle is in the `Ready` state, `Poll::Pending`
    /// otherwise.
    ///
    /// # Panics
    ///
    /// Panics if another reference to the handle's `Arc` still exists when it is unwrapped.
    fn poll_ready(
        &mut self,
        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
        ctx: &mut Context<'_>,
    ) -> Poll<()> {
        replace_with(self, |this| match this {
            // Already wound down; nothing to do.
            this @ ShuttingDownHandle::Ready(_) => this,
            ShuttingDownHandle::InUse(hid, mut state) => {
                // Give queued operations a chance to make progress.
                state.poll(event_queue, ctx);

                if state.write_queue.is_empty() {
                    // Reads that are still queued are failed rather than waited for.
                    while let Poll::Ready(op) = state.read_queue.pop_front(ctx) {
                        match op {
                            ReadOp::StreamingChannel(tid, start) => {
                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
                                    id: hid.id,
                                }));
                                let event = if start {
                                    FDomainEvent::ChannelStreamingReadStart(tid, err)
                                } else {
                                    FDomainEvent::ChannelStreamingReadStop(tid, err)
                                };
                                event_queue.push_back(event.into());
                            }
                            ReadOp::StreamingSocket(tid, start) => {
                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
                                    id: hid.id,
                                }));
                                let event = if start {
                                    FDomainEvent::SocketStreamingReadStart(tid, err)
                                } else {
                                    FDomainEvent::SocketStreamingReadStop(tid, err)
                                };
                                event_queue.push_back(event.into());
                            }
                            ReadOp::Channel(tid) => {
                                // Report a type mismatch if there is one, otherwise report
                                // that the handle closed during the read.
                                let err = state
                                    .handle
                                    .expected_type(fidl::ObjectType::CHANNEL)
                                    .err()
                                    .unwrap_or(proto::Error::ClosedDuringRead(
                                        proto::ClosedDuringRead,
                                    ));
                                event_queue
                                    .push_back(FDomainEvent::ChannelData(tid, Err(err)).into());
                            }
                            ReadOp::Socket(tid, _max_bytes) => {
                                // Same error selection as for channel reads.
                                let err = state
                                    .handle
                                    .expected_type(fidl::ObjectType::SOCKET)
                                    .err()
                                    .unwrap_or(proto::Error::ClosedDuringRead(
                                        proto::ClosedDuringRead,
                                    ));
                                event_queue
                                    .push_back(FDomainEvent::SocketData(tid, Err(err)).into());
                            }
                        }
                    }

                    // An active streaming read ends with a "stopped" event carrying no error.
                    if state.async_read_in_progress {
                        match &*state.handle {
                            AnyHandle::Channel(_) => event_queue.push_back(
                                FDomainEvent::ChannelStreamingData(
                                    proto::ChannelOnChannelStreamingDataRequest {
                                        handle: hid,
                                        channel_sent: proto::ChannelSent::Stopped(
                                            proto::AioStopped { error: None },
                                        ),
                                    },
                                )
                                .into(),
                            ),
                            AnyHandle::Socket(_) => event_queue.push_back(
                                FDomainEvent::SocketStreamingData(
                                    proto::SocketOnSocketStreamingDataRequest {
                                        handle: hid,
                                        socket_message: proto::SocketMessage::Stopped(
                                            proto::AioStopped { error: None },
                                        ),
                                    },
                                )
                                .into(),
                            ),
                            AnyHandle::EventPair(_)
                            | AnyHandle::Event(_)
                            | AnyHandle::Unknown(_) => unreachable!(),
                        }
                    }

                    // Drop the waiters before unwrapping the `Arc` below (presumably because
                    // they hold clones of it — confirm for the non-Fuchsia `OnSignals`).
                    state.signal_waiters.clear();
                    state.io_waiter = None;

                    ShuttingDownHandle::Ready(
                        Arc::into_inner(state.handle).expect("Unaccounted-for handle reference!"),
                    )
                } else {
                    // Writes are still pending; try again on the next poll.
                    ShuttingDownHandle::InUse(hid, state)
                }
            }
        });

        if matches!(self, ShuttingDownHandle::Ready(_)) { Poll::Ready(()) } else { Poll::Pending }
    }
}
267
/// The handles attached to a pending channel write.
enum HandlesToWrite {
    /// Handles (with the rights to apply) that may still be winding down.
    SomeInUse(Vec<(ShuttingDownHandle, fidl::Rights)>),
    /// All handles have been converted to dispositions and can be written.
    AllReady(Vec<fidl::HandleDisposition<'static>>),
}
275
276impl HandlesToWrite {
277 fn poll_ready(
278 &mut self,
279 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
280 ctx: &mut Context<'_>,
281 ) -> Poll<&mut Vec<fidl::HandleDisposition<'static>>> {
282 match self {
283 HandlesToWrite::AllReady(s) => Poll::Ready(s),
284 HandlesToWrite::SomeInUse(handles) => {
285 let mut ready = true;
286 for (handle, _) in handles.iter_mut() {
287 ready = ready && handle.poll_ready(event_queue, ctx).is_ready();
288 }
289
290 if !ready {
291 return Poll::Pending;
292 }
293
294 *self = HandlesToWrite::AllReady(
295 handles
296 .drain(..)
297 .map(|(handle, rights)| {
298 let ShuttingDownHandle::Ready(handle) = handle else { unreachable!() };
299
300 fidl::HandleDisposition::new(
301 fidl::HandleOp::Move(handle.into()),
302 fidl::ObjectType::NONE,
303 rights,
304 fidl::Status::OK,
305 )
306 })
307 .collect(),
308 );
309
310 let HandlesToWrite::AllReady(s) = self else { unreachable!() };
311 Poll::Ready(s)
312 }
313 }
314 }
315}
316
/// Owning wrapper around a shared [`AnyHandle`] that implements [`AsHandleRef`], so a
/// reference-counted handle can be passed to `OnSignals::new`.
struct AnyHandleRef(Arc<AnyHandle>);

impl AsHandleRef for AnyHandleRef {
    /// Delegates to the wrapped handle.
    fn as_handle_ref(&self) -> fidl::HandleRef<'_> {
        self.0.as_handle_ref()
    }
}
324
/// Signal-wait future type used on Fuchsia targets, parameterized over [`AnyHandleRef`].
#[cfg(target_os = "fuchsia")]
type OnSignals = fasync::OnSignals<'static, AnyHandleRef>;

/// Signal-wait future type used on non-Fuchsia targets.
#[cfg(not(target_os = "fuchsia"))]
type OnSignals = fasync::OnSignalsRef<'static>;
330
/// A wait registered through `FDomain::wait_for_signals`.
struct SignalWaiter {
    /// `tid` of the request; echoed in the resulting `WaitForSignals` event.
    tid: NonZeroU32,
    /// Future that resolves when the wait completes.
    waiter: OnSignals,
}
337
/// Per-handle bookkeeping: the handle itself plus its queued operations and waiters.
struct HandleState {
    /// The handle. Waiters created for it are given clones of this `Arc`.
    handle: Arc<AnyHandle>,
    /// The ID this handle is registered under.
    hid: proto::HandleId,
    /// Value of `is_datagram()` for this handle, computed once in `HandleState::new`.
    is_datagram_socket: bool,
    /// Whether a streaming read is currently active on this handle.
    async_read_in_progress: bool,
    /// Read operations waiting to be processed, in request order.
    read_queue: Queue<ReadOp>,
    /// Write operations waiting to be processed, in request order.
    write_queue: Queue<WriteOp>,
    /// Outstanding `wait_for_signals` requests.
    signal_waiters: Vec<SignalWaiter>,
    /// Waiter for the read/write signals the queued operations currently need, if any.
    io_waiter: Option<OnSignals>,
}
372
373impl HandleState {
374 fn new(handle: AnyHandle, hid: proto::HandleId) -> Result<Self, proto::Error> {
375 let is_datagram_socket = match handle.is_datagram_socket() {
376 IsDatagramSocket::Unknown => {
377 return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown {
378 type_: proto::SocketType::unknown(),
379 }));
380 }
381 other => other.is_datagram(),
382 };
383 Ok(HandleState {
384 handle: Arc::new(handle),
385 hid,
386 async_read_in_progress: false,
387 is_datagram_socket,
388 read_queue: Queue::new(),
389 write_queue: Queue::new(),
390 signal_waiters: Vec::new(),
391 io_waiter: None,
392 })
393 }
394
    /// Makes as much progress as possible on this handle's waiters and queued operations,
    /// pushing resulting events onto `event_queue`.
    ///
    /// Completed signal waiters are turned into `WaitForSignals` events and removed. Then the
    /// I/O waiter is polled in a loop: queued reads/writes are processed according to the
    /// signals it reports, and it is replaced by a waiter for whichever of the handle's
    /// read/write signals the remaining work needs.
    fn poll(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>, ctx: &mut Context<'_>) {
        // Keep only the signal waiters that are still pending; report the rest.
        self.signal_waiters.retain_mut(|x| {
            let Poll::Ready(result) = x.waiter.poll_unpin(ctx) else {
                return true;
            };

            event_queue.push_back(
                FDomainEvent::WaitForSignals(
                    x.tid,
                    result
                        .map(|x| proto::FDomainWaitForSignalsResponse { signals: x.bits() })
                        .map_err(|e| proto::Error::TargetError(e.into_raw())),
                )
                .into(),
            );

            false
        });

        let read_signals = self.handle.read_signals();
        let write_signals = self.handle.write_signals();

        loop {
            if let Some(signal_waiter) = self.io_waiter.as_mut() {
                if let Poll::Ready(sigs) = signal_waiter.poll_unpin(ctx) {
                    // The waiter fired: service whichever queues its signals apply to. A
                    // waiter that resolved with an error processes nothing here.
                    if let Ok(sigs) = sigs {
                        if sigs.intersects(read_signals) {
                            self.process_read_queue(event_queue, ctx);
                        }
                        if sigs.intersects(write_signals) {
                            self.process_write_queue(event_queue, ctx);
                        }
                    }
                } else {
                    // The waiter is still pending. Note, before processing, whether the
                    // front of either queue is a streaming start/stop or a set-disposition
                    // operation. `front_mut` also records `ctx`'s waker on an empty queue.
                    let need_read = matches!(
                        self.read_queue.front_mut(ctx),
                        Poll::Ready(ReadOp::StreamingChannel(_, _) | ReadOp::StreamingSocket(_, _))
                    );
                    let need_write = matches!(
                        self.write_queue.front_mut(ctx),
                        Poll::Ready(WriteOp::SetDisposition(_, _, _))
                    );

                    self.process_read_queue(event_queue, ctx);
                    self.process_write_queue(event_queue, ctx);

                    // If neither was the case, keep the pending waiter as-is and stop.
                    if !(need_read || need_write) {
                        break;
                    }
                }
            }

            // Work out which signals the remaining work needs: read signals while a
            // streaming read is active or reads are queued, write signals while writes are
            // queued.
            let subscribed_signals =
                if self.async_read_in_progress || !self.read_queue.is_empty() {
                    read_signals
                } else {
                    fidl::Signals::NONE
                } | if !self.write_queue.is_empty() { write_signals } else { fidl::Signals::NONE };

            if !subscribed_signals.is_empty() {
                // Install a fresh waiter; it is polled at the top of the next iteration.
                self.io_waiter = Some(OnSignals::new(
                    AnyHandleRef(Arc::clone(&self.handle)),
                    subscribed_signals,
                ));
            } else {
                // Nothing left to wait for.
                self.io_waiter = None;
                break;
            }
        }
    }
467
468 fn try_enable_async_read(&mut self) -> Result<()> {
470 if self.async_read_in_progress {
471 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress))
472 } else {
473 self.async_read_in_progress = true;
474 Ok(())
475 }
476 }
477
478 fn try_disable_async_read(&mut self) -> Result<()> {
480 if !self.async_read_in_progress {
481 Err(proto::Error::NoReadInProgress(proto::NoReadInProgress))
482 } else {
483 self.async_read_in_progress = false;
484 Ok(())
485 }
486 }
487
488 fn process_read_queue(
490 &mut self,
491 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
492 ctx: &mut Context<'_>,
493 ) {
494 while let Poll::Ready(op) = self.read_queue.front_mut(ctx) {
495 match op {
496 ReadOp::StreamingChannel(tid, true) => {
497 let tid = *tid;
498 let result = self.try_enable_async_read();
499 event_queue
500 .push_back(FDomainEvent::ChannelStreamingReadStart(tid, result).into());
501 self.read_queue.destroy_front();
502 }
503 ReadOp::StreamingChannel(tid, false) => {
504 let tid = *tid;
505 let result = self.try_disable_async_read();
506 event_queue
507 .push_back(FDomainEvent::ChannelStreamingReadStop(tid, result).into());
508 self.read_queue.destroy_front();
509 }
510 ReadOp::StreamingSocket(tid, true) => {
511 let tid = *tid;
512 let result = self.try_enable_async_read();
513 event_queue
514 .push_back(FDomainEvent::SocketStreamingReadStart(tid, result).into());
515 self.read_queue.destroy_front();
516 }
517 ReadOp::StreamingSocket(tid, false) => {
518 let tid = *tid;
519 let result = self.try_disable_async_read();
520 event_queue
521 .push_back(FDomainEvent::SocketStreamingReadStop(tid, result).into());
522 self.read_queue.destroy_front();
523 }
524 ReadOp::Socket(tid, max_bytes) => {
525 let (tid, max_bytes) = (*tid, *max_bytes);
526 if let Some(event) = self.do_read_socket(tid, max_bytes) {
527 let _ = self.read_queue.pop_front(ctx);
528 event_queue.push_back(event.into());
529 } else {
530 break;
531 }
532 }
533 ReadOp::Channel(tid) => {
534 let tid = *tid;
535 if let Some(event) = self.do_read_channel(tid) {
536 let _ = self.read_queue.pop_front(ctx);
537 event_queue.push_back(event.into());
538 } else {
539 break;
540 }
541 }
542 }
543 }
544
545 if self.async_read_in_progress {
546 assert!(self.read_queue.is_empty());
549 self.process_async_read(event_queue);
550 }
551 }
552
553 fn process_async_read(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>) {
554 assert!(self.async_read_in_progress);
555
556 match &*self.handle {
557 AnyHandle::Channel(_) => {
558 'read_loop: while let Some(result) = self.handle.read_channel().transpose() {
559 match result {
560 Ok(msg) => event_queue.push_back(
561 UnprocessedFDomainEvent::ChannelStreamingData(self.hid, msg),
562 ),
563 Err(e) => {
564 event_queue.push_back(
565 FDomainEvent::ChannelStreamingData(
566 proto::ChannelOnChannelStreamingDataRequest {
567 handle: self.hid,
568 channel_sent: proto::ChannelSent::Stopped(
569 proto::AioStopped { error: Some(Box::new(e)) },
570 ),
571 },
572 )
573 .into(),
574 );
575 self.async_read_in_progress = false;
576 break 'read_loop;
577 }
578 }
579 }
580 }
581
582 AnyHandle::Socket(_) => {
583 'read_loop: while let Some(result) =
584 self.handle.read_socket(ASYNC_READ_BUFSIZE).transpose()
585 {
586 match result {
587 Ok(data) => {
588 event_queue.push_back(
589 FDomainEvent::SocketStreamingData(
590 proto::SocketOnSocketStreamingDataRequest {
591 handle: self.hid,
592 socket_message: proto::SocketMessage::Data(
593 proto::SocketData {
594 data,
595 is_datagram: self.is_datagram_socket,
596 },
597 ),
598 },
599 )
600 .into(),
601 );
602 }
603 Err(e) => {
604 event_queue.push_back(
605 FDomainEvent::SocketStreamingData(
606 proto::SocketOnSocketStreamingDataRequest {
607 handle: self.hid,
608 socket_message: proto::SocketMessage::Stopped(
609 proto::AioStopped { error: Some(Box::new(e)) },
610 ),
611 },
612 )
613 .into(),
614 );
615 self.async_read_in_progress = false;
616 break 'read_loop;
617 }
618 }
619 }
620 }
621
622 _ => unreachable!("Processed async read for unreadable handle type!"),
623 }
624 }
625
626 fn process_write_queue(
628 &mut self,
629 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
630 ctx: &mut Context<'_>,
631 ) {
632 while let Poll::Ready(op) = self.write_queue.pop_front(ctx) {
637 match op {
638 WriteOp::Socket(mut op) => {
639 if let Some(event) = self.do_write_socket(&mut op) {
640 event_queue.push_back(event.into());
641 } else {
642 self.write_queue.push_front_no_wake(WriteOp::Socket(op));
643 break;
644 }
645 }
646 WriteOp::SetDisposition(tid, disposition, disposition_peer) => {
647 let result = { self.handle.socket_disposition(disposition, disposition_peer) };
648 event_queue.push_back(FDomainEvent::SocketDispositionSet(tid, result).into())
649 }
650 WriteOp::Channel(tid, data, mut handles) => {
651 if self
652 .do_write_channel(tid, &data, &mut handles, event_queue, ctx)
653 .is_pending()
654 {
655 self.write_queue.push_front_no_wake(WriteOp::Channel(tid, data, handles));
656 break;
657 }
658 }
659 }
660 }
661 }
662
663 fn do_read_socket(&mut self, tid: NonZeroU32, max_bytes: u64) -> Option<FDomainEvent> {
667 if self.async_read_in_progress {
668 return Some(
669 FDomainEvent::SocketData(
670 tid,
671 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
672 )
673 .into(),
674 );
675 }
676
677 let max_bytes = if self.is_datagram_socket {
678 let AnyHandle::Socket(s) = &*self.handle else {
679 unreachable!("Read socket from state that wasn't for a socket!");
680 };
681 match s.info() {
682 Ok(x) => x.rx_buf_available as u64,
683 Err(e) => {
689 return Some(FDomainEvent::SocketData(
690 tid,
691 Err(proto::Error::TargetError(e.into_raw())),
692 ));
693 }
694 }
695 } else {
696 max_bytes
697 };
698 self.handle.read_socket(max_bytes).transpose().map(|x| {
699 FDomainEvent::SocketData(
700 tid,
701 x.map(|data| proto::SocketData { data, is_datagram: self.is_datagram_socket }),
702 )
703 })
704 }
705
706 fn do_write_socket(&mut self, op: &mut SocketWrite) -> Option<FDomainEvent> {
710 match self.handle.write_socket(&op.to_write) {
711 Ok(wrote) => {
712 op.wrote += wrote;
713 op.to_write.drain(..wrote);
714
715 if op.to_write.is_empty() {
716 Some(FDomainEvent::WroteSocket(
717 op.tid,
718 Ok(proto::SocketWriteSocketResponse {
719 wrote: op.wrote.try_into().unwrap(),
720 }),
721 ))
722 } else {
723 None
724 }
725 }
726 Err(error) => Some(FDomainEvent::WroteSocket(
727 op.tid,
728 Err(proto::WriteSocketError { error, wrote: op.wrote.try_into().unwrap() }),
729 )),
730 }
731 }
732
733 fn do_write_channel(
737 &mut self,
738 tid: NonZeroU32,
739 data: &[u8],
740 handles: &mut HandlesToWrite,
741 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
742 ctx: &mut Context<'_>,
743 ) -> Poll<()> {
744 let Poll::Ready(handles) = handles.poll_ready(event_queue, ctx) else {
745 return Poll::Pending;
746 };
747
748 let ret = self.handle.write_channel(data, handles);
749 if let Some(ret) = ret {
750 event_queue.push_back(FDomainEvent::WroteChannel(tid, ret).into())
751 }
752 Poll::Ready(())
753 }
754
755 fn do_read_channel(&mut self, tid: NonZeroU32) -> Option<UnprocessedFDomainEvent> {
759 if self.async_read_in_progress {
760 return Some(
761 FDomainEvent::ChannelData(
762 tid,
763 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
764 )
765 .into(),
766 );
767 }
768 match self.handle.read_channel() {
769 Ok(x) => x.map(|x| UnprocessedFDomainEvent::ChannelData(tid, x)),
770 Err(e) => Some(FDomainEvent::ChannelData(tid, Err(e)).into()),
771 }
772 }
773}
774
/// A handle that is being wound down on behalf of a close or replace request.
struct ClosingHandle {
    /// What to do with the handle once it is ready. For close requests the same action is
    /// shared by every handle named in the request.
    action: Arc<CloseAction>,
    /// The handle being wound down; `None` once the action has been performed.
    state: Option<ShuttingDownHandle>,
}
781
782impl ClosingHandle {
783 fn poll_ready(&mut self, fdomain: &mut FDomain, ctx: &mut Context<'_>) -> Poll<()> {
784 if let Some(state) = self.state.as_mut() {
785 if state.poll_ready(&mut fdomain.event_queue, ctx).is_ready() {
786 let state = self.state.take().unwrap();
787 let ShuttingDownHandle::Ready(handle) = state else {
788 unreachable!();
789 };
790 self.action.perform(fdomain, handle);
791 Poll::Ready(())
792 } else {
793 Poll::Pending
794 }
795 } else {
796 Poll::Ready(())
797 }
798 }
799}
800
/// What to do with a handle once it has finished shutting down.
enum CloseAction {
    /// Close the handle. `count` is the number of handles from the same request that have
    /// not finished yet; when the last one finishes, `result` is reported for `tid`.
    Close { tid: NonZeroU32, count: AtomicU32, result: Result<()> },
    /// Replace the handle with one carrying `rights`, registered under `new_hid`, and report
    /// the outcome for `tid`.
    Replace { tid: NonZeroU32, new_hid: proto::NewHandleId, rights: fidl::Rights },
}
809
810impl CloseAction {
811 fn perform(&self, fdomain: &mut FDomain, handle: AnyHandle) {
812 match self {
813 CloseAction::Close { tid, count, result } => {
814 if count.fetch_sub(1, Ordering::Relaxed) == 1 {
815 fdomain
816 .event_queue
817 .push_back(FDomainEvent::ClosedHandle(*tid, result.clone()).into());
818 }
819 }
820 CloseAction::Replace { tid, new_hid, rights } => {
821 let result = handle
822 .replace(*rights)
823 .and_then(|handle| fdomain.alloc_client_handles([*new_hid], [handle]));
824 fdomain.event_queue.push_back(FDomainEvent::ReplacedHandle(*tid, result).into());
825 }
826 }
827 }
828}
829
/// A table of handles plus the machinery to run operations on them. Requests are submitted
/// through the methods of this type, and results that are not returned directly are yielded
/// as [`FDomainEvent`]s by its `futures::Stream` implementation.
#[pin_project::pin_project]
pub struct FDomain {
    /// Called by `get_namespace` to obtain the directory handed to the client.
    namespace: Box<dyn Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send>,
    /// All live handles, keyed by ID.
    handles: HashMap<proto::HandleId, HandleState>,
    /// Handles removed from `handles` by close/replace requests that are still winding down.
    closing_handles: Vec<ClosingHandle>,
    /// Events waiting to be yielded by the stream.
    event_queue: VecDeque<UnprocessedFDomainEvent>,
    /// Waker recorded by the stream when it last returned `Poll::Pending`.
    waker: Option<Waker>,
}
842
843impl FDomain {
844 pub fn new_empty() -> Self {
847 Self::new(|| Err(fidl::Status::NOT_FOUND))
848 }
849
850 pub fn new(
852 namespace: impl Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send + 'static,
853 ) -> Self {
854 FDomain {
855 namespace: Box::new(namespace),
856 handles: HashMap::new(),
857 closing_handles: Vec::new(),
858 event_queue: VecDeque::new(),
859 waker: None,
860 }
861 }
862
863 fn push_event(&mut self, event: impl Into<UnprocessedFDomainEvent>) {
865 self.event_queue.push_back(event.into());
866 self.waker.take().map(Waker::wake);
867 }
868
869 fn process_message(
873 &mut self,
874 message: fidl::MessageBufEtc,
875 ) -> Result<proto::ChannelMessage, proto::Error> {
876 let (data, handles) = message.split();
877 let handles = handles
878 .into_iter()
879 .map(|info| {
880 let type_ = info.object_type;
881
882 let handle = match info.object_type {
883 fidl::ObjectType::CHANNEL => {
884 AnyHandle::Channel(fidl::Channel::from(info.handle))
885 }
886 fidl::ObjectType::SOCKET => AnyHandle::Socket(fidl::Socket::from(info.handle)),
887 fidl::ObjectType::EVENTPAIR => {
888 AnyHandle::EventPair(fidl::EventPair::from(info.handle))
889 }
890 fidl::ObjectType::EVENT => AnyHandle::Event(fidl::Event::from(info.handle)),
891 _ => AnyHandle::Unknown(handles::Unknown(info.handle, info.object_type)),
892 };
893
894 Ok(proto::HandleInfo {
895 rights: info.rights,
896 handle: self.alloc_fdomain_handle(handle)?,
897 type_,
898 })
899 })
900 .collect::<Result<Vec<_>, proto::Error>>()?;
901
902 Ok(proto::ChannelMessage { data, handles })
903 }
904
905 fn alloc_client_handles<const N: usize>(
912 &mut self,
913 ids: [proto::NewHandleId; N],
914 handles: [AnyHandle; N],
915 ) -> Result<(), proto::Error> {
916 for id in ids {
917 if id.id & (1 << 31) != 0 {
918 return Err(proto::Error::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange {
919 id: id.id,
920 }));
921 }
922
923 if self.handles.contains_key(&proto::HandleId { id: id.id }) {
924 return Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
925 id: id.id,
926 same_call: false,
927 }));
928 }
929 }
930
931 let mut sorted_ids = ids;
932 sorted_ids.sort();
933
934 if let Some([a, _]) = sorted_ids.array_windows().find(|&[a, b]| a == b) {
935 Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
936 id: a.id,
937 same_call: true,
938 }))
939 } else {
940 let ids = ids.into_iter().map(|id| proto::HandleId { id: id.id });
941 let handles = ids
942 .zip(handles.into_iter())
943 .map(|(id, h)| HandleState::new(h, id).map(|x| (id, x)))
944 .collect::<Result<Vec<_>, proto::Error>>()?;
945
946 self.handles.extend(handles);
947
948 Ok(())
949 }
950 }
951
952 fn alloc_fdomain_handle(&mut self, handle: AnyHandle) -> Result<proto::HandleId, proto::Error> {
955 loop {
956 let id = proto::HandleId { id: rand::random::<u32>() | (1u32 << 31) };
957 if let Entry::Vacant(v) = self.handles.entry(id) {
958 v.insert(HandleState::new(handle, id)?);
959 break Ok(id);
960 }
961 }
962 }
963
964 fn take_handle(&mut self, handle: proto::HandleId) -> Result<HandleState, proto::Error> {
966 self.handles
967 .remove(&handle)
968 .ok_or(proto::Error::BadHandleId(proto::BadHandleId { id: handle.id }))
969 }
970
971 fn using_handle<T>(
973 &mut self,
974 id: proto::HandleId,
975 f: impl FnOnce(&mut HandleState) -> Result<T, proto::Error>,
976 ) -> Result<T, proto::Error> {
977 if let Some(s) = self.handles.get_mut(&id) {
978 f(s)
979 } else {
980 Err(proto::Error::BadHandleId(proto::BadHandleId { id: id.id }))
981 }
982 }
983
984 pub fn get_namespace(&mut self, request: proto::FDomainGetNamespaceRequest) -> Result<()> {
985 match (self.namespace)() {
986 Ok(endpoint) => self.alloc_client_handles(
987 [request.new_handle],
988 [AnyHandle::Channel(endpoint.into_channel())],
989 ),
990 Err(e) => Err(proto::Error::TargetError(e.into_raw())),
991 }
992 }
993
994 pub fn create_channel(&mut self, request: proto::ChannelCreateChannelRequest) -> Result<()> {
995 let (a, b) = fidl::Channel::create();
996 self.alloc_client_handles(request.handles, [AnyHandle::Channel(a), AnyHandle::Channel(b)])
997 }
998
999 pub fn create_socket(&mut self, request: proto::SocketCreateSocketRequest) -> Result<()> {
1000 let (a, b) = match request.options {
1001 proto::SocketType::Stream => fidl::Socket::create_stream(),
1002 proto::SocketType::Datagram => fidl::Socket::create_datagram(),
1003 type_ => {
1004 return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown { type_ }));
1005 }
1006 };
1007
1008 self.alloc_client_handles(request.handles, [AnyHandle::Socket(a), AnyHandle::Socket(b)])
1009 }
1010
1011 pub fn create_event_pair(
1012 &mut self,
1013 request: proto::EventPairCreateEventPairRequest,
1014 ) -> Result<()> {
1015 let (a, b) = fidl::EventPair::create();
1016 self.alloc_client_handles(
1017 request.handles,
1018 [AnyHandle::EventPair(a), AnyHandle::EventPair(b)],
1019 )
1020 }
1021
1022 pub fn create_event(&mut self, request: proto::EventCreateEventRequest) -> Result<()> {
1023 let a = fidl::Event::create();
1024 self.alloc_client_handles([request.handle], [AnyHandle::Event(a)])
1025 }
1026
1027 pub fn set_socket_disposition(
1028 &mut self,
1029 tid: NonZeroU32,
1030 request: proto::SocketSetSocketDispositionRequest,
1031 ) {
1032 if let Err(err) = self.using_handle(request.handle, |h| {
1033 h.write_queue.push_back(WriteOp::SetDisposition(
1034 tid,
1035 request.disposition,
1036 request.disposition_peer,
1037 ));
1038 Ok(())
1039 }) {
1040 self.push_event(FDomainEvent::SocketDispositionSet(tid, Err(err)));
1041 }
1042 }
1043
1044 pub fn read_socket(&mut self, tid: NonZeroU32, request: proto::SocketReadSocketRequest) {
1045 if let Err(e) = self.using_handle(request.handle, |h| {
1046 h.read_queue.push_back(ReadOp::Socket(tid, request.max_bytes));
1047 Ok(())
1048 }) {
1049 self.push_event(FDomainEvent::SocketData(tid, Err(e)));
1050 }
1051 }
1052
1053 pub fn read_channel(&mut self, tid: NonZeroU32, request: proto::ChannelReadChannelRequest) {
1054 if let Err(e) = self.using_handle(request.handle, |h| {
1055 h.read_queue.push_back(ReadOp::Channel(tid));
1056 Ok(())
1057 }) {
1058 self.push_event(FDomainEvent::ChannelData(tid, Err(e)));
1059 }
1060 }
1061
1062 pub fn write_socket(&mut self, tid: NonZeroU32, request: proto::SocketWriteSocketRequest) {
1063 if let Err(error) = self.using_handle(request.handle, |h| {
1064 h.write_queue.push_back(WriteOp::Socket(SocketWrite {
1065 tid,
1066 wrote: 0,
1067 to_write: request.data,
1068 }));
1069 Ok(())
1070 }) {
1071 self.push_event(FDomainEvent::WroteSocket(
1072 tid,
1073 Err(proto::WriteSocketError { error, wrote: 0 }),
1074 ));
1075 }
1076 }
1077
1078 pub fn write_channel(&mut self, tid: NonZeroU32, request: proto::ChannelWriteChannelRequest) {
1079 let handles: Vec<Result<(ShuttingDownHandle, fidl::Rights)>> = match request.handles {
1087 proto::Handles::Handles(h) => h
1088 .into_iter()
1089 .map(|h| {
1090 if h != request.handle {
1091 self.take_handle(h).map(|handle_state| {
1092 (ShuttingDownHandle::InUse(h, handle_state), fidl::Rights::SAME_RIGHTS)
1093 })
1094 } else {
1095 Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1096 }
1097 })
1098 .collect(),
1099 proto::Handles::Dispositions(d) => d
1100 .into_iter()
1101 .map(|d| {
1102 let res = match d.handle {
1103 proto::HandleOp::Move_(h) => {
1104 if h != request.handle {
1105 self.take_handle(h).map(|x| ShuttingDownHandle::InUse(h, x))
1106 } else {
1107 Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1108 }
1109 }
1110 proto::HandleOp::Duplicate(h) => {
1111 if h != request.handle {
1112 self.using_handle(h, |h| {
1118 h.handle.duplicate(fidl::Rights::SAME_RIGHTS)
1119 })
1120 .map(ShuttingDownHandle::Ready)
1121 } else {
1122 Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1123 }
1124 }
1125 };
1126
1127 res.and_then(|x| Ok((x, d.rights)))
1128 })
1129 .collect(),
1130 };
1131
1132 if handles.iter().any(|x| x.is_err()) {
1133 let e = handles.into_iter().map(|x| x.err().map(Box::new)).collect();
1134
1135 self.push_event(FDomainEvent::WroteChannel(
1136 tid,
1137 Err(proto::WriteChannelError::OpErrors(e)),
1138 ));
1139 return;
1140 }
1141
1142 let handles = handles.into_iter().map(|x| x.unwrap()).collect::<Vec<_>>();
1143
1144 if let Err(e) = self.using_handle(request.handle, |h| {
1145 h.write_queue.push_back(WriteOp::Channel(
1146 tid,
1147 request.data,
1148 HandlesToWrite::SomeInUse(handles),
1149 ));
1150 Ok(())
1151 }) {
1152 self.push_event(FDomainEvent::WroteChannel(
1153 tid,
1154 Err(proto::WriteChannelError::Error(e)),
1155 ));
1156 }
1157 }
1158
1159 pub fn wait_for_signals(
1160 &mut self,
1161 tid: NonZeroU32,
1162 request: proto::FDomainWaitForSignalsRequest,
1163 ) {
1164 let result = self.using_handle(request.handle, |h| {
1165 let signals = fidl::Signals::from_bits_retain(request.signals);
1166 h.signal_waiters.push(SignalWaiter {
1167 tid,
1168 waiter: OnSignals::new(AnyHandleRef(Arc::clone(&h.handle)), signals),
1169 });
1170 Ok(())
1171 });
1172
1173 if let Err(e) = result {
1174 self.push_event(FDomainEvent::WaitForSignals(tid, Err(e)));
1175 } else {
1176 self.waker.take().map(Waker::wake);
1177 }
1178 }
1179
1180 pub fn close(&mut self, tid: NonZeroU32, request: proto::FDomainCloseRequest) {
1181 let mut states = Vec::with_capacity(request.handles.len());
1182 let mut result = Ok(());
1183 for hid in request.handles {
1184 match self.take_handle(hid) {
1185 Ok(state) => states.push((hid, state)),
1186
1187 Err(e) => {
1188 result = result.and(Err(e));
1189 }
1190 }
1191 }
1192
1193 let action = Arc::new(CloseAction::Close {
1194 tid,
1195 count: AtomicU32::new(states.len().try_into().unwrap()),
1196 result,
1197 });
1198
1199 for (hid, state) in states {
1200 self.closing_handles.push(ClosingHandle {
1201 action: Arc::clone(&action),
1202 state: Some(ShuttingDownHandle::InUse(hid, state)),
1203 });
1204 }
1205 }
1206
1207 pub fn duplicate(&mut self, request: proto::FDomainDuplicateRequest) -> Result<()> {
1208 let rights = request.rights;
1209 let handle = self.using_handle(request.handle, |h| h.handle.duplicate(rights));
1210 handle.and_then(|h| self.alloc_client_handles([request.new_handle], [h]))
1211 }
1212
1213 pub fn replace(
1214 &mut self,
1215 tid: NonZeroU32,
1216 request: proto::FDomainReplaceRequest,
1217 ) -> Result<()> {
1218 let rights = request.rights;
1219 let new_hid = request.new_handle;
1220 match self.take_handle(request.handle) {
1221 Ok(state) => self.closing_handles.push(ClosingHandle {
1222 action: Arc::new(CloseAction::Replace { tid, new_hid, rights }),
1223 state: Some(ShuttingDownHandle::InUse(request.handle, state)),
1224 }),
1225 Err(e) => self.event_queue.push_back(UnprocessedFDomainEvent::Ready(
1226 FDomainEvent::ReplacedHandle(tid, Err(e)),
1227 )),
1228 }
1229
1230 Ok(())
1231 }
1232
1233 pub fn signal(&mut self, request: proto::FDomainSignalRequest) -> Result<()> {
1234 let set = fidl::Signals::from_bits_retain(request.set);
1235 let clear = fidl::Signals::from_bits_retain(request.clear);
1236
1237 self.using_handle(request.handle, |h| h.handle.signal(clear, set))
1238 }
1239
1240 pub fn signal_peer(&mut self, request: proto::FDomainSignalPeerRequest) -> Result<()> {
1241 let set = fidl::Signals::from_bits_retain(request.set);
1242 let clear = fidl::Signals::from_bits_retain(request.clear);
1243
1244 self.using_handle(request.handle, |h| h.handle.signal_peer(clear, set))
1245 }
1246
1247 pub fn get_koid(
1248 &mut self,
1249 request: proto::FDomainGetKoidRequest,
1250 ) -> Result<proto::FDomainGetKoidResponse> {
1251 self.using_handle(request.handle, |h| {
1252 h.handle
1253 .as_handle_ref()
1254 .koid()
1255 .map(|k| proto::FDomainGetKoidResponse { koid: k.raw_koid() })
1256 .map_err(|e| proto::Error::TargetError(e.into_raw()))
1257 })
1258 }
1259
1260 pub fn read_channel_streaming_start(
1261 &mut self,
1262 tid: NonZeroU32,
1263 request: proto::ChannelReadChannelStreamingStartRequest,
1264 ) {
1265 if let Err(err) = self.using_handle(request.handle, |h| {
1266 h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1267 h.read_queue.push_back(ReadOp::StreamingChannel(tid, true));
1268 Ok(())
1269 }) {
1270 self.event_queue
1271 .push_back(FDomainEvent::ChannelStreamingReadStart(tid, Err(err)).into())
1272 }
1273 }
1274
1275 pub fn read_channel_streaming_stop(
1276 &mut self,
1277 tid: NonZeroU32,
1278 request: proto::ChannelReadChannelStreamingStopRequest,
1279 ) {
1280 if let Err(err) = self.using_handle(request.handle, |h| {
1281 h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1282 h.read_queue.push_back(ReadOp::StreamingChannel(tid, false));
1283 Ok(())
1284 }) {
1285 self.event_queue.push_back(FDomainEvent::ChannelStreamingReadStop(tid, Err(err)).into())
1286 }
1287 }
1288
1289 pub fn read_socket_streaming_start(
1290 &mut self,
1291 tid: NonZeroU32,
1292 request: proto::SocketReadSocketStreamingStartRequest,
1293 ) {
1294 if let Err(err) = self.using_handle(request.handle, |h| {
1295 h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1296 h.read_queue.push_back(ReadOp::StreamingSocket(tid, true));
1297 Ok(())
1298 }) {
1299 self.event_queue.push_back(FDomainEvent::SocketStreamingReadStart(tid, Err(err)).into())
1300 }
1301 }
1302
1303 pub fn read_socket_streaming_stop(
1304 &mut self,
1305 tid: NonZeroU32,
1306 request: proto::SocketReadSocketStreamingStopRequest,
1307 ) {
1308 if let Err(err) = self.using_handle(request.handle, |h| {
1309 h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1310 h.read_queue.push_back(ReadOp::StreamingSocket(tid, false));
1311 Ok(())
1312 }) {
1313 self.event_queue.push_back(FDomainEvent::SocketStreamingReadStop(tid, Err(err)).into())
1314 }
1315 }
1316}
1317
1318impl futures::Stream for FDomain {
1321 type Item = FDomainEvent;
1322
1323 fn poll_next(
1324 mut self: std::pin::Pin<&mut Self>,
1325 ctx: &mut Context<'_>,
1326 ) -> Poll<Option<Self::Item>> {
1327 let this = &mut *self;
1328
1329 let mut closing_handles = std::mem::replace(&mut this.closing_handles, Vec::new());
1330 closing_handles.retain_mut(|x| x.poll_ready(this, ctx).is_pending());
1331 this.closing_handles = closing_handles;
1332
1333 let handles = &mut this.handles;
1334 let event_queue = &mut this.event_queue;
1335 for state in handles.values_mut() {
1336 state.poll(event_queue, ctx);
1337 }
1338
1339 if let Some(event) = self.event_queue.pop_front() {
1340 match event {
1341 UnprocessedFDomainEvent::Ready(event) => Poll::Ready(Some(event)),
1342 UnprocessedFDomainEvent::ChannelData(tid, message) => {
1343 Poll::Ready(Some(FDomainEvent::ChannelData(tid, self.process_message(message))))
1344 }
1345 UnprocessedFDomainEvent::ChannelStreamingData(hid, message) => {
1346 match self.process_message(message) {
1347 Ok(message) => Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1348 proto::ChannelOnChannelStreamingDataRequest {
1349 handle: hid,
1350 channel_sent: proto::ChannelSent::Message(message),
1351 },
1352 ))),
1353 Err(e) => {
1354 self.handles.get_mut(&hid).unwrap().async_read_in_progress = false;
1355 Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1356 proto::ChannelOnChannelStreamingDataRequest {
1357 handle: hid,
1358 channel_sent: proto::ChannelSent::Stopped(proto::AioStopped {
1359 error: Some(Box::new(e)),
1360 }),
1361 },
1362 )))
1363 }
1364 }
1365 }
1366 }
1367 } else {
1368 self.waker = Some(ctx.waker().clone());
1369 Poll::Pending
1370 }
1371 }
1372}