Skip to main content

fdomain_container/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::AsHandleRef;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_fdomain as proto;
8use fidl_fuchsia_io as fio;
9use fuchsia_async as fasync;
10use futures::prelude::*;
11use replace_with::replace_with;
12use std::collections::hash_map::Entry;
13use std::collections::{HashMap, VecDeque};
14use std::num::NonZeroU32;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::task::{Context, Poll, Waker};
19
20mod handles;
21pub mod wire;
22
23#[cfg(test)]
24mod test;
25
26pub type Result<T, E = proto::Error> = std::result::Result<T, E>;
27
28use handles::{AnyHandle, HandleType as _, IsDatagramSocket};
29
/// A queue. Basically just a `VecDeque` except we can asynchronously wait for
/// an element to appear if it is empty.
///
/// Field `0` holds the queued elements. Field `1` holds the [`Waker`] of the
/// most recent caller that found the queue empty; at most one waiter is
/// remembered, and it is woken (and forgotten) by the next [`Queue::push_back`].
struct Queue<T>(VecDeque<T>, Option<Waker>);

impl<T> Queue<T> {
    /// Create a new, empty queue with no registered waiter.
    fn new() -> Self {
        Queue(VecDeque::new(), None)
    }

    /// Whether the queue is empty.
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Removes and discards the first element in the queue.
    ///
    /// # Panics
    /// There *must* be a first element or this will panic.
    fn destroy_front(&mut self) {
        assert!(self.0.pop_front().is_some(), "Expected to find a value!");
    }

    /// Pop the first element from the queue if available.
    ///
    /// Returns `Poll::Pending` when the queue is empty, after registering the
    /// waker from `ctx` so the caller is woken by the next `push_back`.
    fn pop_front(&mut self, ctx: &mut Context<'_>) -> Poll<T> {
        match self.0.pop_front() {
            Some(t) => Poll::Ready(t),
            None => {
                Self::register_waker(&mut self.1, ctx);
                Poll::Pending
            }
        }
    }

    /// Return an element to the front of the queue. Does not wake any waiters
    /// as it is assumed the waiter is the one who popped it to begin with.
    ///
    /// This is used when we'd *like* to use `front_mut` but we can't borrow the
    /// source of `self` for that long without giving ourselves lifetime
    /// headaches.
    fn push_front_no_wake(&mut self, t: T) {
        self.0.push_front(t)
    }

    /// Push a new element to the back of the queue and wake the registered
    /// waiter, if there is one. The waiter is consumed, so a later push does
    /// not wake it again unless it re-registers.
    fn push_back(&mut self, t: T) {
        self.0.push_back(t);
        if let Some(waker) = self.1.take() {
            waker.wake();
        }
    }

    /// Get a mutable reference to the first element in the queue.
    ///
    /// Returns `Poll::Pending` when the queue is empty, after registering the
    /// waker from `ctx` so the caller is woken by the next `push_back`.
    fn front_mut(&mut self, ctx: &mut Context<'_>) -> Poll<&mut T> {
        if let Some(t) = self.0.front_mut() {
            return Poll::Ready(t);
        }
        Self::register_waker(&mut self.1, ctx);
        Poll::Pending
    }

