1use fidl::endpoints::ClientEnd;
6use fidl::{AsHandleRef, HandleBased};
7use futures::prelude::*;
8use replace_with::replace_with;
9use std::collections::hash_map::Entry;
10use std::collections::{HashMap, VecDeque};
11use std::num::NonZeroU32;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicU32, Ordering};
14use std::task::{Context, Poll, Waker};
15use {fidl_fuchsia_fdomain as proto, fidl_fuchsia_io as fio, fuchsia_async as fasync};
16
// Private submodule; `AnyHandle`, `HandleType` and `IsDatagramSocket` are
// imported from it below.
mod handles;
// Public submodule (its contents are not visible from this file).
pub mod wire;

#[cfg(test)]
mod test;

/// Crate-local result alias whose error type defaults to the FDomain protocol
/// error, `proto::Error`.
pub type Result<T, E = proto::Error> = std::result::Result<T, E>;

// `HandleType` is imported anonymously: only its methods are needed, not its name.
use handles::{AnyHandle, HandleType as _, IsDatagramSocket};
26
/// FIFO of pending items that remembers the task which most recently found it
/// empty, so that task can be woken when something is appended.
///
/// Field `0` holds the items. Field `1` holds the waker captured by the last
/// call to [`Queue::pop_front`] or [`Queue::front_mut`] that returned
/// `Poll::Pending`.
struct Queue<T>(VecDeque<T>, Option<Waker>);

impl<T> Queue<T> {
    /// Creates a queue with no items and no registered waker.
    fn new() -> Self {
        Self(VecDeque::new(), None)
    }

    /// Returns `true` when no items are queued.
    fn is_empty(&self) -> bool {
        self.0.len() == 0
    }

    /// Discards the front item.
    ///
    /// # Panics
    ///
    /// Panics if the queue is empty.
    fn destroy_front(&mut self) {
        let removed = self.0.pop_front();
        assert!(removed.is_some(), "Expected to find a value!");
    }

