Skip to main content

fdomain_container/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::AsHandleRef;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_fdomain as proto;
8use fidl_fuchsia_io as fio;
9use fuchsia_async as fasync;
10use futures::prelude::*;
11use replace_with::replace_with;
12use std::collections::hash_map::Entry;
13use std::collections::{HashMap, VecDeque};
14use std::num::NonZeroU32;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU32, Ordering};
17use std::task::{Context, Poll, Waker};
18
19mod handles;
20pub mod wire;
21
22#[cfg(test)]
23mod test;
24
25pub type Result<T, E = proto::Error> = std::result::Result<T, E>;
26
27use handles::{AnyHandle, HandleType as _, IsDatagramSocket};
28
29/// A queue. Basically just a `VecDeque` except we can asynchronously wait for
30/// an element to pop if it is empty.
31struct Queue<T>(VecDeque<T>, Option<Waker>);
32
33impl<T> Queue<T> {
34    /// Create a new queue.
35    fn new() -> Self {
36        Queue(VecDeque::new(), None)
37    }
38
39    /// Whether the queue is empty.
40    fn is_empty(&self) -> bool {
41        self.0.is_empty()
42    }
43
44    /// Removes and discards the first element in the queue.
45    ///
46    /// # Panics
47    /// There *must* be a first element or this will panic.
48    fn destroy_front(&mut self) {
49        assert!(self.0.pop_front().is_some(), "Expected to find a value!");
50    }
51
52    /// Pop the first element from the queue if available.
53    fn pop_front(&mut self, ctx: &mut Context<'_>) -> Poll<T> {
54        if let Some(t) = self.0.pop_front() {
55            Poll::Ready(t)
56        } else {
57            self.1 = Some(ctx.waker().clone());
58            Poll::Pending
59        }
60    }
61
62    /// Return an element to the front of the queue. Does not wake any waiters
63    /// as it is assumed the waiter is the one who popped it to begin with.
64    ///
65    /// This is used when we'd *like* to use `front_mut` but we can't borrow the
66    /// source of `self` for that long without giving ourselves lifetime
67    /// headaches.
68    fn push_front_no_wake(&mut self, t: T) {
69        self.0.push_front(t)
70    }
71
72    /// Push a new element to the back of the queue.
73    fn push_back(&mut self, t: T) {
74        self.0.push_back(t);
75        self.1.take().map(Waker::wake);
76    }
77
78    /// Get a mutable reference to the first element in the queue.
79    fn front_mut(&mut self, ctx: &mut Context<'_>) -> Poll<&mut T> {
80        if let Some(t) = self.0.front_mut() {
81            Poll::Ready(t)
82        } else {
83            self.1 = Some(ctx.waker().clone());
84            Poll::Pending
85        }
86    }
87}
88
89/// Maximum amount to read for an async socket read.
90const ASYNC_READ_BUFSIZE: u64 = 40960;
91
92/// Wraps the various FIDL Event types that can be produced by an FDomain
93#[derive(Debug)]
94pub enum FDomainEvent {
95    ChannelStreamingReadStart(NonZeroU32, Result<()>),
96    ChannelStreamingReadStop(NonZeroU32, Result<()>),
97    SocketStreamingReadStart(NonZeroU32, Result<()>),
98    SocketStreamingReadStop(NonZeroU32, Result<()>),
99    WaitForSignals(NonZeroU32, Result<proto::FDomainWaitForSignalsResponse>),
100    SocketData(NonZeroU32, Result<proto::SocketData>),
101    SocketStreamingData(proto::SocketOnSocketStreamingDataRequest),
102    SocketDispositionSet(NonZeroU32, Result<()>),
103    WroteSocket(NonZeroU32, Result<proto::SocketWriteSocketResponse, proto::WriteSocketError>),
104    ChannelData(NonZeroU32, Result<proto::ChannelMessage>),
105    ChannelStreamingData(proto::ChannelOnChannelStreamingDataRequest),
106    WroteChannel(NonZeroU32, Result<(), proto::WriteChannelError>),
107    ClosedHandle(NonZeroU32, Result<()>),
108    ReplacedHandle(NonZeroU32, Result<()>),
109}
110
111/// An [`FDomainEvent`] that needs a bit more processing before it can be sent.
112/// I.e. it still contains `fidl::NullableHandle` objects that need to be replaced with
113/// FDomain IDs.
114enum UnprocessedFDomainEvent {
115    Ready(FDomainEvent),
116    ChannelData(NonZeroU32, fidl::MessageBufEtc),
117    ChannelStreamingData(proto::HandleId, fidl::MessageBufEtc),
118}
119
120impl From<FDomainEvent> for UnprocessedFDomainEvent {
121    fn from(other: FDomainEvent) -> UnprocessedFDomainEvent {
122        UnprocessedFDomainEvent::Ready(other)
123    }
124}
125
126/// Operations on a handle which are processed from the read queue.
127enum ReadOp {
128    /// Enable or disable async reads on a channel.
129    StreamingChannel(NonZeroU32, bool),
130    /// Enable or disable async reads on a socket.
131    StreamingSocket(NonZeroU32, bool),
132    Socket(NonZeroU32, u64),
133    Channel(NonZeroU32),
134}
135
136/// An in-progress socket write. It may take multiple syscalls to write to a
137/// socket, so this tracks how many bytes were written already and how many
138/// remain to be written.
139struct SocketWrite {
140    tid: NonZeroU32,
141    wrote: usize,
142    to_write: Vec<u8>,
143}
144
145/// Operations on a handle which are processed from the write queue.
146enum WriteOp {
147    Socket(SocketWrite),
148    Channel(NonZeroU32, Vec<u8>, HandlesToWrite),
149    SetDisposition(NonZeroU32, proto::SocketDisposition, proto::SocketDisposition),
150}
151
152/// A handle which is being moved out of the FDomain by a channel write call or
153/// closure/replacement.  There may still be operations to perform on this
154/// handle, so the write should not proceed while the handle is in the `InUse`
155/// state.
156enum ShuttingDownHandle {
157    InUse(proto::HandleId, HandleState),
158    Ready(AnyHandle),
159}
160
161impl ShuttingDownHandle {
162    fn poll_ready(
163        &mut self,
164        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
165        ctx: &mut Context<'_>,
166    ) -> Poll<()> {
167        replace_with(self, |this| match this {
168            this @ ShuttingDownHandle::Ready(_) => this,
169            ShuttingDownHandle::InUse(hid, mut state) => {
170                state.poll(event_queue, ctx);
171
172                if state.write_queue.is_empty() {
173                    while let Poll::Ready(op) = state.read_queue.pop_front(ctx) {
174                        match op {
175                            ReadOp::StreamingChannel(tid, start) => {
176                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
177                                    id: hid.id,
178                                }));
179                                let event = if start {
180                                    FDomainEvent::ChannelStreamingReadStart(tid, err)
181                                } else {
182                                    FDomainEvent::ChannelStreamingReadStop(tid, err)
183                                };
184                                event_queue.push_back(event.into());
185                            }
186                            ReadOp::StreamingSocket(tid, start) => {
187                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
188                                    id: hid.id,
189                                }));
190                                let event = if start {
191                                    FDomainEvent::SocketStreamingReadStart(tid, err)
192                                } else {
193                                    FDomainEvent::SocketStreamingReadStop(tid, err)
194                                };
195                                event_queue.push_back(event.into());
196                            }
197                            ReadOp::Channel(tid) => {
198                                let err = state
199                                    .handle
200                                    .expected_type(fidl::ObjectType::CHANNEL)
201                                    .err()
202                                    .unwrap_or(proto::Error::ClosedDuringRead(
203                                        proto::ClosedDuringRead,
204                                    ));
205                                event_queue
206                                    .push_back(FDomainEvent::ChannelData(tid, Err(err)).into());
207                            }
208                            ReadOp::Socket(tid, _max_bytes) => {
209                                let err = state
210                                    .handle
211                                    .expected_type(fidl::ObjectType::SOCKET)
212                                    .err()
213                                    .unwrap_or(proto::Error::ClosedDuringRead(
214                                        proto::ClosedDuringRead,
215                                    ));
216                                event_queue
217                                    .push_back(FDomainEvent::SocketData(tid, Err(err)).into());
218                            }
219                        }
220                    }
221
222                    if state.async_read_in_progress {
223                        match &*state.handle {
224                            AnyHandle::Channel(_) => event_queue.push_back(
225                                FDomainEvent::ChannelStreamingData(
226                                    proto::ChannelOnChannelStreamingDataRequest {
227                                        handle: hid,
228                                        channel_sent: proto::ChannelSent::Stopped(
229                                            proto::AioStopped { error: None },
230                                        ),
231                                    },
232                                )
233                                .into(),
234                            ),
235                            AnyHandle::Socket(_) => event_queue.push_back(
236                                FDomainEvent::SocketStreamingData(
237                                    proto::SocketOnSocketStreamingDataRequest {
238                                        handle: hid,
239                                        socket_message: proto::SocketMessage::Stopped(
240                                            proto::AioStopped { error: None },
241                                        ),
242                                    },
243                                )
244                                .into(),
245                            ),
246                            AnyHandle::EventPair(_)
247                            | AnyHandle::Event(_)
248                            | AnyHandle::Unknown(_) => unreachable!(),
249                        }
250                    }
251
252                    state.signal_waiters.clear();
253                    state.io_waiter = None;
254
255                    ShuttingDownHandle::Ready(
256                        Arc::into_inner(state.handle).expect("Unaccounted-for handle reference!"),
257                    )
258                } else {
259                    ShuttingDownHandle::InUse(hid, state)
260                }
261            }
262        });
263
264        if matches!(self, ShuttingDownHandle::Ready(_)) { Poll::Ready(()) } else { Poll::Pending }
265    }
266}
267
268/// A vector of [`ShuttingDownHandle`] paired with rights for the new handles, which
269/// can transition into being a vector of [`fidl::HandleDisposition`] when all the
270/// handles are ready.
271enum HandlesToWrite {
272    SomeInUse(Vec<(ShuttingDownHandle, fidl::Rights)>),
273    AllReady(Vec<fidl::HandleDisposition<'static>>),
274}
275
276impl HandlesToWrite {
277    fn poll_ready(
278        &mut self,
279        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
280        ctx: &mut Context<'_>,
281    ) -> Poll<&mut Vec<fidl::HandleDisposition<'static>>> {
282        match self {
283            HandlesToWrite::AllReady(s) => Poll::Ready(s),
284            HandlesToWrite::SomeInUse(handles) => {
285                let mut ready = true;
286                for (handle, _) in handles.iter_mut() {
287                    ready = ready && handle.poll_ready(event_queue, ctx).is_ready();
288                }
289
290                if !ready {
291                    return Poll::Pending;
292                }
293
294                *self = HandlesToWrite::AllReady(
295                    handles
296                        .drain(..)
297                        .map(|(handle, rights)| {
298                            let ShuttingDownHandle::Ready(handle) = handle else { unreachable!() };
299
300                            fidl::HandleDisposition::new(
301                                fidl::HandleOp::Move(handle.into()),
302                                fidl::ObjectType::NONE,
303                                rights,
304                                fidl::Status::OK,
305                            )
306                        })
307                        .collect(),
308                );
309
310                let HandlesToWrite::AllReady(s) = self else { unreachable!() };
311                Poll::Ready(s)
312            }
313        }
314    }
315}
316
317struct AnyHandleRef(Arc<AnyHandle>);
318
319impl AsHandleRef for AnyHandleRef {
320    fn as_handle_ref(&self) -> fidl::HandleRef<'_> {
321        self.0.as_handle_ref()
322    }
323}
324
325#[cfg(target_os = "fuchsia")]
326type OnSignals = fasync::OnSignals<'static, AnyHandleRef>;
327
328#[cfg(not(target_os = "fuchsia"))]
329type OnSignals = fasync::OnSignalsRef<'static>;
330
331/// Represents a `WaitForSignals` transaction from a client. When the contained
332/// `OnSignals` polls to completion we can reply to the transaction.
333struct SignalWaiter {
334    tid: NonZeroU32,
335    waiter: OnSignals,
336}
337
338/// Information about a single handle within the [`FDomain`].
339struct HandleState {
340    /// The handle itself.
341    handle: Arc<AnyHandle>,
342    /// Our handle ID.
343    hid: proto::HandleId,
344    /// Whether this is a datagram socket. We have to handle data coming out of
345    /// datagram sockets a bit differently to preserve their semantics from the
346    /// perspective of the host and avoid data loss.
347    is_datagram_socket: bool,
348    /// Indicates we are sending `On*StreamingData` events to the client
349    /// presently. It is an error for the user to try to move the handle out of
350    /// the FDomain (e.g. send it through a channel or close it) until after
351    /// they request that streaming events stop.
352    async_read_in_progress: bool,
353    /// Queue of client requests to read from the handle. We have to queue read
354    /// requests because they may block, and we don't want to block the event
355    /// loop or be unable to handle further requests while a long read request
356    /// is blocking. Also we want to retire read requests in the order they were
357    /// submitted, otherwise pipelined reads could return data in a strange order.
358    read_queue: Queue<ReadOp>,
359    /// Queue of client requests to write to the handle. We have to queue write
360    /// requests for the same reason we have to queue read requests. Since we
361    /// process the queue one at a time, we need a separate queue for writes
362    /// otherwise we'd effectively make handles half-duplex, with read requests
363    /// unable to proceed if a write request is blocked at the head of the
364    /// queue.
365    write_queue: Queue<WriteOp>,
366    /// List of outstanding `WaitForSignals` transactions.
367    signal_waiters: Vec<SignalWaiter>,
368    /// Contains a waiter on this handle for IO reading and writing. Populated
369    /// whenever we need to block on IO to service a request.
370    io_waiter: Option<OnSignals>,
371}
372
373impl HandleState {
374    fn new(handle: AnyHandle, hid: proto::HandleId) -> Result<Self, proto::Error> {
375        let is_datagram_socket = match handle.is_datagram_socket() {
376            IsDatagramSocket::Unknown => {
377                return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown {
378                    type_: proto::SocketType::unknown(),
379                }));
380            }
381            other => other.is_datagram(),
382        };
383        Ok(HandleState {
384            handle: Arc::new(handle),
385            hid,
386            async_read_in_progress: false,
387            is_datagram_socket,
388            read_queue: Queue::new(),
389            write_queue: Queue::new(),
390            signal_waiters: Vec::new(),
391            io_waiter: None,
392        })
393    }
394
395    /// Poll this handle state. Lets us handle our IO queues and wait for the
396    /// next IO event.
397    fn poll(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>, ctx: &mut Context<'_>) {
398        self.signal_waiters.retain_mut(|x| {
399            let Poll::Ready(result) = x.waiter.poll_unpin(ctx) else {
400                return true;
401            };
402
403            event_queue.push_back(
404                FDomainEvent::WaitForSignals(
405                    x.tid,
406                    result
407                        .map(|x| proto::FDomainWaitForSignalsResponse { signals: x.bits() })
408                        .map_err(|e| proto::Error::TargetError(e.into_raw())),
409                )
410                .into(),
411            );
412
413            false
414        });
415
416        let read_signals = self.handle.read_signals();
417        let write_signals = self.handle.write_signals();
418
419        loop {
420            if let Some(signal_waiter) = self.io_waiter.as_mut() {
421                if let Poll::Ready(sigs) = signal_waiter.poll_unpin(ctx) {
422                    if let Ok(sigs) = sigs {
423                        if sigs.intersects(read_signals) {
424                            self.process_read_queue(event_queue, ctx);
425                        }
426                        if sigs.intersects(write_signals) {
427                            self.process_write_queue(event_queue, ctx);
428                        }
429                    }
430                } else {
431                    let need_read = matches!(
432                        self.read_queue.front_mut(ctx),
433                        Poll::Ready(ReadOp::StreamingChannel(_, _) | ReadOp::StreamingSocket(_, _))
434                    );
435                    let need_write = matches!(
436                        self.write_queue.front_mut(ctx),
437                        Poll::Ready(WriteOp::SetDisposition(_, _, _))
438                    );
439
440                    self.process_read_queue(event_queue, ctx);
441                    self.process_write_queue(event_queue, ctx);
442
443                    if !(need_read || need_write) {
444                        break;
445                    }
446                }
447            }
448
449            let subscribed_signals =
450                if self.async_read_in_progress || !self.read_queue.is_empty() {
451                    read_signals
452                } else {
453                    fidl::Signals::NONE
454                } | if !self.write_queue.is_empty() { write_signals } else { fidl::Signals::NONE };
455
456            if !subscribed_signals.is_empty() {
457                self.io_waiter = Some(OnSignals::new(
458                    AnyHandleRef(Arc::clone(&self.handle)),
459                    subscribed_signals,
460                ));
461            } else {
462                self.io_waiter = None;
463                break;
464            }
465        }
466    }
467
468    /// Set `async_read_in_progress` to `true`. Return an error if it was already `true`.
469    fn try_enable_async_read(&mut self) -> Result<()> {
470        if self.async_read_in_progress {
471            Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress))
472        } else {
473            self.async_read_in_progress = true;
474            Ok(())
475        }
476    }
477
478    /// Set `async_read_in_progress` to `false`. Return an error if it was already `false`.
479    fn try_disable_async_read(&mut self) -> Result<()> {
480        if !self.async_read_in_progress {
481            Err(proto::Error::NoReadInProgress(proto::NoReadInProgress))
482        } else {
483            self.async_read_in_progress = false;
484            Ok(())
485        }
486    }
487
488    /// Handle events from the front of the read queue.
489    fn process_read_queue(
490        &mut self,
491        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
492        ctx: &mut Context<'_>,
493    ) {
494        while let Poll::Ready(op) = self.read_queue.front_mut(ctx) {
495            match op {
496                ReadOp::StreamingChannel(tid, true) => {
497                    let tid = *tid;
498                    let result = self.try_enable_async_read();
499                    event_queue
500                        .push_back(FDomainEvent::ChannelStreamingReadStart(tid, result).into());
501                    self.read_queue.destroy_front();
502                }
503                ReadOp::StreamingChannel(tid, false) => {
504                    let tid = *tid;
505                    let result = self.try_disable_async_read();
506                    event_queue
507                        .push_back(FDomainEvent::ChannelStreamingReadStop(tid, result).into());
508                    self.read_queue.destroy_front();
509                }
510                ReadOp::StreamingSocket(tid, true) => {
511                    let tid = *tid;
512                    let result = self.try_enable_async_read();
513                    event_queue
514                        .push_back(FDomainEvent::SocketStreamingReadStart(tid, result).into());
515                    self.read_queue.destroy_front();
516                }
517                ReadOp::StreamingSocket(tid, false) => {
518                    let tid = *tid;
519                    let result = self.try_disable_async_read();
520                    event_queue
521                        .push_back(FDomainEvent::SocketStreamingReadStop(tid, result).into());
522                    self.read_queue.destroy_front();
523                }
524                ReadOp::Socket(tid, max_bytes) => {
525                    let (tid, max_bytes) = (*tid, *max_bytes);
526                    if let Some(event) = self.do_read_socket(tid, max_bytes) {
527                        let _ = self.read_queue.pop_front(ctx);
528                        event_queue.push_back(event.into());
529                    } else {
530                        break;
531                    }
532                }
533                ReadOp::Channel(tid) => {
534                    let tid = *tid;
535                    if let Some(event) = self.do_read_channel(tid) {
536                        let _ = self.read_queue.pop_front(ctx);
537                        event_queue.push_back(event.into());
538                    } else {
539                        break;
540                    }
541                }
542            }
543        }
544
545        if self.async_read_in_progress {
546            // We should have error'd out of any blocking operations if we had a
547            // read in progress.
548            assert!(self.read_queue.is_empty());
549            self.process_async_read(event_queue);
550        }
551    }
552
553    fn process_async_read(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>) {
554        assert!(self.async_read_in_progress);
555
556        match &*self.handle {
557            AnyHandle::Channel(_) => {
558                'read_loop: while let Some(result) = self.handle.read_channel().transpose() {
559                    match result {
560                        Ok(msg) => event_queue.push_back(
561                            UnprocessedFDomainEvent::ChannelStreamingData(self.hid, msg),
562                        ),
563                        Err(e) => {
564                            event_queue.push_back(
565                                FDomainEvent::ChannelStreamingData(
566                                    proto::ChannelOnChannelStreamingDataRequest {
567                                        handle: self.hid,
568                                        channel_sent: proto::ChannelSent::Stopped(
569                                            proto::AioStopped { error: Some(Box::new(e)) },
570                                        ),
571                                    },
572                                )
573                                .into(),
574                            );
575                            self.async_read_in_progress = false;
576                            break 'read_loop;
577                        }
578                    }
579                }
580            }
581
582            AnyHandle::Socket(_) => {
583                'read_loop: while let Some(result) =
584                    self.handle.read_socket(ASYNC_READ_BUFSIZE).transpose()
585                {
586                    match result {
587                        Ok(data) => {
588                            event_queue.push_back(
589                                FDomainEvent::SocketStreamingData(
590                                    proto::SocketOnSocketStreamingDataRequest {
591                                        handle: self.hid,
592                                        socket_message: proto::SocketMessage::Data(
593                                            proto::SocketData {
594                                                data,
595                                                is_datagram: self.is_datagram_socket,
596                                            },
597                                        ),
598                                    },
599                                )
600                                .into(),
601                            );
602                        }
603                        Err(e) => {
604                            event_queue.push_back(
605                                FDomainEvent::SocketStreamingData(
606                                    proto::SocketOnSocketStreamingDataRequest {
607                                        handle: self.hid,
608                                        socket_message: proto::SocketMessage::Stopped(
609                                            proto::AioStopped { error: Some(Box::new(e)) },
610                                        ),
611                                    },
612                                )
613                                .into(),
614                            );
615                            self.async_read_in_progress = false;
616                            break 'read_loop;
617                        }
618                    }
619                }
620            }
621
622            _ => unreachable!("Processed async read for unreadable handle type!"),
623        }
624    }
625
626    /// Handle events from the front of the write queue.
627    fn process_write_queue(
628        &mut self,
629        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
630        ctx: &mut Context<'_>,
631    ) {
632        // We want to mutate and *maybe* pop the front of the write queue, but
633        // lifetime shenanigans mean we can't do that and also access `self`,
634        // which we need. So we pop the item always, and then maybe push it to
635        // the front again if we didn't actually want to pop it.
636        while let Poll::Ready(op) = self.write_queue.pop_front(ctx) {
637            match op {
638                WriteOp::Socket(mut op) => {
639                    if let Some(event) = self.do_write_socket(&mut op) {
640                        event_queue.push_back(event.into());
641                    } else {
642                        self.write_queue.push_front_no_wake(WriteOp::Socket(op));
643                        break;
644                    }
645                }
646                WriteOp::SetDisposition(tid, disposition, disposition_peer) => {
647                    let result = { self.handle.socket_disposition(disposition, disposition_peer) };
648                    event_queue.push_back(FDomainEvent::SocketDispositionSet(tid, result).into())
649                }
650                WriteOp::Channel(tid, data, mut handles) => {
651                    if self
652                        .do_write_channel(tid, &data, &mut handles, event_queue, ctx)
653                        .is_pending()
654                    {
655                        self.write_queue.push_front_no_wake(WriteOp::Channel(tid, data, handles));
656                        break;
657                    }
658                }
659            }
660        }
661    }
662
663    /// Attempt to read from the handle in this [`HandleState`] as if it were a
664    /// socket. If the read succeeds or produces an error that should not be
665    /// retried, produce an [`FDomainEvent`] containing the result.
666    fn do_read_socket(&mut self, tid: NonZeroU32, max_bytes: u64) -> Option<FDomainEvent> {
667        if self.async_read_in_progress {
668            return Some(
669                FDomainEvent::SocketData(
670                    tid,
671                    Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
672                )
673                .into(),
674            );
675        }
676
677        let max_bytes = if self.is_datagram_socket {
678            let AnyHandle::Socket(s) = &*self.handle else {
679                unreachable!("Read socket from state that wasn't for a socket!");
680            };
681            match s.info() {
682                Ok(x) => x.rx_buf_available as u64,
683                // We should always succeed. The only failures are if we don't
684                // have the rights or something's screwed up with the handle. We
685                // know we have the rights because figuring out this was a
686                // datagram socket to begin with meant calling the same call on
687                // the same handle earlier.
688                Err(e) => {
689                    return Some(FDomainEvent::SocketData(
690                        tid,
691                        Err(proto::Error::TargetError(e.into_raw())),
692                    ));
693                }
694            }
695        } else {
696            max_bytes
697        };
698        self.handle.read_socket(max_bytes).transpose().map(|x| {
699            FDomainEvent::SocketData(
700                tid,
701                x.map(|data| proto::SocketData { data, is_datagram: self.is_datagram_socket }),
702            )
703        })
704    }
705
706    /// Attempt to write to the handle in this [`HandleState`] as if it were a
707    /// socket. If the write succeeds or produces an error that should not be
708    /// retried, produce an [`FDomainEvent`] containing the result.
709    fn do_write_socket(&mut self, op: &mut SocketWrite) -> Option<FDomainEvent> {
710        match self.handle.write_socket(&op.to_write) {
711            Ok(wrote) => {
712                op.wrote += wrote;
713                op.to_write.drain(..wrote);
714
715                if op.to_write.is_empty() {
716                    Some(FDomainEvent::WroteSocket(
717                        op.tid,
718                        Ok(proto::SocketWriteSocketResponse {
719                            wrote: op.wrote.try_into().unwrap(),
720                        }),
721                    ))
722                } else {
723                    None
724                }
725            }
726            Err(error) => Some(FDomainEvent::WroteSocket(
727                op.tid,
728                Err(proto::WriteSocketError { error, wrote: op.wrote.try_into().unwrap() }),
729            )),
730        }
731    }
732
733    /// Attempt to write to the handle in this [`HandleState`] as if it were a
734    /// channel. If the write succeeds or produces an error that should not be
735    /// retried, produce an [`FDomainEvent`] containing the result.
736    fn do_write_channel(
737        &mut self,
738        tid: NonZeroU32,
739        data: &[u8],
740        handles: &mut HandlesToWrite,
741        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
742        ctx: &mut Context<'_>,
743    ) -> Poll<()> {
744        let Poll::Ready(handles) = handles.poll_ready(event_queue, ctx) else {
745            return Poll::Pending;
746        };
747
748        let ret = self.handle.write_channel(data, handles);
749        if let Some(ret) = ret {
750            event_queue.push_back(FDomainEvent::WroteChannel(tid, ret).into())
751        }
752        Poll::Ready(())
753    }
754
755    /// Attempt to read from the handle in this [`HandleState`] as if it were a
756    /// channel. If the read succeeds or produces an error that should not be
757    /// retried, produce an [`FDomainEvent`] containing the result.
758    fn do_read_channel(&mut self, tid: NonZeroU32) -> Option<UnprocessedFDomainEvent> {
759        if self.async_read_in_progress {
760            return Some(
761                FDomainEvent::ChannelData(
762                    tid,
763                    Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
764                )
765                .into(),
766            );
767        }
768        match self.handle.read_channel() {
769            Ok(x) => x.map(|x| UnprocessedFDomainEvent::ChannelData(tid, x)),
770            Err(e) => Some(FDomainEvent::ChannelData(tid, Err(e)).into()),
771        }
772    }
773}
774
775/// State for a handle which is closing, but which needs its read and write
776/// queues flushed first.
777struct ClosingHandle {
778    action: Arc<CloseAction>,
779    state: Option<ShuttingDownHandle>,
780}
781
782impl ClosingHandle {
783    fn poll_ready(&mut self, fdomain: &mut FDomain, ctx: &mut Context<'_>) -> Poll<()> {
784        if let Some(state) = self.state.as_mut() {
785            if state.poll_ready(&mut fdomain.event_queue, ctx).is_ready() {
786                let state = self.state.take().unwrap();
787                let ShuttingDownHandle::Ready(handle) = state else {
788                    unreachable!();
789                };
790                self.action.perform(fdomain, handle);
791                Poll::Ready(())
792            } else {
793                Poll::Pending
794            }
795        } else {
796            Poll::Ready(())
797        }
798    }
799}
800
801/// When the client requests a handle to be closed or moved or otherwise
802/// destroyed, it goes into limbo for a bit while pending read and write actions
803/// are flushed. This is how we mark what should happen to the handle after that
804/// period ends.
805enum CloseAction {
806    Close { tid: NonZeroU32, count: AtomicU32, result: Result<()> },
807    Replace { tid: NonZeroU32, new_hid: proto::NewHandleId, rights: fidl::Rights },
808}
809
810impl CloseAction {
811    fn perform(&self, fdomain: &mut FDomain, handle: AnyHandle) {
812        match self {
813            CloseAction::Close { tid, count, result } => {
814                if count.fetch_sub(1, Ordering::Relaxed) == 1 {
815                    fdomain
816                        .event_queue
817                        .push_back(FDomainEvent::ClosedHandle(*tid, result.clone()).into());
818                }
819            }
820            CloseAction::Replace { tid, new_hid, rights } => {
821                let result = handle
822                    .replace(*rights)
823                    .and_then(|handle| fdomain.alloc_client_handles([*new_hid], [handle]));
824                fdomain.event_queue.push_back(FDomainEvent::ReplacedHandle(*tid, result).into());
825            }
826        }
827    }
828}
829
830/// This is a container of handles that is manipulable via the FDomain protocol.
831/// See [RFC-0228].
832///
833/// Most of the methods simply handle FIDL requests from the FDomain protocol.
834#[pin_project::pin_project]
835pub struct FDomain {
836    namespace: Box<dyn Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send>,
837    handles: HashMap<proto::HandleId, HandleState>,
838    closing_handles: Vec<ClosingHandle>,
839    event_queue: VecDeque<UnprocessedFDomainEvent>,
840    waker: Option<Waker>,
841}
842
843impl FDomain {
844    /// Create a new FDomain. The new FDomain is empty and ready to be connected
845    /// to by a client.
846    pub fn new_empty() -> Self {
847        Self::new(|| Err(fidl::Status::NOT_FOUND))
848    }
849
850    /// Create a new FDomain populated with the given namespace entries.
851    pub fn new(
852        namespace: impl Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send + 'static,
853    ) -> Self {
854        FDomain {
855            namespace: Box::new(namespace),
856            handles: HashMap::new(),
857            closing_handles: Vec::new(),
858            event_queue: VecDeque::new(),
859            waker: None,
860        }
861    }
862
863    /// Add an event to be emitted by this FDomain.
864    fn push_event(&mut self, event: impl Into<UnprocessedFDomainEvent>) {
865        self.event_queue.push_back(event.into());
866        self.waker.take().map(Waker::wake);
867    }
868
869    /// Given a [`fidl::MessageBufEtc`], load all of the handles from it into this
870    /// FDomain and return a [`ReadChannelPayload`](proto::ReadChannelPayload)
871    /// with the same data and the IDs for the handles.
872    fn process_message(
873        &mut self,
874        message: fidl::MessageBufEtc,
875    ) -> Result<proto::ChannelMessage, proto::Error> {
876        let (data, handles) = message.split();
877        let handles = handles
878            .into_iter()
879            .map(|info| {
880                let type_ = info.object_type;
881
882                let handle = match info.object_type {
883                    fidl::ObjectType::CHANNEL => {
884                        AnyHandle::Channel(fidl::Channel::from(info.handle))
885                    }
886                    fidl::ObjectType::SOCKET => AnyHandle::Socket(fidl::Socket::from(info.handle)),
887                    fidl::ObjectType::EVENTPAIR => {
888                        AnyHandle::EventPair(fidl::EventPair::from(info.handle))
889                    }
890                    fidl::ObjectType::EVENT => AnyHandle::Event(fidl::Event::from(info.handle)),
891                    _ => AnyHandle::Unknown(handles::Unknown(info.handle, info.object_type)),
892                };
893
894                Ok(proto::HandleInfo {
895                    rights: info.rights,
896                    handle: self.alloc_fdomain_handle(handle)?,
897                    type_,
898                })
899            })
900            .collect::<Result<Vec<_>, proto::Error>>()?;
901
902        Ok(proto::ChannelMessage { data, handles })
903    }
904
905    /// Allocate `N` new handle IDs. These are allocated from
906    /// [`NewHandleId`](proto::NewHandleId) and are expected to follow the protocol
907    /// rules for client-allocated handle IDs.
908    ///
909    /// If any of the handles passed fail to allocate, none of the handles will
910    /// be allocated.
911    fn alloc_client_handles<const N: usize>(
912        &mut self,
913        ids: [proto::NewHandleId; N],
914        handles: [AnyHandle; N],
915    ) -> Result<(), proto::Error> {
916        for id in ids {
917            if id.id & (1 << 31) != 0 {
918                return Err(proto::Error::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange {
919                    id: id.id,
920                }));
921            }
922
923            if self.handles.contains_key(&proto::HandleId { id: id.id }) {
924                return Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
925                    id: id.id,
926                    same_call: false,
927                }));
928            }
929        }
930
931        let mut sorted_ids = ids;
932        sorted_ids.sort();
933
934        if let Some([a, _]) = sorted_ids.array_windows().find(|&[a, b]| a == b) {
935            Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
936                id: a.id,
937                same_call: true,
938            }))
939        } else {
940            let ids = ids.into_iter().map(|id| proto::HandleId { id: id.id });
941            let handles = ids
942                .zip(handles.into_iter())
943                .map(|(id, h)| HandleState::new(h, id).map(|x| (id, x)))
944                .collect::<Result<Vec<_>, proto::Error>>()?;
945
946            self.handles.extend(handles);
947
948            Ok(())
949        }
950    }
951
952    /// Allocate a new handle ID. These are allocated internally and are
953    /// expected to follow the protocol rules for FDomain-allocated handle IDs.
954    fn alloc_fdomain_handle(&mut self, handle: AnyHandle) -> Result<proto::HandleId, proto::Error> {
955        loop {
956            let id = proto::HandleId { id: rand::random::<u32>() | (1u32 << 31) };
957            if let Entry::Vacant(v) = self.handles.entry(id) {
958                v.insert(HandleState::new(handle, id)?);
959                break Ok(id);
960            }
961        }
962    }
963
964    /// If a handle exists in this FDomain, remove it.
965    fn take_handle(&mut self, handle: proto::HandleId) -> Result<HandleState, proto::Error> {
966        self.handles
967            .remove(&handle)
968            .ok_or(proto::Error::BadHandleId(proto::BadHandleId { id: handle.id }))
969    }
970
971    /// Use a handle in our handle table, if it exists.
972    fn using_handle<T>(
973        &mut self,
974        id: proto::HandleId,
975        f: impl FnOnce(&mut HandleState) -> Result<T, proto::Error>,
976    ) -> Result<T, proto::Error> {
977        if let Some(s) = self.handles.get_mut(&id) {
978            f(s)
979        } else {
980            Err(proto::Error::BadHandleId(proto::BadHandleId { id: id.id }))
981        }
982    }
983
984    pub fn get_namespace(&mut self, request: proto::FDomainGetNamespaceRequest) -> Result<()> {
985        match (self.namespace)() {
986            Ok(endpoint) => self.alloc_client_handles(
987                [request.new_handle],
988                [AnyHandle::Channel(endpoint.into_channel())],
989            ),
990            Err(e) => Err(proto::Error::TargetError(e.into_raw())),
991        }
992    }
993
994    pub fn create_channel(&mut self, request: proto::ChannelCreateChannelRequest) -> Result<()> {
995        let (a, b) = fidl::Channel::create();
996        self.alloc_client_handles(request.handles, [AnyHandle::Channel(a), AnyHandle::Channel(b)])
997    }
998
999    pub fn create_socket(&mut self, request: proto::SocketCreateSocketRequest) -> Result<()> {
1000        let (a, b) = match request.options {
1001            proto::SocketType::Stream => fidl::Socket::create_stream(),
1002            proto::SocketType::Datagram => fidl::Socket::create_datagram(),
1003            type_ => {
1004                return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown { type_ }));
1005            }
1006        };
1007
1008        self.alloc_client_handles(request.handles, [AnyHandle::Socket(a), AnyHandle::Socket(b)])
1009    }
1010
1011    pub fn create_event_pair(
1012        &mut self,
1013        request: proto::EventPairCreateEventPairRequest,
1014    ) -> Result<()> {
1015        let (a, b) = fidl::EventPair::create();
1016        self.alloc_client_handles(
1017            request.handles,
1018            [AnyHandle::EventPair(a), AnyHandle::EventPair(b)],
1019        )
1020    }
1021
1022    pub fn create_event(&mut self, request: proto::EventCreateEventRequest) -> Result<()> {
1023        let a = fidl::Event::create();
1024        self.alloc_client_handles([request.handle], [AnyHandle::Event(a)])
1025    }
1026
1027    pub fn set_socket_disposition(
1028        &mut self,
1029        tid: NonZeroU32,
1030        request: proto::SocketSetSocketDispositionRequest,
1031    ) {
1032        if let Err(err) = self.using_handle(request.handle, |h| {
1033            h.write_queue.push_back(WriteOp::SetDisposition(
1034                tid,
1035                request.disposition,
1036                request.disposition_peer,
1037            ));
1038            Ok(())
1039        }) {
1040            self.push_event(FDomainEvent::SocketDispositionSet(tid, Err(err)));
1041        }
1042    }
1043
1044    pub fn read_socket(&mut self, tid: NonZeroU32, request: proto::SocketReadSocketRequest) {
1045        if let Err(e) = self.using_handle(request.handle, |h| {
1046            h.read_queue.push_back(ReadOp::Socket(tid, request.max_bytes));
1047            Ok(())
1048        }) {
1049            self.push_event(FDomainEvent::SocketData(tid, Err(e)));
1050        }
1051    }
1052
1053    pub fn read_channel(&mut self, tid: NonZeroU32, request: proto::ChannelReadChannelRequest) {
1054        if let Err(e) = self.using_handle(request.handle, |h| {
1055            h.read_queue.push_back(ReadOp::Channel(tid));
1056            Ok(())
1057        }) {
1058            self.push_event(FDomainEvent::ChannelData(tid, Err(e)));
1059        }
1060    }
1061
1062    pub fn write_socket(&mut self, tid: NonZeroU32, request: proto::SocketWriteSocketRequest) {
1063        if let Err(error) = self.using_handle(request.handle, |h| {
1064            h.write_queue.push_back(WriteOp::Socket(SocketWrite {
1065                tid,
1066                wrote: 0,
1067                to_write: request.data,
1068            }));
1069            Ok(())
1070        }) {
1071            self.push_event(FDomainEvent::WroteSocket(
1072                tid,
1073                Err(proto::WriteSocketError { error, wrote: 0 }),
1074            ));
1075        }
1076    }
1077
1078    pub fn write_channel(&mut self, tid: NonZeroU32, request: proto::ChannelWriteChannelRequest) {
1079        // Go through the list of handles in the requests (which will either be
1080        // a simple list of handles or a list of HandleDispositions) and obtain
1081        // for each a `ShuttingDownHandle` which contains our handle state (the
1082        // "Shutting down" refers to the fact that we're pulling the handle out
1083        // of the FDomain in order to send it) and the rights the requester
1084        // would like the handle to have upon arrival at the other end of the
1085        // channel.
1086        let handles: Vec<Result<(ShuttingDownHandle, fidl::Rights)>> = match request.handles {
1087            proto::Handles::Handles(h) => h
1088                .into_iter()
1089                .map(|h| {
1090                    if h != request.handle {
1091                        self.take_handle(h).map(|handle_state| {
1092                            (ShuttingDownHandle::InUse(h, handle_state), fidl::Rights::SAME_RIGHTS)
1093                        })
1094                    } else {
1095                        Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1096                    }
1097                })
1098                .collect(),
1099            proto::Handles::Dispositions(d) => d
1100                .into_iter()
1101                .map(|d| {
1102                    let res = match d.handle {
1103                        proto::HandleOp::Move_(h) => {
1104                            if h != request.handle {
1105                                self.take_handle(h).map(|x| ShuttingDownHandle::InUse(h, x))
1106                            } else {
1107                                Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1108                            }
1109                        }
1110                        proto::HandleOp::Duplicate(h) => {
1111                            if h != request.handle {
1112                                // If the requester wants us to duplicate the
1113                                // handle, we do so now rather than letting
1114                                // `write_etc` do it. Otherwise we have to use a
1115                                // reference to the handle, and we get lifetime
1116                                // hell.
1117                                self.using_handle(h, |h| {
1118                                    h.handle.duplicate(fidl::Rights::SAME_RIGHTS)
1119                                })
1120                                .map(ShuttingDownHandle::Ready)
1121                            } else {
1122                                Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1123                            }
1124                        }
1125                    };
1126
1127                    res.and_then(|x| Ok((x, d.rights)))
1128                })
1129                .collect(),
1130        };
1131
1132        if handles.iter().any(|x| x.is_err()) {
1133            let e = handles.into_iter().map(|x| x.err().map(Box::new)).collect();
1134
1135            self.push_event(FDomainEvent::WroteChannel(
1136                tid,
1137                Err(proto::WriteChannelError::OpErrors(e)),
1138            ));
1139            return;
1140        }
1141
1142        let handles = handles.into_iter().map(|x| x.unwrap()).collect::<Vec<_>>();
1143
1144        if let Err(e) = self.using_handle(request.handle, |h| {
1145            h.write_queue.push_back(WriteOp::Channel(
1146                tid,
1147                request.data,
1148                HandlesToWrite::SomeInUse(handles),
1149            ));
1150            Ok(())
1151        }) {
1152            self.push_event(FDomainEvent::WroteChannel(
1153                tid,
1154                Err(proto::WriteChannelError::Error(e)),
1155            ));
1156        }
1157    }
1158
1159    pub fn wait_for_signals(
1160        &mut self,
1161        tid: NonZeroU32,
1162        request: proto::FDomainWaitForSignalsRequest,
1163    ) {
1164        let result = self.using_handle(request.handle, |h| {
1165            let signals = fidl::Signals::from_bits_retain(request.signals);
1166            h.signal_waiters.push(SignalWaiter {
1167                tid,
1168                waiter: OnSignals::new(AnyHandleRef(Arc::clone(&h.handle)), signals),
1169            });
1170            Ok(())
1171        });
1172
1173        if let Err(e) = result {
1174            self.push_event(FDomainEvent::WaitForSignals(tid, Err(e)));
1175        } else {
1176            self.waker.take().map(Waker::wake);
1177        }
1178    }
1179
1180    pub fn close(&mut self, tid: NonZeroU32, request: proto::FDomainCloseRequest) {
1181        let mut states = Vec::with_capacity(request.handles.len());
1182        let mut result = Ok(());
1183        for hid in request.handles {
1184            match self.take_handle(hid) {
1185                Ok(state) => states.push((hid, state)),
1186
1187                Err(e) => {
1188                    result = result.and(Err(e));
1189                }
1190            }
1191        }
1192
1193        let action = Arc::new(CloseAction::Close {
1194            tid,
1195            count: AtomicU32::new(states.len().try_into().unwrap()),
1196            result,
1197        });
1198
1199        for (hid, state) in states {
1200            self.closing_handles.push(ClosingHandle {
1201                action: Arc::clone(&action),
1202                state: Some(ShuttingDownHandle::InUse(hid, state)),
1203            });
1204        }
1205    }
1206
1207    pub fn duplicate(&mut self, request: proto::FDomainDuplicateRequest) -> Result<()> {
1208        let rights = request.rights;
1209        let handle = self.using_handle(request.handle, |h| h.handle.duplicate(rights));
1210        handle.and_then(|h| self.alloc_client_handles([request.new_handle], [h]))
1211    }
1212
1213    pub fn replace(
1214        &mut self,
1215        tid: NonZeroU32,
1216        request: proto::FDomainReplaceRequest,
1217    ) -> Result<()> {
1218        let rights = request.rights;
1219        let new_hid = request.new_handle;
1220        match self.take_handle(request.handle) {
1221            Ok(state) => self.closing_handles.push(ClosingHandle {
1222                action: Arc::new(CloseAction::Replace { tid, new_hid, rights }),
1223                state: Some(ShuttingDownHandle::InUse(request.handle, state)),
1224            }),
1225            Err(e) => self.event_queue.push_back(UnprocessedFDomainEvent::Ready(
1226                FDomainEvent::ReplacedHandle(tid, Err(e)),
1227            )),
1228        }
1229
1230        Ok(())
1231    }
1232
1233    pub fn signal(&mut self, request: proto::FDomainSignalRequest) -> Result<()> {
1234        let set = fidl::Signals::from_bits_retain(request.set);
1235        let clear = fidl::Signals::from_bits_retain(request.clear);
1236
1237        self.using_handle(request.handle, |h| h.handle.signal(clear, set))
1238    }
1239
1240    pub fn signal_peer(&mut self, request: proto::FDomainSignalPeerRequest) -> Result<()> {
1241        let set = fidl::Signals::from_bits_retain(request.set);
1242        let clear = fidl::Signals::from_bits_retain(request.clear);
1243
1244        self.using_handle(request.handle, |h| h.handle.signal_peer(clear, set))
1245    }
1246
1247    pub fn get_koid(
1248        &mut self,
1249        request: proto::FDomainGetKoidRequest,
1250    ) -> Result<proto::FDomainGetKoidResponse> {
1251        self.using_handle(request.handle, |h| {
1252            h.handle
1253                .as_handle_ref()
1254                .koid()
1255                .map(|k| proto::FDomainGetKoidResponse { koid: k.raw_koid() })
1256                .map_err(|e| proto::Error::TargetError(e.into_raw()))
1257        })
1258    }
1259
1260    pub fn read_channel_streaming_start(
1261        &mut self,
1262        tid: NonZeroU32,
1263        request: proto::ChannelReadChannelStreamingStartRequest,
1264    ) {
1265        if let Err(err) = self.using_handle(request.handle, |h| {
1266            h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1267            h.read_queue.push_back(ReadOp::StreamingChannel(tid, true));
1268            Ok(())
1269        }) {
1270            self.event_queue
1271                .push_back(FDomainEvent::ChannelStreamingReadStart(tid, Err(err)).into())
1272        }
1273    }
1274
1275    pub fn read_channel_streaming_stop(
1276        &mut self,
1277        tid: NonZeroU32,
1278        request: proto::ChannelReadChannelStreamingStopRequest,
1279    ) {
1280        if let Err(err) = self.using_handle(request.handle, |h| {
1281            h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1282            h.read_queue.push_back(ReadOp::StreamingChannel(tid, false));
1283            Ok(())
1284        }) {
1285            self.event_queue.push_back(FDomainEvent::ChannelStreamingReadStop(tid, Err(err)).into())
1286        }
1287    }
1288
1289    pub fn read_socket_streaming_start(
1290        &mut self,
1291        tid: NonZeroU32,
1292        request: proto::SocketReadSocketStreamingStartRequest,
1293    ) {
1294        if let Err(err) = self.using_handle(request.handle, |h| {
1295            h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1296            h.read_queue.push_back(ReadOp::StreamingSocket(tid, true));
1297            Ok(())
1298        }) {
1299            self.event_queue.push_back(FDomainEvent::SocketStreamingReadStart(tid, Err(err)).into())
1300        }
1301    }
1302
1303    pub fn read_socket_streaming_stop(
1304        &mut self,
1305        tid: NonZeroU32,
1306        request: proto::SocketReadSocketStreamingStopRequest,
1307    ) {
1308        if let Err(err) = self.using_handle(request.handle, |h| {
1309            h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1310            h.read_queue.push_back(ReadOp::StreamingSocket(tid, false));
1311            Ok(())
1312        }) {
1313            self.event_queue.push_back(FDomainEvent::SocketStreamingReadStop(tid, Err(err)).into())
1314        }
1315    }
1316}
1317
1318/// [`FDomain`] implements a stream of events, for protocol events and for
1319/// replies to long-running methods.
1320impl futures::Stream for FDomain {
1321    type Item = FDomainEvent;
1322
1323    fn poll_next(
1324        mut self: std::pin::Pin<&mut Self>,
1325        ctx: &mut Context<'_>,
1326    ) -> Poll<Option<Self::Item>> {
1327        let this = &mut *self;
1328
1329        let mut closing_handles = std::mem::replace(&mut this.closing_handles, Vec::new());
1330        closing_handles.retain_mut(|x| x.poll_ready(this, ctx).is_pending());
1331        this.closing_handles = closing_handles;
1332
1333        let handles = &mut this.handles;
1334        let event_queue = &mut this.event_queue;
1335        for state in handles.values_mut() {
1336            state.poll(event_queue, ctx);
1337        }
1338
1339        if let Some(event) = self.event_queue.pop_front() {
1340            match event {
1341                UnprocessedFDomainEvent::Ready(event) => Poll::Ready(Some(event)),
1342                UnprocessedFDomainEvent::ChannelData(tid, message) => {
1343                    Poll::Ready(Some(FDomainEvent::ChannelData(tid, self.process_message(message))))
1344                }
1345                UnprocessedFDomainEvent::ChannelStreamingData(hid, message) => {
1346                    match self.process_message(message) {
1347                        Ok(message) => Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1348                            proto::ChannelOnChannelStreamingDataRequest {
1349                                handle: hid,
1350                                channel_sent: proto::ChannelSent::Message(message),
1351                            },
1352                        ))),
1353                        Err(e) => {
1354                            self.handles.get_mut(&hid).unwrap().async_read_in_progress = false;
1355                            Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1356                                proto::ChannelOnChannelStreamingDataRequest {
1357                                    handle: hid,
1358                                    channel_sent: proto::ChannelSent::Stopped(proto::AioStopped {
1359                                        error: Some(Box::new(e)),
1360                                    }),
1361                                },
1362                            )))
1363                        }
1364                    }
1365                }
1366            }
1367        } else {
1368            self.waker = Some(ctx.waker().clone());
1369            Poll::Pending
1370        }
1371    }
1372}