    /// Remember the waker from `ctx` in `slot`.
    ///
    /// Takes the slot rather than `&mut self` so callers can keep a borrow of
    /// the element storage alive. The clone is skipped when the stored waker
    /// would already wake the same task.
    fn register_waker(slot: &mut Option<Waker>, ctx: &Context<'_>) {
        let current = ctx.waker();
        if !slot.as_ref().is_some_and(|stored| stored.will_wake(current)) {
            *slot = Some(current.clone());
        }
    }
}
89
/// Upper bound, in bytes, requested per socket read while servicing a
/// streaming (async) read: 40 KiB.
const ASYNC_READ_BUFSIZE: u64 = 40 * 1024;
92
93/// Wraps the various FIDL Event types that can be produced by an FDomain
94#[derive(Debug)]
95pub enum FDomainEvent {
96    ChannelStreamingReadStart(NonZeroU32, Result<()>),
97    ChannelStreamingReadStop(NonZeroU32, Result<()>),
98    SocketStreamingReadStart(NonZeroU32, Result<()>),
99    SocketStreamingReadStop(NonZeroU32, Result<()>),
100    WaitForSignals(NonZeroU32, Result<proto::FDomainWaitForSignalsResponse>),
101    SocketData(NonZeroU32, Result<proto::SocketData>),
102    SocketStreamingData(proto::SocketOnSocketStreamingDataRequest),
103    SocketDispositionSet(NonZeroU32, Result<()>),
104    WroteSocket(NonZeroU32, Result<proto::SocketWriteSocketResponse, proto::WriteSocketError>),
105    ChannelData(NonZeroU32, Result<proto::ChannelMessage>),
106    ChannelStreamingData(proto::ChannelOnChannelStreamingDataRequest),
107    WroteChannel(NonZeroU32, Result<(), proto::WriteChannelError>),
108    ClosedHandle(NonZeroU32, Result<()>),
109    ReplacedHandle(NonZeroU32, Result<()>),
110}
111
112/// An [`FDomainEvent`] that needs a bit more processing before it can be sent.
113/// I.e. it still contains `fidl::NullableHandle` objects that need to be replaced with
114/// FDomain IDs.
115enum UnprocessedFDomainEvent {
116    Ready(FDomainEvent),
117    ChannelData(NonZeroU32, fidl::MessageBufEtc),
118    ChannelStreamingData(proto::HandleId, fidl::MessageBufEtc),
119}
120
121impl From<FDomainEvent> for UnprocessedFDomainEvent {
122    fn from(other: FDomainEvent) -> UnprocessedFDomainEvent {
123        UnprocessedFDomainEvent::Ready(other)
124    }
125}
126
/// Operations on a handle which are processed, in order, from its read queue.
/// Every variant starts with the ID of the transaction that requested it.
enum ReadOp {
    /// Enable (`true`) or disable (`false`) async reads on a channel.
    StreamingChannel(NonZeroU32, bool),
    /// Enable (`true`) or disable (`false`) async reads on a socket.
    StreamingSocket(NonZeroU32, bool),
    /// Read from a socket; the `u64` is the requested maximum byte count.
    Socket(NonZeroU32, u64),
    /// Read one message from a channel.
    Channel(NonZeroU32),
}
136
/// An in-progress socket write. Writing to a socket may take several
/// syscalls, so this records how much has gone out so far and what is still
/// waiting to be written.
struct SocketWrite {
    /// Transaction that requested the write.
    tid: NonZeroU32,
    /// Total number of bytes written so far.
    wrote: usize,
    /// Bytes not yet written; written bytes are drained from the front.
    to_write: Vec<u8>,
}
145
146/// Operations on a handle which are processed from the write queue.
147enum WriteOp {
148    Socket(SocketWrite),
149    Channel(NonZeroU32, Vec<u8>, HandlesToWrite),
150    SetDisposition(NonZeroU32, proto::SocketDisposition, proto::SocketDisposition),
151}
152
153/// A handle which is being moved out of the FDomain by a channel write call or
154/// closure/replacement.  There may still be operations to perform on this
155/// handle, so the write should not proceed while the handle is in the `InUse`
156/// state.
157enum ShuttingDownHandle {
158    InUse(proto::HandleId, HandleState),
159    Ready(AnyHandle),
160}
161
162impl ShuttingDownHandle {
163    fn poll_ready(
164        &mut self,
165        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
166        ctx: &mut Context<'_>,
167    ) -> Poll<()> {
168        replace_with(self, |this| match this {
169            this @ ShuttingDownHandle::Ready(_) => this,
170            ShuttingDownHandle::InUse(hid, mut state) => {
171                state.poll(event_queue, ctx);
172
173                if state.write_queue.is_empty() {
174                    while let Poll::Ready(op) = state.read_queue.pop_front(ctx) {
175                        match op {
176                            ReadOp::StreamingChannel(tid, start) => {
177                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
178                                    id: hid.id,
179                                }));
180                                let event = if start {
181                                    FDomainEvent::ChannelStreamingReadStart(tid, err)
182                                } else {
183                                    FDomainEvent::ChannelStreamingReadStop(tid, err)
184                                };
185                                event_queue.push_back(event.into());
186                            }
187                            ReadOp::StreamingSocket(tid, start) => {
188                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
189                                    id: hid.id,
190                                }));
191                                let event = if start {
192                                    FDomainEvent::SocketStreamingReadStart(tid, err)
193                                } else {
194                                    FDomainEvent::SocketStreamingReadStop(tid, err)
195                                };
196                                event_queue.push_back(event.into());
197                            }
198                            ReadOp::Channel(tid) => {
199                                let err = state
200                                    .handle
201                                    .expected_type(fidl::ObjectType::CHANNEL)
202                                    .err()
203                                    .unwrap_or(proto::Error::ClosedDuringRead(
204                                        proto::ClosedDuringRead,
205                                    ));
206                                event_queue
207                                    .push_back(FDomainEvent::ChannelData(tid, Err(err)).into());
208                            }
209                            ReadOp::Socket(tid, _max_bytes) => {
210                                let err = state
211                                    .handle
212                                    .expected_type(fidl::ObjectType::SOCKET)
213                                    .err()
214                                    .unwrap_or(proto::Error::ClosedDuringRead(
215                                        proto::ClosedDuringRead,
216                                    ));
217                                event_queue
218                                    .push_back(FDomainEvent::SocketData(tid, Err(err)).into());
219                            }
220                        }
221                    }
222
223                    if state.async_read_in_progress {
224                        match &*state.handle {
225                            AnyHandle::Channel(_) => event_queue.push_back(
226                                FDomainEvent::ChannelStreamingData(
227                                    proto::ChannelOnChannelStreamingDataRequest {
228                                        handle: hid,
229                                        channel_sent: proto::ChannelSent::Stopped(
230                                            proto::AioStopped { error: None },
231                                        ),
232                                    },
233                                )
234                                .into(),
235                            ),
236                            AnyHandle::Socket(_) => event_queue.push_back(
237                                FDomainEvent::SocketStreamingData(
238                                    proto::SocketOnSocketStreamingDataRequest {
239                                        handle: hid,
240                                        socket_message: proto::SocketMessage::Stopped(
241                                            proto::AioStopped { error: None },
242                                        ),
243                                    },
244                                )
245                                .into(),
246                            ),
247                            AnyHandle::EventPair(_)
248                            | AnyHandle::Event(_)
249                            | AnyHandle::Unknown(_) => unreachable!(),
250                        }
251                    }
252
253                    state.signal_waiters.clear();
254                    state.io_waiter = None;
255
256                    ShuttingDownHandle::Ready(
257                        Arc::into_inner(state.handle).expect("Unaccounted-for handle reference!"),
258                    )
259                } else {
260                    ShuttingDownHandle::InUse(hid, state)
261                }
262            }
263        });
264
265        if matches!(self, ShuttingDownHandle::Ready(_)) { Poll::Ready(()) } else { Poll::Pending }
266    }
267}
268
269/// A vector of [`ShuttingDownHandle`] paired with rights for the new handles, which
270/// can transition into being a vector of [`fidl::HandleDisposition`] when all the
271/// handles are ready.
272enum HandlesToWrite {
273    SomeInUse(Vec<(ShuttingDownHandle, fidl::Rights)>),
274    AllReady(Vec<fidl::HandleDisposition<'static>>),
275}
276
277impl HandlesToWrite {
278    fn poll_ready(
279        &mut self,
280        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
281        ctx: &mut Context<'_>,
282    ) -> Poll<&mut Vec<fidl::HandleDisposition<'static>>> {
283        match self {
284            HandlesToWrite::AllReady(s) => Poll::Ready(s),
285            HandlesToWrite::SomeInUse(handles) => {
286                let mut ready = true;
287                for (handle, _) in handles.iter_mut() {
288                    ready = ready && handle.poll_ready(event_queue, ctx).is_ready();
289                }
290
291                if !ready {
292                    return Poll::Pending;
293                }
294
295                *self = HandlesToWrite::AllReady(
296                    handles
297                        .drain(..)
298                        .map(|(handle, rights)| {
299                            let ShuttingDownHandle::Ready(handle) = handle else { unreachable!() };
300
301                            fidl::HandleDisposition::new(
302                                fidl::HandleOp::Move(handle.into()),
303                                fidl::ObjectType::NONE,
304                                rights,
305                                fidl::Status::OK,
306                            )
307                        })
308                        .collect(),
309                );
310
311                let HandlesToWrite::AllReady(s) = self else { unreachable!() };
312                Poll::Ready(s)
313            }
314        }
315    }
316}
317
318struct AnyHandleRef(Arc<AnyHandle>);
319
320impl AsHandleRef for AnyHandleRef {
321    fn as_handle_ref(&self) -> fidl::HandleRef<'_> {
322        self.0.as_handle_ref()
323    }
324}
325
326#[cfg(target_os = "fuchsia")]
327type OnSignals = fasync::OnSignals<'static, AnyHandleRef>;
328
329#[cfg(not(target_os = "fuchsia"))]
330type OnSignals = fasync::OnSignalsRef<'static>;
331
332/// Represents a `WaitForSignals` transaction from a client. When the contained
333/// `OnSignals` polls to completion we can reply to the transaction.
334struct SignalWaiter {
335    tid: NonZeroU32,
336    waiter: Pin<Box<OnSignals>>,
337}
338
339/// Information about a single handle within the [`FDomain`].
340struct HandleState {
341    /// The handle itself.
342    handle: Arc<AnyHandle>,
343    /// Our handle ID.
344    hid: proto::HandleId,
345    /// Whether this is a datagram socket. We have to handle data coming out of
346    /// datagram sockets a bit differently to preserve their semantics from the
347    /// perspective of the host and avoid data loss.
348    is_datagram_socket: bool,
349    /// Indicates we are sending `On*StreamingData` events to the client
350    /// presently. It is an error for the user to try to move the handle out of
351    /// the FDomain (e.g. send it through a channel or close it) until after
352    /// they request that streaming events stop.
353    async_read_in_progress: bool,
354    /// Queue of client requests to read from the handle. We have to queue read
355    /// requests because they may block, and we don't want to block the event
356    /// loop or be unable to handle further requests while a long read request
357    /// is blocking. Also we want to retire read requests in the order they were
358    /// submitted, otherwise pipelined reads could return data in a strange order.
359    read_queue: Queue<ReadOp>,
360    /// Queue of client requests to write to the handle. We have to queue write
361    /// requests for the same reason we have to queue read requests. Since we
362    /// process the queue one at a time, we need a separate queue for writes
363    /// otherwise we'd effectively make handles half-duplex, with read requests
364    /// unable to proceed if a write request is blocked at the head of the
365    /// queue.
366    write_queue: Queue<WriteOp>,
367    /// List of outstanding `WaitForSignals` transactions.
368    signal_waiters: Vec<SignalWaiter>,
369    /// Contains a waiter on this handle for IO reading and writing. Populated
370    /// whenever we need to block on IO to service a request.
371    io_waiter: Option<Pin<Box<OnSignals>>>,
372}
373
374impl HandleState {
375    fn new(handle: AnyHandle, hid: proto::HandleId) -> Result<Self, proto::Error> {
376        let is_datagram_socket = match handle.is_datagram_socket() {
377            IsDatagramSocket::Unknown => {
378                return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown {
379                    type_: proto::SocketType::unknown(),
380                }));
381            }
382            other => other.is_datagram(),
383        };
384        Ok(HandleState {
385            handle: Arc::new(handle),
386            hid,
387            async_read_in_progress: false,
388            is_datagram_socket,
389            read_queue: Queue::new(),
390            write_queue: Queue::new(),
391            signal_waiters: Vec::new(),
392            io_waiter: None,
393        })
394    }
395
396    /// Poll this handle state. Lets us handle our IO queues and wait for the
397    /// next IO event.
398    fn poll(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>, ctx: &mut Context<'_>) {
399        self.signal_waiters.retain_mut(|x| {
400            let Poll::Ready(result) = x.waiter.poll_unpin(ctx) else {
401                return true;
402            };
403
404            event_queue.push_back(
405                FDomainEvent::WaitForSignals(
406                    x.tid,
407                    result
408                        .map(|x| proto::FDomainWaitForSignalsResponse { signals: x.bits() })
409                        .map_err(|e| proto::Error::TargetError(e.into_raw())),
410                )
411                .into(),
412            );
413
414            false
415        });
416
417        let read_signals = self.handle.read_signals();
418        let write_signals = self.handle.write_signals();
419
420        loop {
421            if let Some(signal_waiter) = self.io_waiter.as_mut() {
422                if let Poll::Ready(sigs) = signal_waiter.poll_unpin(ctx) {
423                    if let Ok(sigs) = sigs {
424                        if sigs.intersects(read_signals) {
425                            self.process_read_queue(event_queue, ctx);
426                        }
427                        if sigs.intersects(write_signals) {
428                            self.process_write_queue(event_queue, ctx);
429                        }
430                    }
431                } else {
432                    let need_read = matches!(
433                        self.read_queue.front_mut(ctx),
434                        Poll::Ready(ReadOp::StreamingChannel(_, _) | ReadOp::StreamingSocket(_, _))
435                    );
436                    let need_write = matches!(
437                        self.write_queue.front_mut(ctx),
438                        Poll::Ready(WriteOp::SetDisposition(_, _, _))
439                    );
440
441                    self.process_read_queue(event_queue, ctx);
442                    self.process_write_queue(event_queue, ctx);
443
444                    if !(need_read || need_write) {
445                        break;
446                    }
447                }
448            }
449
450            let subscribed_signals =
451                if self.async_read_in_progress || !self.read_queue.is_empty() {
452                    read_signals
453                } else {
454                    fidl::Signals::NONE
455                } | if !self.write_queue.is_empty() { write_signals } else { fidl::Signals::NONE };
456
457            if !subscribed_signals.is_empty() {
458                self.io_waiter = Some(Box::pin(OnSignals::new(
459                    AnyHandleRef(Arc::clone(&self.handle)),
460                    subscribed_signals,
461                )));
462            } else {
463                self.io_waiter = None;
464                break;
465            }
466        }
467    }
468
469    /// Set `async_read_in_progress` to `true`. Return an error if it was already `true`.
470    fn try_enable_async_read(&mut self) -> Result<()> {
471        if self.async_read_in_progress {
472            Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress))
473        } else {
474            self.async_read_in_progress = true;
475            Ok(())
476        }
477    }
478
479    /// Set `async_read_in_progress` to `false`. Return an error if it was already `false`.
480    fn try_disable_async_read(&mut self) -> Result<()> {
481        if !self.async_read_in_progress {
482            Err(proto::Error::NoReadInProgress(proto::NoReadInProgress))
483        } else {
484            self.async_read_in_progress = false;
485            Ok(())
486        }
487    }
488
489    /// Handle events from the front of the read queue.
490    fn process_read_queue(
491        &mut self,
492        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
493        ctx: &mut Context<'_>,
494    ) {
495        while let Poll::Ready(op) = self.read_queue.front_mut(ctx) {
496            match op {
497                ReadOp::StreamingChannel(tid, true) => {
498                    let tid = *tid;
499                    let result = self.try_enable_async_read();
500                    event_queue
501                        .push_back(FDomainEvent::ChannelStreamingReadStart(tid, result).into());
502                    self.read_queue.destroy_front();
503                }
504                ReadOp::StreamingChannel(tid, false) => {
505                    let tid = *tid;
506                    let result = self.try_disable_async_read();
507                    event_queue
508                        .push_back(FDomainEvent::ChannelStreamingReadStop(tid, result).into());
509                    self.read_queue.destroy_front();
510                }
511                ReadOp::StreamingSocket(tid, true) => {
512                    let tid = *tid;
513                    let result = self.try_enable_async_read();
514                    event_queue
515                        .push_back(FDomainEvent::SocketStreamingReadStart(tid, result).into());
516                    self.read_queue.destroy_front();
517                }
518                ReadOp::StreamingSocket(tid, false) => {
519                    let tid = *tid;
520                    let result = self.try_disable_async_read();
521                    event_queue
522                        .push_back(FDomainEvent::SocketStreamingReadStop(tid, result).into());
523                    self.read_queue.destroy_front();
524                }
525                ReadOp::Socket(tid, max_bytes) => {
526                    let (tid, max_bytes) = (*tid, *max_bytes);
527                    if let Some(event) = self.do_read_socket(tid, max_bytes) {
528                        let _ = self.read_queue.pop_front(ctx);
529                        event_queue.push_back(event.into());
530                    } else {
531                        break;
532                    }
533                }
534                ReadOp::Channel(tid) => {
535                    let tid = *tid;
536                    if let Some(event) = self.do_read_channel(tid) {
537                        let _ = self.read_queue.pop_front(ctx);
538                        event_queue.push_back(event.into());
539                    } else {
540                        break;
541                    }
542                }
543            }
544        }
545
546        if self.async_read_in_progress {
547            // We should have error'd out of any blocking operations if we had a
548            // read in progress.
549            assert!(self.read_queue.is_empty());
550            self.process_async_read(event_queue);
551        }
552    }
553
554    fn process_async_read(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>) {
555        assert!(self.async_read_in_progress);
556
557        match &*self.handle {
558            AnyHandle::Channel(_) => {
559                'read_loop: while let Some(result) = self.handle.read_channel().transpose() {
560                    match result {
561                        Ok(msg) => event_queue.push_back(
562                            UnprocessedFDomainEvent::ChannelStreamingData(self.hid, msg),
563                        ),
564                        Err(e) => {
565                            event_queue.push_back(
566                                FDomainEvent::ChannelStreamingData(
567                                    proto::ChannelOnChannelStreamingDataRequest {
568                                        handle: self.hid,
569                                        channel_sent: proto::ChannelSent::Stopped(
570                                            proto::AioStopped { error: Some(Box::new(e)) },
571                                        ),
572                                    },
573                                )
574                                .into(),
575                            );
576                            self.async_read_in_progress = false;
577                            break 'read_loop;
578                        }
579                    }
580                }
581            }
582
583            AnyHandle::Socket(_) => {
584                'read_loop: while let Some(result) =
585                    self.handle.read_socket(ASYNC_READ_BUFSIZE).transpose()
586                {
587                    match result {
588                        Ok(data) => {
589                            event_queue.push_back(
590                                FDomainEvent::SocketStreamingData(
591                                    proto::SocketOnSocketStreamingDataRequest {
592                                        handle: self.hid,
593                                        socket_message: proto::SocketMessage::Data(
594                                            proto::SocketData {
595                                                data,
596                                                is_datagram: self.is_datagram_socket,
597                                            },
598                                        ),
599                                    },
600                                )
601                                .into(),
602                            );
603                        }
604                        Err(e) => {
605                            event_queue.push_back(
606                                FDomainEvent::SocketStreamingData(
607                                    proto::SocketOnSocketStreamingDataRequest {
608                                        handle: self.hid,
609                                        socket_message: proto::SocketMessage::Stopped(
610                                            proto::AioStopped { error: Some(Box::new(e)) },
611                                        ),
612                                    },
613                                )
614                                .into(),
615                            );
616                            self.async_read_in_progress = false;
617                            break 'read_loop;
618                        }
619                    }
620                }
621            }
622
623            _ => unreachable!("Processed async read for unreadable handle type!"),
624        }
625    }
626
627    /// Handle events from the front of the write queue.
628    fn process_write_queue(
629        &mut self,
630        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
631        ctx: &mut Context<'_>,
632    ) {
633        // We want to mutate and *maybe* pop the front of the write queue, but
634        // lifetime shenanigans mean we can't do that and also access `self`,
635        // which we need. So we pop the item always, and then maybe push it to
636        // the front again if we didn't actually want to pop it.
637        while let Poll::Ready(op) = self.write_queue.pop_front(ctx) {
638            match op {
639                WriteOp::Socket(mut op) => {
640                    if let Some(event) = self.do_write_socket(&mut op) {
641                        event_queue.push_back(event.into());
642                    } else {
643                        self.write_queue.push_front_no_wake(WriteOp::Socket(op));
644                        break;
645                    }
646                }
647                WriteOp::SetDisposition(tid, disposition, disposition_peer) => {
648                    let result = { self.handle.socket_disposition(disposition, disposition_peer) };
649                    event_queue.push_back(FDomainEvent::SocketDispositionSet(tid, result).into())
650                }
651                WriteOp::Channel(tid, data, mut handles) => {
652                    if self
653                        .do_write_channel(tid, &data, &mut handles, event_queue, ctx)
654                        .is_pending()
655                    {
656                        self.write_queue.push_front_no_wake(WriteOp::Channel(tid, data, handles));
657                        break;
658                    }
659                }
660            }
661        }
662    }
663
664    /// Attempt to read from the handle in this [`HandleState`] as if it were a
665    /// socket. If the read succeeds or produces an error that should not be
666    /// retried, produce an [`FDomainEvent`] containing the result.
667    fn do_read_socket(&mut self, tid: NonZeroU32, max_bytes: u64) -> Option<FDomainEvent> {
668        if self.async_read_in_progress {
669            return Some(
670                FDomainEvent::SocketData(
671                    tid,
672                    Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
673                )
674                .into(),
675            );
676        }
677
678        let max_bytes = if self.is_datagram_socket {
679            let AnyHandle::Socket(s) = &*self.handle else {
680                unreachable!("Read socket from state that wasn't for a socket!");
681            };
682            match s.info() {
683                Ok(x) => x.rx_buf_available as u64,
684                // We should always succeed. The only failures are if we don't
685                // have the rights or something's screwed up with the handle. We
686                // know we have the rights because figuring out this was a
687                // datagram socket to begin with meant calling the same call on
688                // the same handle earlier.
689                Err(e) => {
690                    return Some(FDomainEvent::SocketData(
691                        tid,
692                        Err(proto::Error::TargetError(e.into_raw())),
693                    ));
694                }
695            }
696        } else {
697            max_bytes
698        };
699        self.handle.read_socket(max_bytes).transpose().map(|x| {
700            FDomainEvent::SocketData(
701                tid,
702                x.map(|data| proto::SocketData { data, is_datagram: self.is_datagram_socket }),
703            )
704        })
705    }
706
707    /// Attempt to write to the handle in this [`HandleState`] as if it were a
708    /// socket. If the write succeeds or produces an error that should not be
709    /// retried, produce an [`FDomainEvent`] containing the result.
710    fn do_write_socket(&mut self, op: &mut SocketWrite) -> Option<FDomainEvent> {
711        match self.handle.write_socket(&op.to_write) {
712            Ok(wrote) => {
713                op.wrote += wrote;
714                op.to_write.drain(..wrote);
715
716                if op.to_write.is_empty() {
717                    Some(FDomainEvent::WroteSocket(
718                        op.tid,
719                        Ok(proto::SocketWriteSocketResponse {
720                            wrote: op.wrote.try_into().unwrap(),
721                        }),
722                    ))
723                } else {
724                    None
725                }
726            }
727            Err(error) => Some(FDomainEvent::WroteSocket(
728                op.tid,
729                Err(proto::WriteSocketError { error, wrote: op.wrote.try_into().unwrap() }),
730            )),
731        }
732    }
733
734    /// Attempt to write to the handle in this [`HandleState`] as if it were a
735    /// channel. If the write succeeds or produces an error that should not be
736    /// retried, produce an [`FDomainEvent`] containing the result.
737    fn do_write_channel(
738        &mut self,
739        tid: NonZeroU32,
740        data: &[u8],
741        handles: &mut HandlesToWrite,
742        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
743        ctx: &mut Context<'_>,
744    ) -> Poll<()> {
745        let Poll::Ready(handles) = handles.poll_ready(event_queue, ctx) else {
746            return Poll::Pending;
747        };
748
749        let ret = self.handle.write_channel(data, handles);
750        if let Some(ret) = ret {
751            event_queue.push_back(FDomainEvent::WroteChannel(tid, ret).into())
752        }
753        Poll::Ready(())
754    }
755
756    /// Attempt to read from the handle in this [`HandleState`] as if it were a
757    /// channel. If the read succeeds or produces an error that should not be
758    /// retried, produce an [`FDomainEvent`] containing the result.
759    fn do_read_channel(&mut self, tid: NonZeroU32) -> Option<UnprocessedFDomainEvent> {
760        if self.async_read_in_progress {
761            return Some(
762                FDomainEvent::ChannelData(
763                    tid,
764                    Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
765                )
766                .into(),
767            );
768        }
769        match self.handle.read_channel() {
770            Ok(x) => x.map(|x| UnprocessedFDomainEvent::ChannelData(tid, x)),
771            Err(e) => Some(FDomainEvent::ChannelData(tid, Err(e)).into()),
772        }
773    }
774}
775
/// State for a handle which is closing, but which needs its read and write
/// queues flushed first.
struct ClosingHandle {
    /// What to do with the handle once it is ready. Held in an `Arc` because a
    /// single close request creates one `ClosingHandle` per handle, all
    /// sharing the same action.
    action: Arc<CloseAction>,
    /// The handle being shut down. Becomes `None` once the action has been
    /// performed on it.
    state: Option<ShuttingDownHandle>,
}
782
783impl ClosingHandle {
784    fn poll_ready(&mut self, fdomain: &mut FDomain, ctx: &mut Context<'_>) -> Poll<()> {
785        if let Some(state) = self.state.as_mut() {
786            if state.poll_ready(&mut fdomain.event_queue, ctx).is_ready() {
787                let state = self.state.take().unwrap();
788                let ShuttingDownHandle::Ready(handle) = state else {
789                    unreachable!();
790                };
791                self.action.perform(fdomain, handle);
792                Poll::Ready(())
793            } else {
794                Poll::Pending
795            }
796        } else {
797            Poll::Ready(())
798        }
799    }
800}
801
/// When the client requests a handle to be closed or moved or otherwise
/// destroyed, it goes into limbo for a bit while pending read and write actions
/// are flushed. This is how we mark what should happen to the handle after that
/// period ends.
enum CloseAction {
    /// Close the handle. `tid` is the transaction to answer, `count` is the
    /// number of handles from the same request that have not finished closing
    /// yet, and `result` is what gets reported once the last one finishes.
    Close { tid: NonZeroU32, count: AtomicU32, result: Result<()> },
    /// Replace the handle with one carrying `rights`, store the replacement
    /// under `new_hid`, and report the outcome to transaction `tid`.
    Replace { tid: NonZeroU32, new_hid: proto::NewHandleId, rights: fidl::Rights },
}
810
811impl CloseAction {
812    fn perform(&self, fdomain: &mut FDomain, handle: AnyHandle) {
813        match self {
814            CloseAction::Close { tid, count, result } => {
815                if count.fetch_sub(1, Ordering::Relaxed) == 1 {
816                    fdomain
817                        .event_queue
818                        .push_back(FDomainEvent::ClosedHandle(*tid, result.clone()).into());
819                }
820            }
821            CloseAction::Replace { tid, new_hid, rights } => {
822                let result = handle
823                    .replace(*rights)
824                    .and_then(|handle| fdomain.alloc_client_handles([*new_hid], [handle]));
825                fdomain.event_queue.push_back(FDomainEvent::ReplacedHandle(*tid, result).into());
826            }
827        }
828    }
829}
830
/// This is a container of handles that is manipulable via the FDomain protocol.
/// See [RFC-0228].
///
/// Most of the methods simply handle FIDL requests from the FDomain protocol.
#[pin_project::pin_project]
pub struct FDomain {
    /// Callback invoked by `get_namespace` to obtain a client end for the
    /// namespace directory.
    namespace: Box<dyn Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send>,
    /// The handle table: every live handle, keyed by its ID.
    handles: HashMap<proto::HandleId, HandleState>,
    /// Handles that have been removed from the table but are still shutting
    /// down. They are polled each time the event stream is polled and dropped
    /// from this list once ready.
    closing_handles: Vec<ClosingHandle>,
    /// Events waiting to be yielded by the `Stream` implementation.
    event_queue: VecDeque<UnprocessedFDomainEvent>,
    /// Waker stored when the stream had nothing to yield; woken when there is
    /// new work for the stream to look at.
    waker: Option<Waker>,
}
843
844impl FDomain {
845    /// Create a new FDomain. The new FDomain is empty and ready to be connected
846    /// to by a client.
847    pub fn new_empty() -> Self {
848        Self::new(|| Err(fidl::Status::NOT_FOUND))
849    }
850
851    /// Create a new FDomain populated with the given namespace entries.
852    pub fn new(
853        namespace: impl Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send + 'static,
854    ) -> Self {
855        FDomain {
856            namespace: Box::new(namespace),
857            handles: HashMap::new(),
858            closing_handles: Vec::new(),
859            event_queue: VecDeque::new(),
860            waker: None,
861        }
862    }
863
864    /// Add an event to be emitted by this FDomain.
865    fn push_event(&mut self, event: impl Into<UnprocessedFDomainEvent>) {
866        self.event_queue.push_back(event.into());
867        self.waker.take().map(Waker::wake);
868    }
869
870    /// Given a [`fidl::MessageBufEtc`], load all of the handles from it into this
871    /// FDomain and return a [`ReadChannelPayload`](proto::ReadChannelPayload)
872    /// with the same data and the IDs for the handles.
873    fn process_message(
874        &mut self,
875        message: fidl::MessageBufEtc,
876    ) -> Result<proto::ChannelMessage, proto::Error> {
877        let (data, handles) = message.split();
878        let handles = handles
879            .into_iter()
880            .map(|info| {
881                let type_ = info.object_type;
882
883                let handle = match info.object_type {
884                    fidl::ObjectType::CHANNEL => {
885                        AnyHandle::Channel(fidl::Channel::from(info.handle))
886                    }
887                    fidl::ObjectType::SOCKET => AnyHandle::Socket(fidl::Socket::from(info.handle)),
888                    fidl::ObjectType::EVENTPAIR => {
889                        AnyHandle::EventPair(fidl::EventPair::from(info.handle))
890                    }
891                    fidl::ObjectType::EVENT => AnyHandle::Event(fidl::Event::from(info.handle)),
892                    _ => AnyHandle::Unknown(handles::Unknown(info.handle, info.object_type)),
893                };
894
895                Ok(proto::HandleInfo {
896                    rights: info.rights,
897                    handle: self.alloc_fdomain_handle(handle)?,
898                    type_,
899                })
900            })
901            .collect::<Result<Vec<_>, proto::Error>>()?;
902
903        Ok(proto::ChannelMessage { data, handles })
904    }
905
906    /// Allocate `N` new handle IDs. These are allocated from
907    /// [`NewHandleId`](proto::NewHandleId) and are expected to follow the protocol
908    /// rules for client-allocated handle IDs.
909    ///
910    /// If any of the handles passed fail to allocate, none of the handles will
911    /// be allocated.
912    fn alloc_client_handles<const N: usize>(
913        &mut self,
914        ids: [proto::NewHandleId; N],
915        handles: [AnyHandle; N],
916    ) -> Result<(), proto::Error> {
917        for id in ids {
918            if id.id & (1 << 31) != 0 {
919                return Err(proto::Error::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange {
920                    id: id.id,
921                }));
922            }
923
924            if self.handles.contains_key(&proto::HandleId { id: id.id }) {
925                return Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
926                    id: id.id,
927                    same_call: false,
928                }));
929            }
930        }
931
932        let mut sorted_ids = ids;
933        sorted_ids.sort();
934
935        if let Some([a, _]) = sorted_ids.array_windows().find(|&[a, b]| a == b) {
936            Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
937                id: a.id,
938                same_call: true,
939            }))
940        } else {
941            let ids = ids.into_iter().map(|id| proto::HandleId { id: id.id });
942            let handles = ids
943                .zip(handles.into_iter())
944                .map(|(id, h)| HandleState::new(h, id).map(|x| (id, x)))
945                .collect::<Result<Vec<_>, proto::Error>>()?;
946
947            self.handles.extend(handles);
948
949            Ok(())
950        }
951    }
952
953    /// Allocate a new handle ID. These are allocated internally and are
954    /// expected to follow the protocol rules for FDomain-allocated handle IDs.
955    fn alloc_fdomain_handle(&mut self, handle: AnyHandle) -> Result<proto::HandleId, proto::Error> {
956        loop {
957            let id = proto::HandleId { id: rand::random::<u32>() | (1u32 << 31) };
958            if let Entry::Vacant(v) = self.handles.entry(id) {
959                v.insert(HandleState::new(handle, id)?);
960                break Ok(id);
961            }
962        }
963    }
964
965    /// If a handle exists in this FDomain, remove it.
966    fn take_handle(&mut self, handle: proto::HandleId) -> Result<HandleState, proto::Error> {
967        self.handles
968            .remove(&handle)
969            .ok_or(proto::Error::BadHandleId(proto::BadHandleId { id: handle.id }))
970    }
971
972    /// Use a handle in our handle table, if it exists.
973    fn using_handle<T>(
974        &mut self,
975        id: proto::HandleId,
976        f: impl FnOnce(&mut HandleState) -> Result<T, proto::Error>,
977    ) -> Result<T, proto::Error> {
978        if let Some(s) = self.handles.get_mut(&id) {
979            f(s)
980        } else {
981            Err(proto::Error::BadHandleId(proto::BadHandleId { id: id.id }))
982        }
983    }
984
985    pub fn get_namespace(&mut self, request: proto::FDomainGetNamespaceRequest) -> Result<()> {
986        match (self.namespace)() {
987            Ok(endpoint) => self.alloc_client_handles(
988                [request.new_handle],
989                [AnyHandle::Channel(endpoint.into_channel())],
990            ),
991            Err(e) => Err(proto::Error::TargetError(e.into_raw())),
992        }
993    }
994
995    pub fn create_channel(&mut self, request: proto::ChannelCreateChannelRequest) -> Result<()> {
996        let (a, b) = fidl::Channel::create();
997        self.alloc_client_handles(request.handles, [AnyHandle::Channel(a), AnyHandle::Channel(b)])
998    }
999
1000    pub fn create_socket(&mut self, request: proto::SocketCreateSocketRequest) -> Result<()> {
1001        let (a, b) = match request.options {
1002            proto::SocketType::Stream => fidl::Socket::create_stream(),
1003            proto::SocketType::Datagram => fidl::Socket::create_datagram(),
1004            type_ => {
1005                return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown { type_ }));
1006            }
1007        };
1008
1009        self.alloc_client_handles(request.handles, [AnyHandle::Socket(a), AnyHandle::Socket(b)])
1010    }
1011
1012    pub fn create_event_pair(
1013        &mut self,
1014        request: proto::EventPairCreateEventPairRequest,
1015    ) -> Result<()> {
1016        let (a, b) = fidl::EventPair::create();
1017        self.alloc_client_handles(
1018            request.handles,
1019            [AnyHandle::EventPair(a), AnyHandle::EventPair(b)],
1020        )
1021    }
1022
1023    pub fn create_event(&mut self, request: proto::EventCreateEventRequest) -> Result<()> {
1024        let a = fidl::Event::create();
1025        self.alloc_client_handles([request.handle], [AnyHandle::Event(a)])
1026    }
1027
1028    pub fn set_socket_disposition(
1029        &mut self,
1030        tid: NonZeroU32,
1031        request: proto::SocketSetSocketDispositionRequest,
1032    ) {
1033        if let Err(err) = self.using_handle(request.handle, |h| {
1034            h.write_queue.push_back(WriteOp::SetDisposition(
1035                tid,
1036                request.disposition,
1037                request.disposition_peer,
1038            ));
1039            Ok(())
1040        }) {
1041            self.push_event(FDomainEvent::SocketDispositionSet(tid, Err(err)));
1042        }
1043    }
1044
1045    pub fn read_socket(&mut self, tid: NonZeroU32, request: proto::SocketReadSocketRequest) {
1046        if let Err(e) = self.using_handle(request.handle, |h| {
1047            h.read_queue.push_back(ReadOp::Socket(tid, request.max_bytes));
1048            Ok(())
1049        }) {
1050            self.push_event(FDomainEvent::SocketData(tid, Err(e)));
1051        }
1052    }
1053
1054    pub fn read_channel(&mut self, tid: NonZeroU32, request: proto::ChannelReadChannelRequest) {
1055        if let Err(e) = self.using_handle(request.handle, |h| {
1056            h.read_queue.push_back(ReadOp::Channel(tid));
1057            Ok(())
1058        }) {
1059            self.push_event(FDomainEvent::ChannelData(tid, Err(e)));
1060        }
1061    }
1062
1063    pub fn write_socket(&mut self, tid: NonZeroU32, request: proto::SocketWriteSocketRequest) {
1064        if let Err(error) = self.using_handle(request.handle, |h| {
1065            h.write_queue.push_back(WriteOp::Socket(SocketWrite {
1066                tid,
1067                wrote: 0,
1068                to_write: request.data,
1069            }));
1070            Ok(())
1071        }) {
1072            self.push_event(FDomainEvent::WroteSocket(
1073                tid,
1074                Err(proto::WriteSocketError { error, wrote: 0 }),
1075            ));
1076        }
1077    }
1078
1079    pub fn write_channel(&mut self, tid: NonZeroU32, request: proto::ChannelWriteChannelRequest) {
1080        // Go through the list of handles in the requests (which will either be
1081        // a simple list of handles or a list of HandleDispositions) and obtain
1082        // for each a `ShuttingDownHandle` which contains our handle state (the
1083        // "Shutting down" refers to the fact that we're pulling the handle out
1084        // of the FDomain in order to send it) and the rights the requester
1085        // would like the handle to have upon arrival at the other end of the
1086        // channel.
1087        let handles: Vec<Result<(ShuttingDownHandle, fidl::Rights)>> = match request.handles {
1088            proto::Handles::Handles(h) => h
1089                .into_iter()
1090                .map(|h| {
1091                    if h != request.handle {
1092                        self.take_handle(h).map(|handle_state| {
1093                            (ShuttingDownHandle::InUse(h, handle_state), fidl::Rights::SAME_RIGHTS)
1094                        })
1095                    } else {
1096                        Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1097                    }
1098                })
1099                .collect(),
1100            proto::Handles::Dispositions(d) => d
1101                .into_iter()
1102                .map(|d| {
1103                    let res = match d.handle {
1104                        proto::HandleOp::Move_(h) => {
1105                            if h != request.handle {
1106                                self.take_handle(h).map(|x| ShuttingDownHandle::InUse(h, x))
1107                            } else {
1108                                Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1109                            }
1110                        }
1111                        proto::HandleOp::Duplicate(h) => {
1112                            if h != request.handle {
1113                                // If the requester wants us to duplicate the
1114                                // handle, we do so now rather than letting
1115                                // `write_etc` do it. Otherwise we have to use a
1116                                // reference to the handle, and we get lifetime
1117                                // hell.
1118                                self.using_handle(h, |h| {
1119                                    h.handle.duplicate(fidl::Rights::SAME_RIGHTS)
1120                                })
1121                                .map(ShuttingDownHandle::Ready)
1122                            } else {
1123                                Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1124                            }
1125                        }
1126                    };
1127
1128                    res.and_then(|x| Ok((x, d.rights)))
1129                })
1130                .collect(),
1131        };
1132
1133        if handles.iter().any(|x| x.is_err()) {
1134            let e = handles.into_iter().map(|x| x.err().map(Box::new)).collect();
1135
1136            self.push_event(FDomainEvent::WroteChannel(
1137                tid,
1138                Err(proto::WriteChannelError::OpErrors(e)),
1139            ));
1140            return;
1141        }
1142
1143        let handles = handles.into_iter().map(|x| x.unwrap()).collect::<Vec<_>>();
1144
1145        if let Err(e) = self.using_handle(request.handle, |h| {
1146            h.write_queue.push_back(WriteOp::Channel(
1147                tid,
1148                request.data,
1149                HandlesToWrite::SomeInUse(handles),
1150            ));
1151            Ok(())
1152        }) {
1153            self.push_event(FDomainEvent::WroteChannel(
1154                tid,
1155                Err(proto::WriteChannelError::Error(e)),
1156            ));
1157        }
1158    }
1159
1160    pub fn wait_for_signals(
1161        &mut self,
1162        tid: NonZeroU32,
1163        request: proto::FDomainWaitForSignalsRequest,
1164    ) {
1165        let result = self.using_handle(request.handle, |h| {
1166            let signals = fidl::Signals::from_bits_retain(request.signals);
1167            h.signal_waiters.push(SignalWaiter {
1168                tid,
1169                waiter: Box::pin(OnSignals::new(AnyHandleRef(Arc::clone(&h.handle)), signals)),
1170            });
1171            Ok(())
1172        });
1173
1174        if let Err(e) = result {
1175            self.push_event(FDomainEvent::WaitForSignals(tid, Err(e)));
1176        } else {
1177            self.waker.take().map(Waker::wake);
1178        }
1179    }
1180
1181    pub fn close(&mut self, tid: NonZeroU32, request: proto::FDomainCloseRequest) {
1182        let mut states = Vec::with_capacity(request.handles.len());
1183        let mut result = Ok(());
1184        for hid in request.handles {
1185            match self.take_handle(hid) {
1186                Ok(state) => states.push((hid, state)),
1187
1188                Err(e) => {
1189                    result = result.and(Err(e));
1190                }
1191            }
1192        }
1193
1194        let action = Arc::new(CloseAction::Close {
1195            tid,
1196            count: AtomicU32::new(states.len().try_into().unwrap()),
1197            result,
1198        });
1199
1200        for (hid, state) in states {
1201            self.closing_handles.push(ClosingHandle {
1202                action: Arc::clone(&action),
1203                state: Some(ShuttingDownHandle::InUse(hid, state)),
1204            });
1205        }
1206    }
1207
1208    pub fn duplicate(&mut self, request: proto::FDomainDuplicateRequest) -> Result<()> {
1209        let rights = request.rights;
1210        let handle = self.using_handle(request.handle, |h| h.handle.duplicate(rights));
1211        handle.and_then(|h| self.alloc_client_handles([request.new_handle], [h]))
1212    }
1213
1214    pub fn replace(
1215        &mut self,
1216        tid: NonZeroU32,
1217        request: proto::FDomainReplaceRequest,
1218    ) -> Result<()> {
1219        let rights = request.rights;
1220        let new_hid = request.new_handle;
1221        match self.take_handle(request.handle) {
1222            Ok(state) => self.closing_handles.push(ClosingHandle {
1223                action: Arc::new(CloseAction::Replace { tid, new_hid, rights }),
1224                state: Some(ShuttingDownHandle::InUse(request.handle, state)),
1225            }),
1226            Err(e) => self.event_queue.push_back(UnprocessedFDomainEvent::Ready(
1227                FDomainEvent::ReplacedHandle(tid, Err(e)),
1228            )),
1229        }
1230
1231        Ok(())
1232    }
1233
1234    pub fn signal(&mut self, request: proto::FDomainSignalRequest) -> Result<()> {
1235        let set = fidl::Signals::from_bits_retain(request.set);
1236        let clear = fidl::Signals::from_bits_retain(request.clear);
1237
1238        self.using_handle(request.handle, |h| h.handle.signal(clear, set))
1239    }
1240
1241    pub fn signal_peer(&mut self, request: proto::FDomainSignalPeerRequest) -> Result<()> {
1242        let set = fidl::Signals::from_bits_retain(request.set);
1243        let clear = fidl::Signals::from_bits_retain(request.clear);
1244
1245        self.using_handle(request.handle, |h| h.handle.signal_peer(clear, set))
1246    }
1247
1248    pub fn get_koid(
1249        &mut self,
1250        request: proto::FDomainGetKoidRequest,
1251    ) -> Result<proto::FDomainGetKoidResponse> {
1252        self.using_handle(request.handle, |h| {
1253            h.handle
1254                .as_handle_ref()
1255                .koid()
1256                .map(|k| proto::FDomainGetKoidResponse { koid: k.raw_koid() })
1257                .map_err(|e| proto::Error::TargetError(e.into_raw()))
1258        })
1259    }
1260
1261    pub fn read_channel_streaming_start(
1262        &mut self,
1263        tid: NonZeroU32,
1264        request: proto::ChannelReadChannelStreamingStartRequest,
1265    ) {
1266        if let Err(err) = self.using_handle(request.handle, |h| {
1267            h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1268            h.read_queue.push_back(ReadOp::StreamingChannel(tid, true));
1269            Ok(())
1270        }) {
1271            self.event_queue
1272                .push_back(FDomainEvent::ChannelStreamingReadStart(tid, Err(err)).into())
1273        }
1274    }
1275
1276    pub fn read_channel_streaming_stop(
1277        &mut self,
1278        tid: NonZeroU32,
1279        request: proto::ChannelReadChannelStreamingStopRequest,
1280    ) {
1281        if let Err(err) = self.using_handle(request.handle, |h| {
1282            h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1283            h.read_queue.push_back(ReadOp::StreamingChannel(tid, false));
1284            Ok(())
1285        }) {
1286            self.event_queue.push_back(FDomainEvent::ChannelStreamingReadStop(tid, Err(err)).into())
1287        }
1288    }
1289
1290    pub fn read_socket_streaming_start(
1291        &mut self,
1292        tid: NonZeroU32,
1293        request: proto::SocketReadSocketStreamingStartRequest,
1294    ) {
1295        if let Err(err) = self.using_handle(request.handle, |h| {
1296            h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1297            h.read_queue.push_back(ReadOp::StreamingSocket(tid, true));
1298            Ok(())
1299        }) {
1300            self.event_queue.push_back(FDomainEvent::SocketStreamingReadStart(tid, Err(err)).into())
1301        }
1302    }
1303
1304    pub fn read_socket_streaming_stop(
1305        &mut self,
1306        tid: NonZeroU32,
1307        request: proto::SocketReadSocketStreamingStopRequest,
1308    ) {
1309        if let Err(err) = self.using_handle(request.handle, |h| {
1310            h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1311            h.read_queue.push_back(ReadOp::StreamingSocket(tid, false));
1312            Ok(())
1313        }) {
1314            self.event_queue.push_back(FDomainEvent::SocketStreamingReadStop(tid, Err(err)).into())
1315        }
1316    }
1317}
1318
1319/// [`FDomain`] implements a stream of events, for protocol events and for
1320/// replies to long-running methods.
1321impl futures::Stream for FDomain {
1322    type Item = FDomainEvent;
1323
1324    fn poll_next(
1325        mut self: std::pin::Pin<&mut Self>,
1326        ctx: &mut Context<'_>,
1327    ) -> Poll<Option<Self::Item>> {
1328        let this = &mut *self;
1329
1330        let mut closing_handles = std::mem::replace(&mut this.closing_handles, Vec::new());
1331        closing_handles.retain_mut(|x| x.poll_ready(this, ctx).is_pending());
1332        this.closing_handles = closing_handles;
1333
1334        let handles = &mut this.handles;
1335        let event_queue = &mut this.event_queue;
1336        for state in handles.values_mut() {
1337            state.poll(event_queue, ctx);
1338        }
1339
1340        if let Some(event) = self.event_queue.pop_front() {
1341            match event {
1342                UnprocessedFDomainEvent::Ready(event) => Poll::Ready(Some(event)),
1343                UnprocessedFDomainEvent::ChannelData(tid, message) => {
1344                    Poll::Ready(Some(FDomainEvent::ChannelData(tid, self.process_message(message))))
1345                }
1346                UnprocessedFDomainEvent::ChannelStreamingData(hid, message) => {
1347                    match self.process_message(message) {
1348                        Ok(message) => Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1349                            proto::ChannelOnChannelStreamingDataRequest {
1350                                handle: hid,
1351                                channel_sent: proto::ChannelSent::Message(message),
1352                            },
1353                        ))),
1354                        Err(e) => {
1355                            self.handles.get_mut(&hid).unwrap().async_read_in_progress = false;
1356                            Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1357                                proto::ChannelOnChannelStreamingDataRequest {
1358                                    handle: hid,
1359                                    channel_sent: proto::ChannelSent::Stopped(proto::AioStopped {
1360                                        error: Some(Box::new(e)),
1361                                    }),
1362                                },
1363                            )))
1364                        }
1365                    }
1366                }
1367            }
1368        } else {
1369            self.waker = Some(ctx.waker().clone());
1370            Poll::Pending
1371        }
1372    }
1373}