    /// Removes and returns the front item, or registers `ctx`'s waker and
    /// returns `Poll::Pending` when the queue is empty.
    fn pop_front(&mut self, ctx: &mut Context<'_>) -> Poll<T> {
        match self.0.pop_front() {
            Some(item) => Poll::Ready(item),
            None => {
                self.1 = Some(ctx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Puts an item back at the front without waking the registered waker.
    fn push_front_no_wake(&mut self, t: T) {
        self.0.push_front(t);
    }

    /// Appends an item and wakes (and forgets) the registered waker, if any.
    fn push_back(&mut self, t: T) {
        self.0.push_back(t);
        if let Some(waker) = self.1.take() {
            waker.wake();
        }
    }

    /// Borrows the front item mutably, or registers `ctx`'s waker and returns
    /// `Poll::Pending` when the queue is empty.
    fn front_mut(&mut self, ctx: &mut Context<'_>) -> Poll<&mut T> {
        match self.0.front_mut() {
            Some(item) => Poll::Ready(item),
            None => {
                self.1 = Some(ctx.waker().clone());
                Poll::Pending
            }
        }
    }
}
86
/// Byte limit passed to each `read_socket` call made while servicing a
/// streaming socket read.
const ASYNC_READ_BUFSIZE: u64 = 40960;
89
/// An event produced by an [`FDomain`] and yielded from its `Stream`
/// implementation.
///
/// Variants that carry a `NonZeroU32` report the outcome of a request; the
/// number is the `tid` that was supplied with that request.
#[derive(Debug)]
pub enum FDomainEvent {
    /// Outcome of a request to start a streaming read on a channel.
    ChannelStreamingReadStart(NonZeroU32, Result<()>),
    /// Outcome of a request to stop a streaming read on a channel.
    ChannelStreamingReadStop(NonZeroU32, Result<()>),
    /// Outcome of a request to start a streaming read on a socket.
    SocketStreamingReadStart(NonZeroU32, Result<()>),
    /// Outcome of a request to stop a streaming read on a socket.
    SocketStreamingReadStop(NonZeroU32, Result<()>),
    /// Outcome of a wait-for-signals request.
    WaitForSignals(NonZeroU32, Result<proto::FDomainWaitForSignalsResponse>),
    /// Outcome of a one-shot socket read.
    SocketData(NonZeroU32, Result<proto::SocketData>),
    /// Data, or a stop notice, produced by a streaming socket read.
    SocketStreamingData(proto::SocketOnSocketStreamingDataRequest),
    /// Outcome of a request to set a socket's disposition.
    SocketDispositionSet(NonZeroU32, Result<()>),
    /// Outcome of a socket write.
    WroteSocket(NonZeroU32, Result<proto::SocketWriteSocketResponse, proto::WriteSocketError>),
    /// Outcome of a one-shot channel read.
    ChannelData(NonZeroU32, Result<proto::ChannelMessage>),
    /// A message, or a stop notice, produced by a streaming channel read.
    ChannelStreamingData(proto::ChannelOnChannelStreamingDataRequest),
    /// Outcome of a channel write.
    WroteChannel(NonZeroU32, Result<(), proto::WriteChannelError>),
    /// Outcome of a close request.
    ClosedHandle(NonZeroU32, Result<()>),
    /// Outcome of a replace request.
    ReplacedHandle(NonZeroU32, Result<()>),
}
108
/// Internal form of a queued event.
///
/// Channel messages are queued in raw form and only converted into
/// `proto::ChannelMessage` (via `FDomain::process_message`) when the event is
/// popped in `poll_next`.
enum UnprocessedFDomainEvent {
    /// An event that needs no further processing.
    Ready(FDomainEvent),
    /// A raw message read by a one-shot channel read with the given `tid`.
    ChannelData(NonZeroU32, fidl::MessageBufEtc),
    /// A raw message read by a streaming read on the given handle.
    ChannelStreamingData(proto::HandleId, fidl::MessageBufEtc),
}
117
118impl From<FDomainEvent> for UnprocessedFDomainEvent {
119 fn from(other: FDomainEvent) -> UnprocessedFDomainEvent {
120 UnprocessedFDomainEvent::Ready(other)
121 }
122}
123
/// A queued read-side operation on a handle. The `NonZeroU32` is the `tid` of
/// the request that queued it.
enum ReadOp {
    /// Start (`true`) or stop (`false`) a streaming read on a channel.
    StreamingChannel(NonZeroU32, bool),
    /// Start (`true`) or stop (`false`) a streaming read on a socket.
    StreamingSocket(NonZeroU32, bool),
    /// One-shot socket read; the `u64` is the request's `max_bytes`.
    Socket(NonZeroU32, u64),
    /// One-shot channel read.
    Channel(NonZeroU32),
}
133
/// State of a socket write that may complete over several partial writes.
struct SocketWrite {
    /// `tid` of the request that queued the write.
    tid: NonZeroU32,
    /// Number of bytes written so far.
    wrote: usize,
    /// Bytes still to be written; written bytes are drained from the front.
    to_write: Vec<u8>,
}
142
/// A queued write-side operation on a handle.
enum WriteOp {
    /// Write bytes to a socket.
    Socket(SocketWrite),
    /// Write a message (`tid`, data bytes, handles to transfer) to a channel.
    Channel(NonZeroU32, Vec<u8>, HandlesToWrite),
    /// Set a socket's disposition (`tid`, disposition, peer disposition).
    SetDisposition(NonZeroU32, proto::SocketDisposition, proto::SocketDisposition),
}
149
/// A handle that is being retired: it is driven until its pending writes are
/// finished, after which the bare handle is extracted.
enum ShuttingDownHandle {
    /// Still wrapped in its `HandleState`; queued operations may remain.
    InUse(proto::HandleId, HandleState),
    /// Fully wound down; only the handle itself is left.
    Ready(AnyHandle),
}

impl ShuttingDownHandle {
    /// Drives the handle towards the `Ready` state.
    ///
    /// Polls the underlying state; once its write queue is empty, every
    /// remaining read operation is answered with an error event, an active
    /// streaming read is reported as stopped, and the handle is unwrapped.
    /// Returns `Poll::Ready(())` once `self` is `Ready`, `Poll::Pending`
    /// otherwise.
    fn poll_ready(
        &mut self,
        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
        ctx: &mut Context<'_>,
    ) -> Poll<()> {
        replace_with(self, |this| match this {
            this @ ShuttingDownHandle::Ready(_) => this,
            ShuttingDownHandle::InUse(hid, mut state) => {
                state.poll(event_queue, ctx);

                if state.write_queue.is_empty() {
                    // Writes are done; fail whatever reads are still queued.
                    while let Poll::Ready(op) = state.read_queue.pop_front(ctx) {
                        match op {
                            ReadOp::StreamingChannel(tid, start) => {
                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
                                    id: hid.id,
                                }));
                                let event = if start {
                                    FDomainEvent::ChannelStreamingReadStart(tid, err)
                                } else {
                                    FDomainEvent::ChannelStreamingReadStop(tid, err)
                                };
                                event_queue.push_back(event.into());
                            }
                            ReadOp::StreamingSocket(tid, start) => {
                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
                                    id: hid.id,
                                }));
                                let event = if start {
                                    FDomainEvent::SocketStreamingReadStart(tid, err)
                                } else {
                                    FDomainEvent::SocketStreamingReadStop(tid, err)
                                };
                                event_queue.push_back(event.into());
                            }
                            ReadOp::Channel(tid) => {
                                // A type mismatch takes precedence; otherwise
                                // report that the handle closed mid-read.
                                let err = state
                                    .handle
                                    .expected_type(fidl::ObjectType::CHANNEL)
                                    .err()
                                    .unwrap_or(proto::Error::ClosedDuringRead(
                                        proto::ClosedDuringRead,
                                    ));
                                event_queue
                                    .push_back(FDomainEvent::ChannelData(tid, Err(err)).into());
                            }
                            ReadOp::Socket(tid, _max_bytes) => {
                                let err = state
                                    .handle
                                    .expected_type(fidl::ObjectType::SOCKET)
                                    .err()
                                    .unwrap_or(proto::Error::ClosedDuringRead(
                                        proto::ClosedDuringRead,
                                    ));
                                event_queue
                                    .push_back(FDomainEvent::SocketData(tid, Err(err)).into());
                            }
                        }
                    }

                    // Tell the client that an active streaming read has ended
                    // (without an error).
                    if state.async_read_in_progress {
                        match &*state.handle {
                            AnyHandle::Channel(_) => event_queue.push_back(
                                FDomainEvent::ChannelStreamingData(
                                    proto::ChannelOnChannelStreamingDataRequest {
                                        handle: hid,
                                        channel_sent: proto::ChannelSent::Stopped(
                                            proto::AioStopped { error: None },
                                        ),
                                    },
                                )
                                .into(),
                            ),
                            AnyHandle::Socket(_) => event_queue.push_back(
                                FDomainEvent::SocketStreamingData(
                                    proto::SocketOnSocketStreamingDataRequest {
                                        handle: hid,
                                        socket_message: proto::SocketMessage::Stopped(
                                            proto::AioStopped { error: None },
                                        ),
                                    },
                                )
                                .into(),
                            ),
                            AnyHandle::EventPair(_)
                            | AnyHandle::Event(_)
                            | AnyHandle::Unknown(_) => unreachable!(),
                        }
                    }

                    // Waiters are built from clones of the handle `Arc` (see
                    // `OnSignals::new(AnyHandleRef(Arc::clone(..)), ..)`), so
                    // they are dropped before `Arc::into_inner` below, which
                    // requires that no other reference remains.
                    state.signal_waiters.clear();
                    state.io_waiter = None;

                    ShuttingDownHandle::Ready(
                        Arc::into_inner(state.handle).expect("Unaccounted-for handle reference!"),
                    )
                } else {
                    ShuttingDownHandle::InUse(hid, state)
                }
            }
        });

        if matches!(self, ShuttingDownHandle::Ready(_)) { Poll::Ready(()) } else { Poll::Pending }
    }
}
265
266enum HandlesToWrite {
270 SomeInUse(Vec<(ShuttingDownHandle, fidl::Rights)>),
271 AllReady(Vec<fidl::HandleDisposition<'static>>),
272}
273
274impl HandlesToWrite {
275 fn poll_ready(
276 &mut self,
277 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
278 ctx: &mut Context<'_>,
279 ) -> Poll<&mut Vec<fidl::HandleDisposition<'static>>> {
280 match self {
281 HandlesToWrite::AllReady(s) => Poll::Ready(s),
282 HandlesToWrite::SomeInUse(handles) => {
283 let mut ready = true;
284 for (handle, _) in handles.iter_mut() {
285 ready = ready && handle.poll_ready(event_queue, ctx).is_ready();
286 }
287
288 if !ready {
289 return Poll::Pending;
290 }
291
292 *self = HandlesToWrite::AllReady(
293 handles
294 .drain(..)
295 .map(|(handle, rights)| {
296 let ShuttingDownHandle::Ready(handle) = handle else { unreachable!() };
297
298 fidl::HandleDisposition::new(
299 fidl::HandleOp::Move(handle.into()),
300 fidl::ObjectType::NONE,
301 rights,
302 fidl::Status::OK,
303 )
304 })
305 .collect(),
306 );
307
308 let HandlesToWrite::AllReady(s) = self else { unreachable!() };
309 Poll::Ready(s)
310 }
311 }
312 }
313}
314
/// Owning wrapper around a shared `AnyHandle` that implements `AsHandleRef`,
/// so a clone of the handle `Arc` can be handed to `OnSignals::new`.
struct AnyHandleRef(Arc<AnyHandle>);

impl AsHandleRef for AnyHandleRef {
    /// Delegates to the wrapped handle.
    fn as_handle_ref(&self) -> fidl::HandleRef<'_> {
        self.0.as_handle_ref()
    }
}
322
// Signal-wait future type used throughout this file. It is selected per
// target; both variants are constructed the same way in this file
// (`OnSignals::new(AnyHandleRef(..), signals)`).
#[cfg(target_os = "fuchsia")]
type OnSignals = fasync::OnSignals<'static, AnyHandleRef>;

#[cfg(not(target_os = "fuchsia"))]
type OnSignals = fasync::OnSignalsRef<'static>;
328
/// An outstanding wait-for-signals request on a handle.
struct SignalWaiter {
    /// `tid` of the request; reported back in `FDomainEvent::WaitForSignals`.
    tid: NonZeroU32,
    /// Future that resolves when the requested signals are observed.
    waiter: OnSignals,
}
335
/// Per-handle bookkeeping: the handle itself plus its queued operations.
struct HandleState {
    /// The handle. Shared so signal waiters can hold a reference to it.
    handle: Arc<AnyHandle>,
    /// The ID this handle is known by.
    hid: proto::HandleId,
    /// Whether the handle is a datagram socket (computed once in `new`).
    is_datagram_socket: bool,
    /// Whether a streaming read is currently active on this handle.
    async_read_in_progress: bool,
    /// Read-side operations awaiting processing, in request order.
    read_queue: Queue<ReadOp>,
    /// Write-side operations awaiting processing, in request order.
    write_queue: Queue<WriteOp>,
    /// Outstanding wait-for-signals requests.
    signal_waiters: Vec<SignalWaiter>,
    /// Waiter for the read/write signals that the queued I/O depends on.
    io_waiter: Option<OnSignals>,
}
370
371impl HandleState {
372 fn new(handle: AnyHandle, hid: proto::HandleId) -> Result<Self, proto::Error> {
373 let is_datagram_socket = match handle.is_datagram_socket() {
374 IsDatagramSocket::Unknown => {
375 return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown {
376 type_: proto::SocketType::unknown(),
377 }));
378 }
379 other => other.is_datagram(),
380 };
381 Ok(HandleState {
382 handle: Arc::new(handle),
383 hid,
384 async_read_in_progress: false,
385 is_datagram_socket,
386 read_queue: Queue::new(),
387 write_queue: Queue::new(),
388 signal_waiters: Vec::new(),
389 io_waiter: None,
390 })
391 }
392
/// Makes as much progress as possible on this handle's pending work.
///
/// Completed signal waiters are turned into `WaitForSignals` events, and
/// the read and write queues are processed according to the I/O waiter.
/// Resulting events are appended to `event_queue`.
fn poll(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>, ctx: &mut Context<'_>) {
    // Report and drop every signal waiter that has resolved; keep the rest.
    self.signal_waiters.retain_mut(|x| {
        let Poll::Ready(result) = x.waiter.poll_unpin(ctx) else {
            return true;
        };

        event_queue.push_back(
            FDomainEvent::WaitForSignals(
                x.tid,
                result
                    .map(|x| proto::FDomainWaitForSignalsResponse { signals: x.bits() })
                    .map_err(|e| proto::Error::TargetError(e.into_raw())),
            )
            .into(),
        );

        false
    });

    let read_signals = self.handle.read_signals();
    let write_signals = self.handle.write_signals();

    loop {
        if let Some(signal_waiter) = self.io_waiter.as_mut() {
            if let Poll::Ready(sigs) = signal_waiter.poll_unpin(ctx) {
                // The waiter fired: process whichever side became ready.
                if let Ok(sigs) = sigs {
                    if sigs.intersects(read_signals) {
                        self.process_read_queue(event_queue, ctx);
                    }
                    if sigs.intersects(write_signals) {
                        self.process_write_queue(event_queue, ctx);
                    }
                }
            } else {
                // The waiter is still pending. Streaming start/stop and
                // set-disposition operations are handled by the queue
                // processors without attempting a read or write. If one
                // of them was at the front, loop again so the waiter is
                // rebuilt for the updated state.
                let need_read = matches!(
                    self.read_queue.front_mut(ctx),
                    Poll::Ready(ReadOp::StreamingChannel(_, _) | ReadOp::StreamingSocket(_, _))
                );
                let need_write = matches!(
                    self.write_queue.front_mut(ctx),
                    Poll::Ready(WriteOp::SetDisposition(_, _, _))
                );

                self.process_read_queue(event_queue, ctx);
                self.process_write_queue(event_queue, ctx);

                if !(need_read || need_write) {
                    break;
                }
            }
        }

        // Subscribe to read signals while reads are pending or a
        // streaming read is active, and to write signals while writes
        // are pending.
        let subscribed_signals =
            if self.async_read_in_progress || !self.read_queue.is_empty() {
                read_signals
            } else {
                fidl::Signals::NONE
            } | if !self.write_queue.is_empty() { write_signals } else { fidl::Signals::NONE };

        if !subscribed_signals.is_empty() {
            // Replace the waiter; the next iteration polls it.
            self.io_waiter = Some(OnSignals::new(
                AnyHandleRef(Arc::clone(&self.handle)),
                subscribed_signals,
            ));
        } else {
            self.io_waiter = None;
            break;
        }
    }
}
465
466 fn try_enable_async_read(&mut self) -> Result<()> {
468 if self.async_read_in_progress {
469 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress))
470 } else {
471 self.async_read_in_progress = true;
472 Ok(())
473 }
474 }
475
476 fn try_disable_async_read(&mut self) -> Result<()> {
478 if !self.async_read_in_progress {
479 Err(proto::Error::NoReadInProgress(proto::NoReadInProgress))
480 } else {
481 self.async_read_in_progress = false;
482 Ok(())
483 }
484 }
485
/// Processes queued read operations in order until one cannot complete yet.
///
/// Streaming start/stop requests are answered immediately. One-shot reads
/// are attempted and stay at the front of the queue if they produce
/// nothing. Afterwards, if a streaming read is active, it is serviced too.
fn process_read_queue(
    &mut self,
    event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
    ctx: &mut Context<'_>,
) {
    while let Poll::Ready(op) = self.read_queue.front_mut(ctx) {
        match op {
            ReadOp::StreamingChannel(tid, true) => {
                // Copy `tid` out so the borrow of the queue front ends
                // before `self` is used again.
                let tid = *tid;
                let result = self.try_enable_async_read();
                event_queue
                    .push_back(FDomainEvent::ChannelStreamingReadStart(tid, result).into());
                self.read_queue.destroy_front();
            }
            ReadOp::StreamingChannel(tid, false) => {
                let tid = *tid;
                let result = self.try_disable_async_read();
                event_queue
                    .push_back(FDomainEvent::ChannelStreamingReadStop(tid, result).into());
                self.read_queue.destroy_front();
            }
            ReadOp::StreamingSocket(tid, true) => {
                let tid = *tid;
                let result = self.try_enable_async_read();
                event_queue
                    .push_back(FDomainEvent::SocketStreamingReadStart(tid, result).into());
                self.read_queue.destroy_front();
            }
            ReadOp::StreamingSocket(tid, false) => {
                let tid = *tid;
                let result = self.try_disable_async_read();
                event_queue
                    .push_back(FDomainEvent::SocketStreamingReadStop(tid, result).into());
                self.read_queue.destroy_front();
            }
            ReadOp::Socket(tid, max_bytes) => {
                let (tid, max_bytes) = (*tid, *max_bytes);
                if let Some(event) = self.do_read_socket(tid, max_bytes) {
                    let _ = self.read_queue.pop_front(ctx);
                    event_queue.push_back(event.into());
                } else {
                    // Nothing produced yet; keep the op queued and stop.
                    break;
                }
            }
            ReadOp::Channel(tid) => {
                let tid = *tid;
                if let Some(event) = self.do_read_channel(tid) {
                    let _ = self.read_queue.pop_front(ctx);
                    event_queue.push_back(event.into());
                } else {
                    // Nothing produced yet; keep the op queued and stop.
                    break;
                }
            }
        }
    }

    if self.async_read_in_progress {
        // While a streaming read is active, one-shot reads return an
        // error event immediately, so the loop above drains the queue.
        assert!(self.read_queue.is_empty());
        self.process_async_read(event_queue);
    }
}
550
/// Services an active streaming read: reads until the handle yields
/// nothing more, queueing one streaming-data event per message or chunk.
///
/// A read error is reported as a `Stopped` event carrying the error and
/// ends the streaming read.
///
/// # Panics
///
/// Panics if no streaming read is active, or if the handle is neither a
/// channel nor a socket.
fn process_async_read(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>) {
    assert!(self.async_read_in_progress);

    match &*self.handle {
        AnyHandle::Channel(_) => {
            'read_loop: while let Some(result) = self.handle.read_channel().transpose() {
                match result {
                    // Queued raw; converted to a `proto::ChannelMessage`
                    // when the event is popped.
                    Ok(msg) => event_queue.push_back(
                        UnprocessedFDomainEvent::ChannelStreamingData(self.hid, msg),
                    ),
                    Err(e) => {
                        event_queue.push_back(
                            FDomainEvent::ChannelStreamingData(
                                proto::ChannelOnChannelStreamingDataRequest {
                                    handle: self.hid,
                                    channel_sent: proto::ChannelSent::Stopped(
                                        proto::AioStopped { error: Some(Box::new(e)) },
                                    ),
                                },
                            )
                            .into(),
                        );
                        self.async_read_in_progress = false;
                        break 'read_loop;
                    }
                }
            }
        }

        AnyHandle::Socket(_) => {
            'read_loop: while let Some(result) =
                self.handle.read_socket(ASYNC_READ_BUFSIZE).transpose()
            {
                match result {
                    Ok(data) => {
                        event_queue.push_back(
                            FDomainEvent::SocketStreamingData(
                                proto::SocketOnSocketStreamingDataRequest {
                                    handle: self.hid,
                                    socket_message: proto::SocketMessage::Data(
                                        proto::SocketData {
                                            data,
                                            is_datagram: self.is_datagram_socket,
                                        },
                                    ),
                                },
                            )
                            .into(),
                        );
                    }
                    Err(e) => {
                        event_queue.push_back(
                            FDomainEvent::SocketStreamingData(
                                proto::SocketOnSocketStreamingDataRequest {
                                    handle: self.hid,
                                    socket_message: proto::SocketMessage::Stopped(
                                        proto::AioStopped { error: Some(Box::new(e)) },
                                    ),
                                },
                            )
                            .into(),
                        );
                        self.async_read_in_progress = false;
                        break 'read_loop;
                    }
                }
            }
        }

        _ => unreachable!("Processed async read for unreadable handle type!"),
    }
}
623
/// Processes queued write operations in order until one cannot complete.
///
/// Each operation is popped and attempted. An operation that cannot finish
/// yet is pushed back to the front (without waking) and processing stops.
fn process_write_queue(
    &mut self,
    event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
    ctx: &mut Context<'_>,
) {
    while let Poll::Ready(op) = self.write_queue.pop_front(ctx) {
        match op {
            WriteOp::Socket(mut op) => {
                if let Some(event) = self.do_write_socket(&mut op) {
                    event_queue.push_back(event.into());
                } else {
                    // Partial write; `op` keeps its progress for next time.
                    self.write_queue.push_front_no_wake(WriteOp::Socket(op));
                    break;
                }
            }
            WriteOp::SetDisposition(tid, disposition, disposition_peer) => {
                let result = { self.handle.socket_disposition(disposition, disposition_peer) };
                event_queue.push_back(FDomainEvent::SocketDispositionSet(tid, result).into())
            }
            WriteOp::Channel(tid, data, mut handles) => {
                if self
                    .do_write_channel(tid, &data, &mut handles, event_queue, ctx)
                    .is_pending()
                {
                    // Some attached handles are not ready yet; retry later.
                    self.write_queue.push_front_no_wake(WriteOp::Channel(tid, data, handles));
                    break;
                }
            }
        }
    }
}
660
661 fn do_read_socket(&mut self, tid: NonZeroU32, max_bytes: u64) -> Option<FDomainEvent> {
665 if self.async_read_in_progress {
666 return Some(
667 FDomainEvent::SocketData(
668 tid,
669 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
670 )
671 .into(),
672 );
673 }
674
675 let max_bytes = if self.is_datagram_socket {
676 let AnyHandle::Socket(s) = &*self.handle else {
677 unreachable!("Read socket from state that wasn't for a socket!");
678 };
679 match s.info() {
680 Ok(x) => x.rx_buf_available as u64,
681 Err(e) => {
687 return Some(FDomainEvent::SocketData(
688 tid,
689 Err(proto::Error::TargetError(e.into_raw())),
690 ));
691 }
692 }
693 } else {
694 max_bytes
695 };
696 self.handle.read_socket(max_bytes).transpose().map(|x| {
697 FDomainEvent::SocketData(
698 tid,
699 x.map(|data| proto::SocketData { data, is_datagram: self.is_datagram_socket }),
700 )
701 })
702 }
703
704 fn do_write_socket(&mut self, op: &mut SocketWrite) -> Option<FDomainEvent> {
708 match self.handle.write_socket(&op.to_write) {
709 Ok(wrote) => {
710 op.wrote += wrote;
711 op.to_write.drain(..wrote);
712
713 if op.to_write.is_empty() {
714 Some(FDomainEvent::WroteSocket(
715 op.tid,
716 Ok(proto::SocketWriteSocketResponse {
717 wrote: op.wrote.try_into().unwrap(),
718 }),
719 ))
720 } else {
721 None
722 }
723 }
724 Err(error) => Some(FDomainEvent::WroteSocket(
725 op.tid,
726 Err(proto::WriteSocketError { error, wrote: op.wrote.try_into().unwrap() }),
727 )),
728 }
729 }
730
731 fn do_write_channel(
735 &mut self,
736 tid: NonZeroU32,
737 data: &[u8],
738 handles: &mut HandlesToWrite,
739 event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
740 ctx: &mut Context<'_>,
741 ) -> Poll<()> {
742 let Poll::Ready(handles) = handles.poll_ready(event_queue, ctx) else {
743 return Poll::Pending;
744 };
745
746 let ret = self.handle.write_channel(data, handles);
747 if let Some(ret) = ret {
748 event_queue.push_back(FDomainEvent::WroteChannel(tid, ret).into())
749 }
750 Poll::Ready(())
751 }
752
753 fn do_read_channel(&mut self, tid: NonZeroU32) -> Option<UnprocessedFDomainEvent> {
757 if self.async_read_in_progress {
758 return Some(
759 FDomainEvent::ChannelData(
760 tid,
761 Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
762 )
763 .into(),
764 );
765 }
766 match self.handle.read_channel() {
767 Ok(x) => x.map(|x| UnprocessedFDomainEvent::ChannelData(tid, x)),
768 Err(e) => Some(FDomainEvent::ChannelData(tid, Err(e)).into()),
769 }
770 }
771}
772
773struct ClosingHandle {
776 action: Arc<CloseAction>,
777 state: Option<ShuttingDownHandle>,
778}
779
780impl ClosingHandle {
781 fn poll_ready(&mut self, fdomain: &mut FDomain, ctx: &mut Context<'_>) -> Poll<()> {
782 if let Some(state) = self.state.as_mut() {
783 if state.poll_ready(&mut fdomain.event_queue, ctx).is_ready() {
784 let state = self.state.take().unwrap();
785 let ShuttingDownHandle::Ready(handle) = state else {
786 unreachable!();
787 };
788 self.action.perform(fdomain, handle);
789 Poll::Ready(())
790 } else {
791 Poll::Pending
792 }
793 } else {
794 Poll::Ready(())
795 }
796 }
797}
798
799enum CloseAction {
804 Close { tid: NonZeroU32, count: AtomicU32, result: Result<()> },
805 Replace { tid: NonZeroU32, new_hid: proto::NewHandleId, rights: fidl::Rights },
806}
807
808impl CloseAction {
809 fn perform(&self, fdomain: &mut FDomain, handle: AnyHandle) {
810 match self {
811 CloseAction::Close { tid, count, result } => {
812 if count.fetch_sub(1, Ordering::Relaxed) == 1 {
813 fdomain
814 .event_queue
815 .push_back(FDomainEvent::ClosedHandle(*tid, result.clone()).into());
816 }
817 }
818 CloseAction::Replace { tid, new_hid, rights } => {
819 let result = handle
820 .replace(*rights)
821 .and_then(|handle| fdomain.alloc_client_handles([*new_hid], [handle]));
822 fdomain.event_queue.push_back(FDomainEvent::ReplacedHandle(*tid, result).into());
823 }
824 }
825 }
826}
827
/// A table of handles together with the queues needed to service requests
/// against them. Results are delivered as `FDomainEvent`s through the
/// `futures::Stream` implementation.
#[pin_project::pin_project]
pub struct FDomain {
    /// Called by `get_namespace` to obtain a directory client end.
    namespace: Box<dyn Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send>,
    /// Live handles, keyed by handle ID.
    handles: HashMap<proto::HandleId, HandleState>,
    /// Handles removed from `handles` by close/replace that are still winding down.
    closing_handles: Vec<ClosingHandle>,
    /// Events waiting to be yielded by the stream.
    event_queue: VecDeque<UnprocessedFDomainEvent>,
    /// Waker stored by `poll_next` when it found no event to return.
    waker: Option<Waker>,
}
840
841impl FDomain {
/// Creates an `FDomain` whose namespace callback always fails with
/// `fidl::Status::NOT_FOUND`.
pub fn new_empty() -> Self {
    Self::new(|| Err(fidl::Status::NOT_FOUND))
}
847
/// Creates an `FDomain` with no handles and no queued events.
///
/// `namespace` is stored and invoked each time `get_namespace` is called.
pub fn new(
    namespace: impl Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send + 'static,
) -> Self {
    FDomain {
        namespace: Box::new(namespace),
        handles: HashMap::new(),
        closing_handles: Vec::new(),
        event_queue: VecDeque::new(),
        waker: None,
    }
}
860
861 fn push_event(&mut self, event: impl Into<UnprocessedFDomainEvent>) {
863 self.event_queue.push_back(event.into());
864 self.waker.take().map(Waker::wake);
865 }
866
867 fn process_message(
871 &mut self,
872 message: fidl::MessageBufEtc,
873 ) -> Result<proto::ChannelMessage, proto::Error> {
874 let (data, handles) = message.split();
875 let handles = handles
876 .into_iter()
877 .map(|info| {
878 let type_ = info.object_type;
879
880 let handle = match info.object_type {
881 fidl::ObjectType::CHANNEL => {
882 AnyHandle::Channel(fidl::Channel::from_handle(info.handle))
883 }
884 fidl::ObjectType::SOCKET => {
885 AnyHandle::Socket(fidl::Socket::from_handle(info.handle))
886 }
887 fidl::ObjectType::EVENTPAIR => {
888 AnyHandle::EventPair(fidl::EventPair::from_handle(info.handle))
889 }
890 fidl::ObjectType::EVENT => {
891 AnyHandle::Event(fidl::Event::from_handle(info.handle))
892 }
893 _ => AnyHandle::Unknown(handles::Unknown(info.handle, info.object_type)),
894 };
895
896 Ok(proto::HandleInfo {
897 rights: info.rights,
898 handle: self.alloc_fdomain_handle(handle)?,
899 type_,
900 })
901 })
902 .collect::<Result<Vec<_>, proto::Error>>()?;
903
904 Ok(proto::ChannelMessage { data, handles })
905 }
906
907 fn alloc_client_handles<const N: usize>(
914 &mut self,
915 ids: [proto::NewHandleId; N],
916 handles: [AnyHandle; N],
917 ) -> Result<(), proto::Error> {
918 for id in ids {
919 if id.id & (1 << 31) != 0 {
920 return Err(proto::Error::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange {
921 id: id.id,
922 }));
923 }
924
925 if self.handles.contains_key(&proto::HandleId { id: id.id }) {
926 return Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
927 id: id.id,
928 same_call: false,
929 }));
930 }
931 }
932
933 let mut sorted_ids = ids;
934 sorted_ids.sort();
935
936 if let Some(a) = sorted_ids.windows(2).find(|x| x[0] == x[1]) {
937 Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
938 id: a[0].id,
939 same_call: true,
940 }))
941 } else {
942 let ids = ids.into_iter().map(|id| proto::HandleId { id: id.id });
943 let handles = ids
944 .zip(handles.into_iter())
945 .map(|(id, h)| HandleState::new(h, id).map(|x| (id, x)))
946 .collect::<Result<Vec<_>, proto::Error>>()?;
947
948 self.handles.extend(handles);
949
950 Ok(())
951 }
952 }
953
/// Registers `handle` under a randomly chosen, currently unused ID and
/// returns that ID.
///
/// Generated IDs always have bit 31 set; `alloc_client_handles` rejects
/// client-chosen IDs with that bit, so the two ranges do not overlap.
///
/// # Errors
///
/// Returns any error from `HandleState::new`.
fn alloc_fdomain_handle(&mut self, handle: AnyHandle) -> Result<proto::HandleId, proto::Error> {
    // Retry with a new random ID until a vacant slot is found.
    loop {
        let id = proto::HandleId { id: rand::random::<u32>() | (1u32 << 31) };
        if let Entry::Vacant(v) = self.handles.entry(id) {
            v.insert(HandleState::new(handle, id)?);
            break Ok(id);
        }
    }
}
965
966 fn take_handle(&mut self, handle: proto::HandleId) -> Result<HandleState, proto::Error> {
968 self.handles
969 .remove(&handle)
970 .ok_or(proto::Error::BadHandleId(proto::BadHandleId { id: handle.id }))
971 }
972
973 fn using_handle<T>(
975 &mut self,
976 id: proto::HandleId,
977 f: impl FnOnce(&mut HandleState) -> Result<T, proto::Error>,
978 ) -> Result<T, proto::Error> {
979 if let Some(s) = self.handles.get_mut(&id) {
980 f(s)
981 } else {
982 Err(proto::Error::BadHandleId(proto::BadHandleId { id: id.id }))
983 }
984 }
985
986 pub fn get_namespace(&mut self, request: proto::FDomainGetNamespaceRequest) -> Result<()> {
987 match (self.namespace)() {
988 Ok(endpoint) => self.alloc_client_handles(
989 [request.new_handle],
990 [AnyHandle::Channel(endpoint.into_channel())],
991 ),
992 Err(e) => Err(proto::Error::TargetError(e.into_raw())),
993 }
994 }
995
996 pub fn create_channel(&mut self, request: proto::ChannelCreateChannelRequest) -> Result<()> {
997 let (a, b) = fidl::Channel::create();
998 self.alloc_client_handles(request.handles, [AnyHandle::Channel(a), AnyHandle::Channel(b)])
999 }
1000
1001 pub fn create_socket(&mut self, request: proto::SocketCreateSocketRequest) -> Result<()> {
1002 let (a, b) = match request.options {
1003 proto::SocketType::Stream => fidl::Socket::create_stream(),
1004 proto::SocketType::Datagram => fidl::Socket::create_datagram(),
1005 type_ => {
1006 return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown { type_ }));
1007 }
1008 };
1009
1010 self.alloc_client_handles(request.handles, [AnyHandle::Socket(a), AnyHandle::Socket(b)])
1011 }
1012
1013 pub fn create_event_pair(
1014 &mut self,
1015 request: proto::EventPairCreateEventPairRequest,
1016 ) -> Result<()> {
1017 let (a, b) = fidl::EventPair::create();
1018 self.alloc_client_handles(
1019 request.handles,
1020 [AnyHandle::EventPair(a), AnyHandle::EventPair(b)],
1021 )
1022 }
1023
1024 pub fn create_event(&mut self, request: proto::EventCreateEventRequest) -> Result<()> {
1025 let a = fidl::Event::create();
1026 self.alloc_client_handles([request.handle], [AnyHandle::Event(a)])
1027 }
1028
1029 pub fn set_socket_disposition(
1030 &mut self,
1031 tid: NonZeroU32,
1032 request: proto::SocketSetSocketDispositionRequest,
1033 ) {
1034 if let Err(err) = self.using_handle(request.handle, |h| {
1035 h.write_queue.push_back(WriteOp::SetDisposition(
1036 tid,
1037 request.disposition,
1038 request.disposition_peer,
1039 ));
1040 Ok(())
1041 }) {
1042 self.push_event(FDomainEvent::SocketDispositionSet(tid, Err(err)));
1043 }
1044 }
1045
1046 pub fn read_socket(&mut self, tid: NonZeroU32, request: proto::SocketReadSocketRequest) {
1047 if let Err(e) = self.using_handle(request.handle, |h| {
1048 h.read_queue.push_back(ReadOp::Socket(tid, request.max_bytes));
1049 Ok(())
1050 }) {
1051 self.push_event(FDomainEvent::SocketData(tid, Err(e)));
1052 }
1053 }
1054
1055 pub fn read_channel(&mut self, tid: NonZeroU32, request: proto::ChannelReadChannelRequest) {
1056 if let Err(e) = self.using_handle(request.handle, |h| {
1057 h.read_queue.push_back(ReadOp::Channel(tid));
1058 Ok(())
1059 }) {
1060 self.push_event(FDomainEvent::ChannelData(tid, Err(e)));
1061 }
1062 }
1063
1064 pub fn write_socket(&mut self, tid: NonZeroU32, request: proto::SocketWriteSocketRequest) {
1065 if let Err(error) = self.using_handle(request.handle, |h| {
1066 h.write_queue.push_back(WriteOp::Socket(SocketWrite {
1067 tid,
1068 wrote: 0,
1069 to_write: request.data,
1070 }));
1071 Ok(())
1072 }) {
1073 self.push_event(FDomainEvent::WroteSocket(
1074 tid,
1075 Err(proto::WriteSocketError { error, wrote: 0 }),
1076 ));
1077 }
1078 }
1079
/// Queues a channel write of `request.data` plus the requested handles.
///
/// Handles to be moved are removed from the handle table here and wound
/// down before the write happens; handles to be duplicated are duplicated
/// here. Naming the destination channel itself among the handles yields
/// `WroteToSelf` for that entry.
///
/// The outcome is delivered as `FDomainEvent::WroteChannel`. If resolving
/// any handle fails, an `OpErrors` event with one entry per requested
/// handle is queued at once and nothing is written.
pub fn write_channel(&mut self, tid: NonZeroU32, request: proto::ChannelWriteChannelRequest) {
    // Resolve every requested handle, recording a per-handle result.
    let handles: Vec<Result<(ShuttingDownHandle, fidl::Rights)>> = match request.handles {
        proto::Handles::Handles(h) => h
            .into_iter()
            .map(|h| {
                if h != request.handle {
                    self.take_handle(h).map(|handle_state| {
                        (ShuttingDownHandle::InUse(h, handle_state), fidl::Rights::SAME_RIGHTS)
                    })
                } else {
                    Err(proto::Error::WroteToSelf(proto::WroteToSelf))
                }
            })
            .collect(),
        proto::Handles::Dispositions(d) => d
            .into_iter()
            .map(|d| {
                let res = match d.handle {
                    proto::HandleOp::Move_(h) => {
                        if h != request.handle {
                            self.take_handle(h).map(|x| ShuttingDownHandle::InUse(h, x))
                        } else {
                            Err(proto::Error::WroteToSelf(proto::WroteToSelf))
                        }
                    }
                    proto::HandleOp::Duplicate(h) => {
                        if h != request.handle {
                            // The duplicate is a new handle with nothing
                            // queued, so it is `Ready` immediately.
                            self.using_handle(h, |h| {
                                h.handle.duplicate(fidl::Rights::SAME_RIGHTS)
                            })
                            .map(ShuttingDownHandle::Ready)
                        } else {
                            Err(proto::Error::WroteToSelf(proto::WroteToSelf))
                        }
                    }
                };

                res.and_then(|x| Ok((x, d.rights)))
            })
            .collect(),
    };

    if handles.iter().any(|x| x.is_err()) {
        // One slot per requested handle: `Some(error)` for failures,
        // `None` for handles that resolved. Handles already taken from the
        // table are dropped along with `handles` here.
        let e = handles.into_iter().map(|x| x.err().map(Box::new)).collect();

        self.push_event(FDomainEvent::WroteChannel(
            tid,
            Err(proto::WriteChannelError::OpErrors(e)),
        ));
        return;
    }

    let handles = handles.into_iter().map(|x| x.unwrap()).collect::<Vec<_>>();

    if let Err(e) = self.using_handle(request.handle, |h| {
        h.write_queue.push_back(WriteOp::Channel(
            tid,
            request.data,
            HandlesToWrite::SomeInUse(handles),
        ));
        Ok(())
    }) {
        self.push_event(FDomainEvent::WroteChannel(
            tid,
            Err(proto::WriteChannelError::Error(e)),
        ));
    }
}
1160
1161 pub fn wait_for_signals(
1162 &mut self,
1163 tid: NonZeroU32,
1164 request: proto::FDomainWaitForSignalsRequest,
1165 ) {
1166 let result = self.using_handle(request.handle, |h| {
1167 let signals = fidl::Signals::from_bits_retain(request.signals);
1168 h.signal_waiters.push(SignalWaiter {
1169 tid,
1170 waiter: OnSignals::new(AnyHandleRef(Arc::clone(&h.handle)), signals),
1171 });
1172 Ok(())
1173 });
1174
1175 if let Err(e) = result {
1176 self.push_event(FDomainEvent::WaitForSignals(tid, Err(e)));
1177 } else {
1178 self.waker.take().map(Waker::wake);
1179 }
1180 }
1181
1182 pub fn close(&mut self, tid: NonZeroU32, request: proto::FDomainCloseRequest) {
1183 let mut states = Vec::with_capacity(request.handles.len());
1184 let mut result = Ok(());
1185 for hid in request.handles {
1186 match self.take_handle(hid) {
1187 Ok(state) => states.push((hid, state)),
1188
1189 Err(e) => {
1190 result = result.and(Err(e));
1191 }
1192 }
1193 }
1194
1195 let action = Arc::new(CloseAction::Close {
1196 tid,
1197 count: AtomicU32::new(states.len().try_into().unwrap()),
1198 result,
1199 });
1200
1201 for (hid, state) in states {
1202 self.closing_handles.push(ClosingHandle {
1203 action: Arc::clone(&action),
1204 state: Some(ShuttingDownHandle::InUse(hid, state)),
1205 });
1206 }
1207 }
1208
1209 pub fn duplicate(&mut self, request: proto::FDomainDuplicateRequest) -> Result<()> {
1210 let rights = request.rights;
1211 let handle = self.using_handle(request.handle, |h| h.handle.duplicate(rights));
1212 handle.and_then(|h| self.alloc_client_handles([request.new_handle], [h]))
1213 }
1214
1215 pub fn replace(
1216 &mut self,
1217 tid: NonZeroU32,
1218 request: proto::FDomainReplaceRequest,
1219 ) -> Result<()> {
1220 let rights = request.rights;
1221 let new_hid = request.new_handle;
1222 match self.take_handle(request.handle) {
1223 Ok(state) => self.closing_handles.push(ClosingHandle {
1224 action: Arc::new(CloseAction::Replace { tid, new_hid, rights }),
1225 state: Some(ShuttingDownHandle::InUse(request.handle, state)),
1226 }),
1227 Err(e) => self.event_queue.push_back(UnprocessedFDomainEvent::Ready(
1228 FDomainEvent::ReplacedHandle(tid, Err(e)),
1229 )),
1230 }
1231
1232 Ok(())
1233 }
1234
1235 pub fn signal(&mut self, request: proto::FDomainSignalRequest) -> Result<()> {
1236 let set = fidl::Signals::from_bits_retain(request.set);
1237 let clear = fidl::Signals::from_bits_retain(request.clear);
1238
1239 self.using_handle(request.handle, |h| {
1240 h.handle.signal_handle(clear, set).map_err(|e| proto::Error::TargetError(e.into_raw()))
1241 })
1242 }
1243
1244 pub fn signal_peer(&mut self, request: proto::FDomainSignalPeerRequest) -> Result<()> {
1245 let set = fidl::Signals::from_bits_retain(request.set);
1246 let clear = fidl::Signals::from_bits_retain(request.clear);
1247
1248 self.using_handle(request.handle, |h| h.handle.signal_peer(clear, set))
1249 }
1250
1251 pub fn read_channel_streaming_start(
1252 &mut self,
1253 tid: NonZeroU32,
1254 request: proto::ChannelReadChannelStreamingStartRequest,
1255 ) {
1256 if let Err(err) = self.using_handle(request.handle, |h| {
1257 h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1258 h.read_queue.push_back(ReadOp::StreamingChannel(tid, true));
1259 Ok(())
1260 }) {
1261 self.event_queue
1262 .push_back(FDomainEvent::ChannelStreamingReadStart(tid, Err(err)).into())
1263 }
1264 }
1265
1266 pub fn read_channel_streaming_stop(
1267 &mut self,
1268 tid: NonZeroU32,
1269 request: proto::ChannelReadChannelStreamingStopRequest,
1270 ) {
1271 if let Err(err) = self.using_handle(request.handle, |h| {
1272 h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1273 h.read_queue.push_back(ReadOp::StreamingChannel(tid, false));
1274 Ok(())
1275 }) {
1276 self.event_queue.push_back(FDomainEvent::ChannelStreamingReadStop(tid, Err(err)).into())
1277 }
1278 }
1279
1280 pub fn read_socket_streaming_start(
1281 &mut self,
1282 tid: NonZeroU32,
1283 request: proto::SocketReadSocketStreamingStartRequest,
1284 ) {
1285 if let Err(err) = self.using_handle(request.handle, |h| {
1286 h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1287 h.read_queue.push_back(ReadOp::StreamingSocket(tid, true));
1288 Ok(())
1289 }) {
1290 self.event_queue.push_back(FDomainEvent::SocketStreamingReadStart(tid, Err(err)).into())
1291 }
1292 }
1293
1294 pub fn read_socket_streaming_stop(
1295 &mut self,
1296 tid: NonZeroU32,
1297 request: proto::SocketReadSocketStreamingStopRequest,
1298 ) {
1299 if let Err(err) = self.using_handle(request.handle, |h| {
1300 h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1301 h.read_queue.push_back(ReadOp::StreamingSocket(tid, false));
1302 Ok(())
1303 }) {
1304 self.event_queue.push_back(FDomainEvent::SocketStreamingReadStop(tid, Err(err)).into())
1305 }
1306 }
1307}
1308
1309impl futures::Stream for FDomain {
1312 type Item = FDomainEvent;
1313
1314 fn poll_next(
1315 mut self: std::pin::Pin<&mut Self>,
1316 ctx: &mut Context<'_>,
1317 ) -> Poll<Option<Self::Item>> {
1318 let this = &mut *self;
1319
1320 let mut closing_handles = std::mem::replace(&mut this.closing_handles, Vec::new());
1321 closing_handles.retain_mut(|x| x.poll_ready(this, ctx).is_pending());
1322 this.closing_handles = closing_handles;
1323
1324 let handles = &mut this.handles;
1325 let event_queue = &mut this.event_queue;
1326 for state in handles.values_mut() {
1327 state.poll(event_queue, ctx);
1328 }
1329
1330 if let Some(event) = self.event_queue.pop_front() {
1331 match event {
1332 UnprocessedFDomainEvent::Ready(event) => Poll::Ready(Some(event)),
1333 UnprocessedFDomainEvent::ChannelData(tid, message) => {
1334 Poll::Ready(Some(FDomainEvent::ChannelData(tid, self.process_message(message))))
1335 }
1336 UnprocessedFDomainEvent::ChannelStreamingData(hid, message) => {
1337 match self.process_message(message) {
1338 Ok(message) => Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1339 proto::ChannelOnChannelStreamingDataRequest {
1340 handle: hid,
1341 channel_sent: proto::ChannelSent::Message(message),
1342 },
1343 ))),
1344 Err(e) => {
1345 self.handles.get_mut(&hid).unwrap().async_read_in_progress = false;
1346 Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1347 proto::ChannelOnChannelStreamingDataRequest {
1348 handle: hid,
1349 channel_sent: proto::ChannelSent::Stopped(proto::AioStopped {
1350 error: Some(Box::new(e)),
1351 }),
1352 },
1353 )))
1354 }
1355 }
1356 }
1357 }
1358 } else {
1359 self.waker = Some(ctx.waker().clone());
1360 Poll::Pending
1361 }
1362 }
1363}