Skip to main content

fdomain_container/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::AsHandleRef;
6use fidl::endpoints::ClientEnd;
7use futures::prelude::*;
8use replace_with::replace_with;
9use std::collections::hash_map::Entry;
10use std::collections::{HashMap, VecDeque};
11use std::num::NonZeroU32;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicU32, Ordering};
14use std::task::{Context, Poll, Waker};
15use {fidl_fuchsia_fdomain as proto, fidl_fuchsia_io as fio, fuchsia_async as fasync};
16
17mod handles;
18pub mod wire;
19
20#[cfg(test)]
21mod test;
22
23pub type Result<T, E = proto::Error> = std::result::Result<T, E>;
24
25use handles::{AnyHandle, HandleType as _, IsDatagramSocket};
26
/// A queue. Basically just a `VecDeque` except we can asynchronously wait for
/// an element to pop if it is empty.
///
/// Field `0` holds the queued elements. Field `1` holds the [`Waker`] of the
/// task that most recently polled the queue while it was empty, if any; it is
/// consumed (and woken) by the next [`Queue::push_back`].
struct Queue<T>(VecDeque<T>, Option<Waker>);

impl<T> Queue<T> {
    /// Create a new, empty queue with no registered waiter.
    fn new() -> Self {
        Queue(VecDeque::new(), None)
    }

    /// Whether the queue is empty.
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Removes and discards the first element in the queue.
    ///
    /// # Panics
    /// There *must* be a first element or this will panic.
    fn destroy_front(&mut self) {
        assert!(self.0.pop_front().is_some(), "Expected to find a value!");
    }

    /// Pop the first element from the queue if available.
    ///
    /// Returns `Poll::Pending` when the queue is empty, after registering the
    /// waker from `ctx` so the next [`Queue::push_back`] wakes the caller.
    fn pop_front(&mut self, ctx: &mut Context<'_>) -> Poll<T> {
        match self.0.pop_front() {
            Some(t) => Poll::Ready(t),
            None => {
                Self::register_waker(&mut self.1, ctx);
                Poll::Pending
            }
        }
    }

    /// Return an element to the front of the queue. Does not wake any waiters
    /// as it is assumed the waiter is the one who popped it to begin with.
    ///
    /// This is used when we'd *like* to use `front_mut` but we can't borrow the
    /// source of `self` for that long without giving ourselves lifetime
    /// headaches.
    fn push_front_no_wake(&mut self, t: T) {
        self.0.push_front(t)
    }

    /// Push a new element to the back of the queue and wake the registered
    /// waiter, if there is one. The waiter is woken at most once per
    /// registration.
    fn push_back(&mut self, t: T) {
        self.0.push_back(t);
        if let Some(waker) = self.1.take() {
            waker.wake();
        }
    }

    /// Get a mutable reference to the first element in the queue.
    ///
    /// Returns `Poll::Pending` when the queue is empty, after registering the
    /// waker from `ctx` so the next [`Queue::push_back`] wakes the caller.
    fn front_mut(&mut self, ctx: &mut Context<'_>) -> Poll<&mut T> {
        if let Some(t) = self.0.front_mut() {
            Poll::Ready(t)
        } else {
            // Only the waker field is touched here; the element borrow above
            // is on the other field, so the two do not conflict.
            Self::register_waker(&mut self.1, ctx);
            Poll::Pending
        }
    }

    /// Store the waker from `ctx` in `slot`, replacing any previous waiter.
    ///
    /// Takes the slot rather than `&mut self` so callers can keep a borrow of
    /// the element storage alive. `Waker::clone_from` skips the clone when the
    /// stored waker already wakes the same task.
    fn register_waker(slot: &mut Option<Waker>, ctx: &Context<'_>) {
        match slot {
            Some(existing) => existing.clone_from(ctx.waker()),
            None => *slot = Some(ctx.waker().clone()),
        }
    }
}
86
/// Maximum amount to read for an async socket read. Passed as the byte limit
/// of each socket read issued while streaming reads are active.
const ASYNC_READ_BUFSIZE: u64 = 40960;
89
/// Wraps the various FIDL Event types that can be produced by an FDomain.
///
/// Where a variant carries a `NonZeroU32`, it is the ID of the transaction the
/// event answers, paired with that transaction's result.
#[derive(Debug)]
pub enum FDomainEvent {
    /// Outcome of a request to start streaming reads on a channel.
    ChannelStreamingReadStart(NonZeroU32, Result<()>),
    /// Outcome of a request to stop streaming reads on a channel.
    ChannelStreamingReadStop(NonZeroU32, Result<()>),
    /// Outcome of a request to start streaming reads on a socket.
    SocketStreamingReadStart(NonZeroU32, Result<()>),
    /// Outcome of a request to stop streaming reads on a socket.
    SocketStreamingReadStop(NonZeroU32, Result<()>),
    /// Outcome of a `WaitForSignals` transaction.
    WaitForSignals(NonZeroU32, Result<proto::FDomainWaitForSignalsResponse>),
    /// Outcome of a one-shot socket read.
    SocketData(NonZeroU32, Result<proto::SocketData>),
    /// Data, or a stop notification, from a streaming socket read. Not tied
    /// to a transaction.
    SocketStreamingData(proto::SocketOnSocketStreamingDataRequest),
    /// Outcome of a request to set a socket's disposition.
    SocketDispositionSet(NonZeroU32, Result<()>),
    /// Outcome of a socket write.
    WroteSocket(NonZeroU32, Result<proto::SocketWriteSocketResponse, proto::WriteSocketError>),
    /// Outcome of a one-shot channel read.
    ChannelData(NonZeroU32, Result<proto::ChannelMessage>),
    /// A message, or a stop notification, from a streaming channel read. Not
    /// tied to a transaction.
    ChannelStreamingData(proto::ChannelOnChannelStreamingDataRequest),
    /// Outcome of a channel write.
    WroteChannel(NonZeroU32, Result<(), proto::WriteChannelError>),
    /// NOTE(review): presumably the outcome of a handle close request; it is
    /// produced outside the code visible here — confirm.
    ClosedHandle(NonZeroU32, Result<()>),
    /// NOTE(review): presumably the outcome of a handle replace request; it
    /// is produced outside the code visible here — confirm.
    ReplacedHandle(NonZeroU32, Result<()>),
}
108
/// An [`FDomainEvent`] that needs a bit more processing before it can be sent.
/// I.e. it still contains `fidl::NullableHandle` objects that need to be replaced with
/// FDomain IDs.
enum UnprocessedFDomainEvent {
    /// An event that needs no further processing.
    Ready(FDomainEvent),
    /// A raw message read from a channel in response to the one-shot read
    /// transaction with the given ID.
    ChannelData(NonZeroU32, fidl::MessageBufEtc),
    /// A raw message read from the channel with the given handle ID while
    /// streaming reads were active.
    ChannelStreamingData(proto::HandleId, fidl::MessageBufEtc),
}
117
118impl From<FDomainEvent> for UnprocessedFDomainEvent {
119    fn from(other: FDomainEvent) -> UnprocessedFDomainEvent {
120        UnprocessedFDomainEvent::Ready(other)
121    }
122}
123
/// Operations on a handle which are processed from the read queue.
///
/// The `NonZeroU32` in each variant is the ID of the transaction that
/// requested the operation.
enum ReadOp {
    /// Enable or disable async reads on a channel.
    StreamingChannel(NonZeroU32, bool),
    /// Enable or disable async reads on a socket.
    StreamingSocket(NonZeroU32, bool),
    /// One-shot socket read; the `u64` is the maximum number of bytes
    /// requested.
    Socket(NonZeroU32, u64),
    /// One-shot channel read.
    Channel(NonZeroU32),
}
133
/// An in-progress socket write. It may take multiple syscalls to write to a
/// socket, so this tracks how many bytes were written already and how many
/// remain to be written.
struct SocketWrite {
    /// ID of the transaction that requested the write.
    tid: NonZeroU32,
    /// Number of bytes written so far.
    wrote: usize,
    /// Bytes still to be written; written bytes are removed from the front.
    to_write: Vec<u8>,
}
142
/// Operations on a handle which are processed from the write queue.
enum WriteOp {
    /// Write bytes to a socket, possibly across several attempts.
    Socket(SocketWrite),
    /// Write a message to a channel: transaction ID, message bytes, and the
    /// handles to send along with it.
    Channel(NonZeroU32, Vec<u8>, HandlesToWrite),
    /// Set a socket's disposition: transaction ID, then the disposition for
    /// this end and the disposition for the peer end.
    SetDisposition(NonZeroU32, proto::SocketDisposition, proto::SocketDisposition),
}
149
/// A handle which is being moved out of the FDomain by a channel write call or
/// closure/replacement.  There may still be operations to perform on this
/// handle, so the write should not proceed while the handle is in the `InUse`
/// state.
enum ShuttingDownHandle {
    /// The handle still has bookkeeping attached: its ID and the
    /// [`HandleState`] whose queues have not been flushed yet.
    InUse(proto::HandleId, HandleState),
    /// All queued work has been dealt with; only the bare handle remains.
    Ready(AnyHandle),
}
158
159impl ShuttingDownHandle {
160    fn poll_ready(
161        &mut self,
162        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
163        ctx: &mut Context<'_>,
164    ) -> Poll<()> {
165        replace_with(self, |this| match this {
166            this @ ShuttingDownHandle::Ready(_) => this,
167            ShuttingDownHandle::InUse(hid, mut state) => {
168                state.poll(event_queue, ctx);
169
170                if state.write_queue.is_empty() {
171                    while let Poll::Ready(op) = state.read_queue.pop_front(ctx) {
172                        match op {
173                            ReadOp::StreamingChannel(tid, start) => {
174                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
175                                    id: hid.id,
176                                }));
177                                let event = if start {
178                                    FDomainEvent::ChannelStreamingReadStart(tid, err)
179                                } else {
180                                    FDomainEvent::ChannelStreamingReadStop(tid, err)
181                                };
182                                event_queue.push_back(event.into());
183                            }
184                            ReadOp::StreamingSocket(tid, start) => {
185                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
186                                    id: hid.id,
187                                }));
188                                let event = if start {
189                                    FDomainEvent::SocketStreamingReadStart(tid, err)
190                                } else {
191                                    FDomainEvent::SocketStreamingReadStop(tid, err)
192                                };
193                                event_queue.push_back(event.into());
194                            }
195                            ReadOp::Channel(tid) => {
196                                let err = state
197                                    .handle
198                                    .expected_type(fidl::ObjectType::CHANNEL)
199                                    .err()
200                                    .unwrap_or(proto::Error::ClosedDuringRead(
201                                        proto::ClosedDuringRead,
202                                    ));
203                                event_queue
204                                    .push_back(FDomainEvent::ChannelData(tid, Err(err)).into());
205                            }
206                            ReadOp::Socket(tid, _max_bytes) => {
207                                let err = state
208                                    .handle
209                                    .expected_type(fidl::ObjectType::SOCKET)
210                                    .err()
211                                    .unwrap_or(proto::Error::ClosedDuringRead(
212                                        proto::ClosedDuringRead,
213                                    ));
214                                event_queue
215                                    .push_back(FDomainEvent::SocketData(tid, Err(err)).into());
216                            }
217                        }
218                    }
219
220                    if state.async_read_in_progress {
221                        match &*state.handle {
222                            AnyHandle::Channel(_) => event_queue.push_back(
223                                FDomainEvent::ChannelStreamingData(
224                                    proto::ChannelOnChannelStreamingDataRequest {
225                                        handle: hid,
226                                        channel_sent: proto::ChannelSent::Stopped(
227                                            proto::AioStopped { error: None },
228                                        ),
229                                    },
230                                )
231                                .into(),
232                            ),
233                            AnyHandle::Socket(_) => event_queue.push_back(
234                                FDomainEvent::SocketStreamingData(
235                                    proto::SocketOnSocketStreamingDataRequest {
236                                        handle: hid,
237                                        socket_message: proto::SocketMessage::Stopped(
238                                            proto::AioStopped { error: None },
239                                        ),
240                                    },
241                                )
242                                .into(),
243                            ),
244                            AnyHandle::EventPair(_)
245                            | AnyHandle::Event(_)
246                            | AnyHandle::Unknown(_) => unreachable!(),
247                        }
248                    }
249
250                    state.signal_waiters.clear();
251                    state.io_waiter = None;
252
253                    ShuttingDownHandle::Ready(
254                        Arc::into_inner(state.handle).expect("Unaccounted-for handle reference!"),
255                    )
256                } else {
257                    ShuttingDownHandle::InUse(hid, state)
258                }
259            }
260        });
261
262        if matches!(self, ShuttingDownHandle::Ready(_)) { Poll::Ready(()) } else { Poll::Pending }
263    }
264}
265
/// A vector of [`ShuttingDownHandle`] paired with rights for the new handles, which
/// can transition into being a vector of [`fidl::HandleDisposition`] when all the
/// handles are ready.
enum HandlesToWrite {
    /// Handles that may still be in use, each paired with the rights the
    /// written handle should carry.
    SomeInUse(Vec<(ShuttingDownHandle, fidl::Rights)>),
    /// Every handle has become ready and been converted to a disposition.
    AllReady(Vec<fidl::HandleDisposition<'static>>),
}
273
274impl HandlesToWrite {
275    fn poll_ready(
276        &mut self,
277        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
278        ctx: &mut Context<'_>,
279    ) -> Poll<&mut Vec<fidl::HandleDisposition<'static>>> {
280        match self {
281            HandlesToWrite::AllReady(s) => Poll::Ready(s),
282            HandlesToWrite::SomeInUse(handles) => {
283                let mut ready = true;
284                for (handle, _) in handles.iter_mut() {
285                    ready = ready && handle.poll_ready(event_queue, ctx).is_ready();
286                }
287
288                if !ready {
289                    return Poll::Pending;
290                }
291
292                *self = HandlesToWrite::AllReady(
293                    handles
294                        .drain(..)
295                        .map(|(handle, rights)| {
296                            let ShuttingDownHandle::Ready(handle) = handle else { unreachable!() };
297
298                            fidl::HandleDisposition::new(
299                                fidl::HandleOp::Move(handle.into()),
300                                fidl::ObjectType::NONE,
301                                rights,
302                                fidl::Status::OK,
303                            )
304                        })
305                        .collect(),
306                );
307
308                let HandlesToWrite::AllReady(s) = self else { unreachable!() };
309                Poll::Ready(s)
310            }
311        }
312    }
313}
314
/// Newtype around a shared [`AnyHandle`] that implements [`AsHandleRef`], so
/// a reference-counted handle can be passed where an `AsHandleRef`
/// implementor is expected (e.g. to `OnSignals::new`).
struct AnyHandleRef(Arc<AnyHandle>);

impl AsHandleRef for AnyHandleRef {
    /// Borrows the underlying handle by delegating to the wrapped value.
    fn as_handle_ref(&self) -> fidl::HandleRef<'_> {
        self.0.as_handle_ref()
    }
}
322
/// Signal-waiting future used by this file. On Fuchsia this is
/// `fasync::OnSignals` parameterized over [`AnyHandleRef`].
#[cfg(target_os = "fuchsia")]
type OnSignals = fasync::OnSignals<'static, AnyHandleRef>;

/// Signal-waiting future used by this file. On non-Fuchsia targets this is
/// `fasync::OnSignalsRef`.
#[cfg(not(target_os = "fuchsia"))]
type OnSignals = fasync::OnSignalsRef<'static>;
328
/// Represents a `WaitForSignals` transaction from a client. When the contained
/// `OnSignals` polls to completion we can reply to the transaction.
struct SignalWaiter {
    /// ID of the transaction to reply to.
    tid: NonZeroU32,
    /// Future that resolves with the observed signals, or an error.
    waiter: OnSignals,
}
335
/// Information about a single handle within the [`FDomain`].
struct HandleState {
    /// The handle itself. Shared via `Arc` so that signal waiters can hold a
    /// reference to it as well.
    handle: Arc<AnyHandle>,
    /// Our handle ID. Reported in the streaming-data events for this handle.
    hid: proto::HandleId,
    /// Whether this is a datagram socket. We have to handle data coming out of
    /// datagram sockets a bit differently to preserve their semantics from the
    /// perspective of the host and avoid data loss.
    is_datagram_socket: bool,
    /// Indicates we are sending `On*StreamingData` events to the client
    /// presently. It is an error for the user to try to move the handle out of
    /// the FDomain (e.g. send it through a channel or close it) until after
    /// they request that streaming events stop.
    async_read_in_progress: bool,
    /// Queue of client requests to read from the handle. We have to queue read
    /// requests because they may block, and we don't want to block the event
    /// loop or be unable to handle further requests while a long read request
    /// is blocking. Also we want to retire read requests in the order they were
    /// submitted, otherwise pipelined reads could return data in a strange order.
    read_queue: Queue<ReadOp>,
    /// Queue of client requests to write to the handle. We have to queue write
    /// requests for the same reason we have to queue read requests. Since we
    /// process the queue one at a time, we need a separate queue for writes
    /// otherwise we'd effectively make handles half-duplex, with read requests
    /// unable to proceed if a write request is blocked at the head of the
    /// queue.
    write_queue: Queue<WriteOp>,
    /// List of outstanding `WaitForSignals` transactions. An entry is removed
    /// once its waiter completes and the reply has been queued.
    signal_waiters: Vec<SignalWaiter>,
    /// Contains a waiter on this handle for IO reading and writing. Populated
    /// whenever we need to block on IO to service a request.
    io_waiter: Option<OnSignals>,
}
370
371impl HandleState {
372    fn new(handle: AnyHandle, hid: proto::HandleId) -> Result<Self, proto::Error> {
373        let is_datagram_socket = match handle.is_datagram_socket() {
374            IsDatagramSocket::Unknown => {
375                return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown {
376                    type_: proto::SocketType::unknown(),
377                }));
378            }
379            other => other.is_datagram(),
380        };
381        Ok(HandleState {
382            handle: Arc::new(handle),
383            hid,
384            async_read_in_progress: false,
385            is_datagram_socket,
386            read_queue: Queue::new(),
387            write_queue: Queue::new(),
388            signal_waiters: Vec::new(),
389            io_waiter: None,
390        })
391    }
392
    /// Poll this handle state. Lets us handle our IO queues and wait for the
    /// next IO event.
    ///
    /// Completed `WaitForSignals` transactions are answered first. The read
    /// and write queues are then serviced, and `io_waiter` is re-armed for
    /// whichever signals the remaining queued work needs. Any events produced
    /// are appended to `event_queue`.
    fn poll(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>, ctx: &mut Context<'_>) {
        // Reply to, and drop, every signal waiter that has completed; keep the
        // ones that are still pending.
        self.signal_waiters.retain_mut(|x| {
            let Poll::Ready(result) = x.waiter.poll_unpin(ctx) else {
                return true;
            };

            event_queue.push_back(
                FDomainEvent::WaitForSignals(
                    x.tid,
                    result
                        .map(|x| proto::FDomainWaitForSignalsResponse { signals: x.bits() })
                        .map_err(|e| proto::Error::TargetError(e.into_raw())),
                )
                .into(),
            );

            false
        });

        let read_signals = self.handle.read_signals();
        let write_signals = self.handle.write_signals();

        loop {
            if let Some(signal_waiter) = self.io_waiter.as_mut() {
                if let Poll::Ready(sigs) = signal_waiter.poll_unpin(ctx) {
                    // The waiter fired: service whichever queue(s) the
                    // observed signals are relevant to. On an error result
                    // neither queue is serviced; either way we fall through
                    // and re-arm the waiter below.
                    // NOTE(review): if a freshly created waiter can resolve
                    // with an error immediately, this loop would not
                    // terminate — confirm that cannot happen.
                    if let Ok(sigs) = sigs {
                        if sigs.intersects(read_signals) {
                            self.process_read_queue(event_queue, ctx);
                        }
                        if sigs.intersects(write_signals) {
                            self.process_write_queue(event_queue, ctx);
                        }
                    }
                } else {
                    // The waiter is still pending. Note whether the front of
                    // either queue is an operation that does not read or
                    // write data (a streaming toggle or a disposition change).
                    let need_read = matches!(
                        self.read_queue.front_mut(ctx),
                        Poll::Ready(ReadOp::StreamingChannel(_, _) | ReadOp::StreamingSocket(_, _))
                    );
                    let need_write = matches!(
                        self.write_queue.front_mut(ctx),
                        Poll::Ready(WriteOp::SetDisposition(_, _, _))
                    );

                    // Both queues are processed regardless of the flags above.
                    self.process_read_queue(event_queue, ctx);
                    self.process_write_queue(event_queue, ctx);

                    // If neither front was such an operation we are done until
                    // the waiter fires. Otherwise go around again, presumably
                    // because the set of signals worth waiting for may have
                    // changed.
                    if !(need_read || need_write) {
                        break;
                    }
                }
            }

            // Wait for read signals while a streaming read is active or reads
            // are queued, and for write signals while writes are queued.
            let subscribed_signals =
                if self.async_read_in_progress || !self.read_queue.is_empty() {
                    read_signals
                } else {
                    fidl::Signals::NONE
                } | if !self.write_queue.is_empty() { write_signals } else { fidl::Signals::NONE };

            if !subscribed_signals.is_empty() {
                // Arm a fresh waiter; the next loop iteration polls it.
                self.io_waiter = Some(OnSignals::new(
                    AnyHandleRef(Arc::clone(&self.handle)),
                    subscribed_signals,
                ));
            } else {
                // Nothing left to wait for.
                self.io_waiter = None;
                break;
            }
        }
    }
465
466    /// Set `async_read_in_progress` to `true`. Return an error if it was already `true`.
467    fn try_enable_async_read(&mut self) -> Result<()> {
468        if self.async_read_in_progress {
469            Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress))
470        } else {
471            self.async_read_in_progress = true;
472            Ok(())
473        }
474    }
475
476    /// Set `async_read_in_progress` to `false`. Return an error if it was already `false`.
477    fn try_disable_async_read(&mut self) -> Result<()> {
478        if !self.async_read_in_progress {
479            Err(proto::Error::NoReadInProgress(proto::NoReadInProgress))
480        } else {
481            self.async_read_in_progress = false;
482            Ok(())
483        }
484    }
485
486    /// Handle events from the front of the read queue.
487    fn process_read_queue(
488        &mut self,
489        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
490        ctx: &mut Context<'_>,
491    ) {
492        while let Poll::Ready(op) = self.read_queue.front_mut(ctx) {
493            match op {
494                ReadOp::StreamingChannel(tid, true) => {
495                    let tid = *tid;
496                    let result = self.try_enable_async_read();
497                    event_queue
498                        .push_back(FDomainEvent::ChannelStreamingReadStart(tid, result).into());
499                    self.read_queue.destroy_front();
500                }
501                ReadOp::StreamingChannel(tid, false) => {
502                    let tid = *tid;
503                    let result = self.try_disable_async_read();
504                    event_queue
505                        .push_back(FDomainEvent::ChannelStreamingReadStop(tid, result).into());
506                    self.read_queue.destroy_front();
507                }
508                ReadOp::StreamingSocket(tid, true) => {
509                    let tid = *tid;
510                    let result = self.try_enable_async_read();
511                    event_queue
512                        .push_back(FDomainEvent::SocketStreamingReadStart(tid, result).into());
513                    self.read_queue.destroy_front();
514                }
515                ReadOp::StreamingSocket(tid, false) => {
516                    let tid = *tid;
517                    let result = self.try_disable_async_read();
518                    event_queue
519                        .push_back(FDomainEvent::SocketStreamingReadStop(tid, result).into());
520                    self.read_queue.destroy_front();
521                }
522                ReadOp::Socket(tid, max_bytes) => {
523                    let (tid, max_bytes) = (*tid, *max_bytes);
524                    if let Some(event) = self.do_read_socket(tid, max_bytes) {
525                        let _ = self.read_queue.pop_front(ctx);
526                        event_queue.push_back(event.into());
527                    } else {
528                        break;
529                    }
530                }
531                ReadOp::Channel(tid) => {
532                    let tid = *tid;
533                    if let Some(event) = self.do_read_channel(tid) {
534                        let _ = self.read_queue.pop_front(ctx);
535                        event_queue.push_back(event.into());
536                    } else {
537                        break;
538                    }
539                }
540            }
541        }
542
543        if self.async_read_in_progress {
544            // We should have error'd out of any blocking operations if we had a
545            // read in progress.
546            assert!(self.read_queue.is_empty());
547            self.process_async_read(event_queue);
548        }
549    }
550
551    fn process_async_read(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>) {
552        assert!(self.async_read_in_progress);
553
554        match &*self.handle {
555            AnyHandle::Channel(_) => {
556                'read_loop: while let Some(result) = self.handle.read_channel().transpose() {
557                    match result {
558                        Ok(msg) => event_queue.push_back(
559                            UnprocessedFDomainEvent::ChannelStreamingData(self.hid, msg),
560                        ),
561                        Err(e) => {
562                            event_queue.push_back(
563                                FDomainEvent::ChannelStreamingData(
564                                    proto::ChannelOnChannelStreamingDataRequest {
565                                        handle: self.hid,
566                                        channel_sent: proto::ChannelSent::Stopped(
567                                            proto::AioStopped { error: Some(Box::new(e)) },
568                                        ),
569                                    },
570                                )
571                                .into(),
572                            );
573                            self.async_read_in_progress = false;
574                            break 'read_loop;
575                        }
576                    }
577                }
578            }
579
580            AnyHandle::Socket(_) => {
581                'read_loop: while let Some(result) =
582                    self.handle.read_socket(ASYNC_READ_BUFSIZE).transpose()
583                {
584                    match result {
585                        Ok(data) => {
586                            event_queue.push_back(
587                                FDomainEvent::SocketStreamingData(
588                                    proto::SocketOnSocketStreamingDataRequest {
589                                        handle: self.hid,
590                                        socket_message: proto::SocketMessage::Data(
591                                            proto::SocketData {
592                                                data,
593                                                is_datagram: self.is_datagram_socket,
594                                            },
595                                        ),
596                                    },
597                                )
598                                .into(),
599                            );
600                        }
601                        Err(e) => {
602                            event_queue.push_back(
603                                FDomainEvent::SocketStreamingData(
604                                    proto::SocketOnSocketStreamingDataRequest {
605                                        handle: self.hid,
606                                        socket_message: proto::SocketMessage::Stopped(
607                                            proto::AioStopped { error: Some(Box::new(e)) },
608                                        ),
609                                    },
610                                )
611                                .into(),
612                            );
613                            self.async_read_in_progress = false;
614                            break 'read_loop;
615                        }
616                    }
617                }
618            }
619
620            _ => unreachable!("Processed async read for unreadable handle type!"),
621        }
622    }
623
624    /// Handle events from the front of the write queue.
625    fn process_write_queue(
626        &mut self,
627        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
628        ctx: &mut Context<'_>,
629    ) {
630        // We want to mutate and *maybe* pop the front of the write queue, but
631        // lifetime shenanigans mean we can't do that and also access `self`,
632        // which we need. So we pop the item always, and then maybe push it to
633        // the front again if we didn't actually want to pop it.
634        while let Poll::Ready(op) = self.write_queue.pop_front(ctx) {
635            match op {
636                WriteOp::Socket(mut op) => {
637                    if let Some(event) = self.do_write_socket(&mut op) {
638                        event_queue.push_back(event.into());
639                    } else {
640                        self.write_queue.push_front_no_wake(WriteOp::Socket(op));
641                        break;
642                    }
643                }
644                WriteOp::SetDisposition(tid, disposition, disposition_peer) => {
645                    let result = { self.handle.socket_disposition(disposition, disposition_peer) };
646                    event_queue.push_back(FDomainEvent::SocketDispositionSet(tid, result).into())
647                }
648                WriteOp::Channel(tid, data, mut handles) => {
649                    if self
650                        .do_write_channel(tid, &data, &mut handles, event_queue, ctx)
651                        .is_pending()
652                    {
653                        self.write_queue.push_front_no_wake(WriteOp::Channel(tid, data, handles));
654                        break;
655                    }
656                }
657            }
658        }
659    }
660
661    /// Attempt to read from the handle in this [`HandleState`] as if it were a
662    /// socket. If the read succeeds or produces an error that should not be
663    /// retried, produce an [`FDomainEvent`] containing the result.
664    fn do_read_socket(&mut self, tid: NonZeroU32, max_bytes: u64) -> Option<FDomainEvent> {
665        if self.async_read_in_progress {
666            return Some(
667                FDomainEvent::SocketData(
668                    tid,
669                    Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
670                )
671                .into(),
672            );
673        }
674
675        let max_bytes = if self.is_datagram_socket {
676            let AnyHandle::Socket(s) = &*self.handle else {
677                unreachable!("Read socket from state that wasn't for a socket!");
678            };
679            match s.info() {
680                Ok(x) => x.rx_buf_available as u64,
681                // We should always succeed. The only failures are if we don't
682                // have the rights or something's screwed up with the handle. We
683                // know we have the rights because figuring out this was a
684                // datagram socket to begin with meant calling the same call on
685                // the same handle earlier.
686                Err(e) => {
687                    return Some(FDomainEvent::SocketData(
688                        tid,
689                        Err(proto::Error::TargetError(e.into_raw())),
690                    ));
691                }
692            }
693        } else {
694            max_bytes
695        };
696        self.handle.read_socket(max_bytes).transpose().map(|x| {
697            FDomainEvent::SocketData(
698                tid,
699                x.map(|data| proto::SocketData { data, is_datagram: self.is_datagram_socket }),
700            )
701        })
702    }
703
704    /// Attempt to write to the handle in this [`HandleState`] as if it were a
705    /// socket. If the write succeeds or produces an error that should not be
706    /// retried, produce an [`FDomainEvent`] containing the result.
707    fn do_write_socket(&mut self, op: &mut SocketWrite) -> Option<FDomainEvent> {
708        match self.handle.write_socket(&op.to_write) {
709            Ok(wrote) => {
710                op.wrote += wrote;
711                op.to_write.drain(..wrote);
712
713                if op.to_write.is_empty() {
714                    Some(FDomainEvent::WroteSocket(
715                        op.tid,
716                        Ok(proto::SocketWriteSocketResponse {
717                            wrote: op.wrote.try_into().unwrap(),
718                        }),
719                    ))
720                } else {
721                    None
722                }
723            }
724            Err(error) => Some(FDomainEvent::WroteSocket(
725                op.tid,
726                Err(proto::WriteSocketError { error, wrote: op.wrote.try_into().unwrap() }),
727            )),
728        }
729    }
730
731    /// Attempt to write to the handle in this [`HandleState`] as if it were a
732    /// channel. If the write succeeds or produces an error that should not be
733    /// retried, produce an [`FDomainEvent`] containing the result.
734    fn do_write_channel(
735        &mut self,
736        tid: NonZeroU32,
737        data: &[u8],
738        handles: &mut HandlesToWrite,
739        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
740        ctx: &mut Context<'_>,
741    ) -> Poll<()> {
742        let Poll::Ready(handles) = handles.poll_ready(event_queue, ctx) else {
743            return Poll::Pending;
744        };
745
746        let ret = self.handle.write_channel(data, handles);
747        if let Some(ret) = ret {
748            event_queue.push_back(FDomainEvent::WroteChannel(tid, ret).into())
749        }
750        Poll::Ready(())
751    }
752
753    /// Attempt to read from the handle in this [`HandleState`] as if it were a
754    /// channel. If the read succeeds or produces an error that should not be
755    /// retried, produce an [`FDomainEvent`] containing the result.
756    fn do_read_channel(&mut self, tid: NonZeroU32) -> Option<UnprocessedFDomainEvent> {
757        if self.async_read_in_progress {
758            return Some(
759                FDomainEvent::ChannelData(
760                    tid,
761                    Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
762                )
763                .into(),
764            );
765        }
766        match self.handle.read_channel() {
767            Ok(x) => x.map(|x| UnprocessedFDomainEvent::ChannelData(tid, x)),
768            Err(e) => Some(FDomainEvent::ChannelData(tid, Err(e)).into()),
769        }
770    }
771}
772
773/// State for a handle which is closing, but which needs its read and write
774/// queues flushed first.
775struct ClosingHandle {
776    action: Arc<CloseAction>,
777    state: Option<ShuttingDownHandle>,
778}
779
780impl ClosingHandle {
781    fn poll_ready(&mut self, fdomain: &mut FDomain, ctx: &mut Context<'_>) -> Poll<()> {
782        if let Some(state) = self.state.as_mut() {
783            if state.poll_ready(&mut fdomain.event_queue, ctx).is_ready() {
784                let state = self.state.take().unwrap();
785                let ShuttingDownHandle::Ready(handle) = state else {
786                    unreachable!();
787                };
788                self.action.perform(fdomain, handle);
789                Poll::Ready(())
790            } else {
791                Poll::Pending
792            }
793        } else {
794            Poll::Ready(())
795        }
796    }
797}
798
799/// When the client requests a handle to be closed or moved or otherwise
800/// destroyed, it goes into limbo for a bit while pending read and write actions
801/// are flushed. This is how we mark what should happen to the handle after that
802/// period ends.
803enum CloseAction {
804    Close { tid: NonZeroU32, count: AtomicU32, result: Result<()> },
805    Replace { tid: NonZeroU32, new_hid: proto::NewHandleId, rights: fidl::Rights },
806}
807
808impl CloseAction {
809    fn perform(&self, fdomain: &mut FDomain, handle: AnyHandle) {
810        match self {
811            CloseAction::Close { tid, count, result } => {
812                if count.fetch_sub(1, Ordering::Relaxed) == 1 {
813                    fdomain
814                        .event_queue
815                        .push_back(FDomainEvent::ClosedHandle(*tid, result.clone()).into());
816                }
817            }
818            CloseAction::Replace { tid, new_hid, rights } => {
819                let result = handle
820                    .replace(*rights)
821                    .and_then(|handle| fdomain.alloc_client_handles([*new_hid], [handle]));
822                fdomain.event_queue.push_back(FDomainEvent::ReplacedHandle(*tid, result).into());
823            }
824        }
825    }
826}
827
828/// This is a container of handles that is manipulable via the FDomain protocol.
829/// See [RFC-0228].
830///
831/// Most of the methods simply handle FIDL requests from the FDomain protocol.
832#[pin_project::pin_project]
833pub struct FDomain {
834    namespace: Box<dyn Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send>,
835    handles: HashMap<proto::HandleId, HandleState>,
836    closing_handles: Vec<ClosingHandle>,
837    event_queue: VecDeque<UnprocessedFDomainEvent>,
838    waker: Option<Waker>,
839}
840
841impl FDomain {
842    /// Create a new FDomain. The new FDomain is empty and ready to be connected
843    /// to by a client.
844    pub fn new_empty() -> Self {
845        Self::new(|| Err(fidl::Status::NOT_FOUND))
846    }
847
848    /// Create a new FDomain populated with the given namespace entries.
849    pub fn new(
850        namespace: impl Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send + 'static,
851    ) -> Self {
852        FDomain {
853            namespace: Box::new(namespace),
854            handles: HashMap::new(),
855            closing_handles: Vec::new(),
856            event_queue: VecDeque::new(),
857            waker: None,
858        }
859    }
860
861    /// Add an event to be emitted by this FDomain.
862    fn push_event(&mut self, event: impl Into<UnprocessedFDomainEvent>) {
863        self.event_queue.push_back(event.into());
864        self.waker.take().map(Waker::wake);
865    }
866
867    /// Given a [`fidl::MessageBufEtc`], load all of the handles from it into this
868    /// FDomain and return a [`ReadChannelPayload`](proto::ReadChannelPayload)
869    /// with the same data and the IDs for the handles.
870    fn process_message(
871        &mut self,
872        message: fidl::MessageBufEtc,
873    ) -> Result<proto::ChannelMessage, proto::Error> {
874        let (data, handles) = message.split();
875        let handles = handles
876            .into_iter()
877            .map(|info| {
878                let type_ = info.object_type;
879
880                let handle = match info.object_type {
881                    fidl::ObjectType::CHANNEL => {
882                        AnyHandle::Channel(fidl::Channel::from(info.handle))
883                    }
884                    fidl::ObjectType::SOCKET => AnyHandle::Socket(fidl::Socket::from(info.handle)),
885                    fidl::ObjectType::EVENTPAIR => {
886                        AnyHandle::EventPair(fidl::EventPair::from(info.handle))
887                    }
888                    fidl::ObjectType::EVENT => AnyHandle::Event(fidl::Event::from(info.handle)),
889                    _ => AnyHandle::Unknown(handles::Unknown(info.handle, info.object_type)),
890                };
891
892                Ok(proto::HandleInfo {
893                    rights: info.rights,
894                    handle: self.alloc_fdomain_handle(handle)?,
895                    type_,
896                })
897            })
898            .collect::<Result<Vec<_>, proto::Error>>()?;
899
900        Ok(proto::ChannelMessage { data, handles })
901    }
902
903    /// Allocate `N` new handle IDs. These are allocated from
904    /// [`NewHandleId`](proto::NewHandleId) and are expected to follow the protocol
905    /// rules for client-allocated handle IDs.
906    ///
907    /// If any of the handles passed fail to allocate, none of the handles will
908    /// be allocated.
909    fn alloc_client_handles<const N: usize>(
910        &mut self,
911        ids: [proto::NewHandleId; N],
912        handles: [AnyHandle; N],
913    ) -> Result<(), proto::Error> {
914        for id in ids {
915            if id.id & (1 << 31) != 0 {
916                return Err(proto::Error::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange {
917                    id: id.id,
918                }));
919            }
920
921            if self.handles.contains_key(&proto::HandleId { id: id.id }) {
922                return Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
923                    id: id.id,
924                    same_call: false,
925                }));
926            }
927        }
928
929        let mut sorted_ids = ids;
930        sorted_ids.sort();
931
932        if let Some(a) = sorted_ids.windows(2).find(|x| x[0] == x[1]) {
933            Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
934                id: a[0].id,
935                same_call: true,
936            }))
937        } else {
938            let ids = ids.into_iter().map(|id| proto::HandleId { id: id.id });
939            let handles = ids
940                .zip(handles.into_iter())
941                .map(|(id, h)| HandleState::new(h, id).map(|x| (id, x)))
942                .collect::<Result<Vec<_>, proto::Error>>()?;
943
944            self.handles.extend(handles);
945
946            Ok(())
947        }
948    }
949
950    /// Allocate a new handle ID. These are allocated internally and are
951    /// expected to follow the protocol rules for FDomain-allocated handle IDs.
952    fn alloc_fdomain_handle(&mut self, handle: AnyHandle) -> Result<proto::HandleId, proto::Error> {
953        loop {
954            let id = proto::HandleId { id: rand::random::<u32>() | (1u32 << 31) };
955            if let Entry::Vacant(v) = self.handles.entry(id) {
956                v.insert(HandleState::new(handle, id)?);
957                break Ok(id);
958            }
959        }
960    }
961
962    /// If a handle exists in this FDomain, remove it.
963    fn take_handle(&mut self, handle: proto::HandleId) -> Result<HandleState, proto::Error> {
964        self.handles
965            .remove(&handle)
966            .ok_or(proto::Error::BadHandleId(proto::BadHandleId { id: handle.id }))
967    }
968
969    /// Use a handle in our handle table, if it exists.
970    fn using_handle<T>(
971        &mut self,
972        id: proto::HandleId,
973        f: impl FnOnce(&mut HandleState) -> Result<T, proto::Error>,
974    ) -> Result<T, proto::Error> {
975        if let Some(s) = self.handles.get_mut(&id) {
976            f(s)
977        } else {
978            Err(proto::Error::BadHandleId(proto::BadHandleId { id: id.id }))
979        }
980    }
981
982    pub fn get_namespace(&mut self, request: proto::FDomainGetNamespaceRequest) -> Result<()> {
983        match (self.namespace)() {
984            Ok(endpoint) => self.alloc_client_handles(
985                [request.new_handle],
986                [AnyHandle::Channel(endpoint.into_channel())],
987            ),
988            Err(e) => Err(proto::Error::TargetError(e.into_raw())),
989        }
990    }
991
992    pub fn create_channel(&mut self, request: proto::ChannelCreateChannelRequest) -> Result<()> {
993        let (a, b) = fidl::Channel::create();
994        self.alloc_client_handles(request.handles, [AnyHandle::Channel(a), AnyHandle::Channel(b)])
995    }
996
997    pub fn create_socket(&mut self, request: proto::SocketCreateSocketRequest) -> Result<()> {
998        let (a, b) = match request.options {
999            proto::SocketType::Stream => fidl::Socket::create_stream(),
1000            proto::SocketType::Datagram => fidl::Socket::create_datagram(),
1001            type_ => {
1002                return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown { type_ }));
1003            }
1004        };
1005
1006        self.alloc_client_handles(request.handles, [AnyHandle::Socket(a), AnyHandle::Socket(b)])
1007    }
1008
1009    pub fn create_event_pair(
1010        &mut self,
1011        request: proto::EventPairCreateEventPairRequest,
1012    ) -> Result<()> {
1013        let (a, b) = fidl::EventPair::create();
1014        self.alloc_client_handles(
1015            request.handles,
1016            [AnyHandle::EventPair(a), AnyHandle::EventPair(b)],
1017        )
1018    }
1019
1020    pub fn create_event(&mut self, request: proto::EventCreateEventRequest) -> Result<()> {
1021        let a = fidl::Event::create();
1022        self.alloc_client_handles([request.handle], [AnyHandle::Event(a)])
1023    }
1024
1025    pub fn set_socket_disposition(
1026        &mut self,
1027        tid: NonZeroU32,
1028        request: proto::SocketSetSocketDispositionRequest,
1029    ) {
1030        if let Err(err) = self.using_handle(request.handle, |h| {
1031            h.write_queue.push_back(WriteOp::SetDisposition(
1032                tid,
1033                request.disposition,
1034                request.disposition_peer,
1035            ));
1036            Ok(())
1037        }) {
1038            self.push_event(FDomainEvent::SocketDispositionSet(tid, Err(err)));
1039        }
1040    }
1041
1042    pub fn read_socket(&mut self, tid: NonZeroU32, request: proto::SocketReadSocketRequest) {
1043        if let Err(e) = self.using_handle(request.handle, |h| {
1044            h.read_queue.push_back(ReadOp::Socket(tid, request.max_bytes));
1045            Ok(())
1046        }) {
1047            self.push_event(FDomainEvent::SocketData(tid, Err(e)));
1048        }
1049    }
1050
1051    pub fn read_channel(&mut self, tid: NonZeroU32, request: proto::ChannelReadChannelRequest) {
1052        if let Err(e) = self.using_handle(request.handle, |h| {
1053            h.read_queue.push_back(ReadOp::Channel(tid));
1054            Ok(())
1055        }) {
1056            self.push_event(FDomainEvent::ChannelData(tid, Err(e)));
1057        }
1058    }
1059
1060    pub fn write_socket(&mut self, tid: NonZeroU32, request: proto::SocketWriteSocketRequest) {
1061        if let Err(error) = self.using_handle(request.handle, |h| {
1062            h.write_queue.push_back(WriteOp::Socket(SocketWrite {
1063                tid,
1064                wrote: 0,
1065                to_write: request.data,
1066            }));
1067            Ok(())
1068        }) {
1069            self.push_event(FDomainEvent::WroteSocket(
1070                tid,
1071                Err(proto::WriteSocketError { error, wrote: 0 }),
1072            ));
1073        }
1074    }
1075
1076    pub fn write_channel(&mut self, tid: NonZeroU32, request: proto::ChannelWriteChannelRequest) {
1077        // Go through the list of handles in the requests (which will either be
1078        // a simple list of handles or a list of HandleDispositions) and obtain
1079        // for each a `ShuttingDownHandle` which contains our handle state (the
1080        // "Shutting down" refers to the fact that we're pulling the handle out
1081        // of the FDomain in order to send it) and the rights the requester
1082        // would like the handle to have upon arrival at the other end of the
1083        // channel.
1084        let handles: Vec<Result<(ShuttingDownHandle, fidl::Rights)>> = match request.handles {
1085            proto::Handles::Handles(h) => h
1086                .into_iter()
1087                .map(|h| {
1088                    if h != request.handle {
1089                        self.take_handle(h).map(|handle_state| {
1090                            (ShuttingDownHandle::InUse(h, handle_state), fidl::Rights::SAME_RIGHTS)
1091                        })
1092                    } else {
1093                        Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1094                    }
1095                })
1096                .collect(),
1097            proto::Handles::Dispositions(d) => d
1098                .into_iter()
1099                .map(|d| {
1100                    let res = match d.handle {
1101                        proto::HandleOp::Move_(h) => {
1102                            if h != request.handle {
1103                                self.take_handle(h).map(|x| ShuttingDownHandle::InUse(h, x))
1104                            } else {
1105                                Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1106                            }
1107                        }
1108                        proto::HandleOp::Duplicate(h) => {
1109                            if h != request.handle {
1110                                // If the requester wants us to duplicate the
1111                                // handle, we do so now rather than letting
1112                                // `write_etc` do it. Otherwise we have to use a
1113                                // reference to the handle, and we get lifetime
1114                                // hell.
1115                                self.using_handle(h, |h| {
1116                                    h.handle.duplicate(fidl::Rights::SAME_RIGHTS)
1117                                })
1118                                .map(ShuttingDownHandle::Ready)
1119                            } else {
1120                                Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1121                            }
1122                        }
1123                    };
1124
1125                    res.and_then(|x| Ok((x, d.rights)))
1126                })
1127                .collect(),
1128        };
1129
1130        if handles.iter().any(|x| x.is_err()) {
1131            let e = handles.into_iter().map(|x| x.err().map(Box::new)).collect();
1132
1133            self.push_event(FDomainEvent::WroteChannel(
1134                tid,
1135                Err(proto::WriteChannelError::OpErrors(e)),
1136            ));
1137            return;
1138        }
1139
1140        let handles = handles.into_iter().map(|x| x.unwrap()).collect::<Vec<_>>();
1141
1142        if let Err(e) = self.using_handle(request.handle, |h| {
1143            h.write_queue.push_back(WriteOp::Channel(
1144                tid,
1145                request.data,
1146                HandlesToWrite::SomeInUse(handles),
1147            ));
1148            Ok(())
1149        }) {
1150            self.push_event(FDomainEvent::WroteChannel(
1151                tid,
1152                Err(proto::WriteChannelError::Error(e)),
1153            ));
1154        }
1155    }
1156
1157    pub fn wait_for_signals(
1158        &mut self,
1159        tid: NonZeroU32,
1160        request: proto::FDomainWaitForSignalsRequest,
1161    ) {
1162        let result = self.using_handle(request.handle, |h| {
1163            let signals = fidl::Signals::from_bits_retain(request.signals);
1164            h.signal_waiters.push(SignalWaiter {
1165                tid,
1166                waiter: OnSignals::new(AnyHandleRef(Arc::clone(&h.handle)), signals),
1167            });
1168            Ok(())
1169        });
1170
1171        if let Err(e) = result {
1172            self.push_event(FDomainEvent::WaitForSignals(tid, Err(e)));
1173        } else {
1174            self.waker.take().map(Waker::wake);
1175        }
1176    }
1177
1178    pub fn close(&mut self, tid: NonZeroU32, request: proto::FDomainCloseRequest) {
1179        let mut states = Vec::with_capacity(request.handles.len());
1180        let mut result = Ok(());
1181        for hid in request.handles {
1182            match self.take_handle(hid) {
1183                Ok(state) => states.push((hid, state)),
1184
1185                Err(e) => {
1186                    result = result.and(Err(e));
1187                }
1188            }
1189        }
1190
1191        let action = Arc::new(CloseAction::Close {
1192            tid,
1193            count: AtomicU32::new(states.len().try_into().unwrap()),
1194            result,
1195        });
1196
1197        for (hid, state) in states {
1198            self.closing_handles.push(ClosingHandle {
1199                action: Arc::clone(&action),
1200                state: Some(ShuttingDownHandle::InUse(hid, state)),
1201            });
1202        }
1203    }
1204
1205    pub fn duplicate(&mut self, request: proto::FDomainDuplicateRequest) -> Result<()> {
1206        let rights = request.rights;
1207        let handle = self.using_handle(request.handle, |h| h.handle.duplicate(rights));
1208        handle.and_then(|h| self.alloc_client_handles([request.new_handle], [h]))
1209    }
1210
1211    pub fn replace(
1212        &mut self,
1213        tid: NonZeroU32,
1214        request: proto::FDomainReplaceRequest,
1215    ) -> Result<()> {
1216        let rights = request.rights;
1217        let new_hid = request.new_handle;
1218        match self.take_handle(request.handle) {
1219            Ok(state) => self.closing_handles.push(ClosingHandle {
1220                action: Arc::new(CloseAction::Replace { tid, new_hid, rights }),
1221                state: Some(ShuttingDownHandle::InUse(request.handle, state)),
1222            }),
1223            Err(e) => self.event_queue.push_back(UnprocessedFDomainEvent::Ready(
1224                FDomainEvent::ReplacedHandle(tid, Err(e)),
1225            )),
1226        }
1227
1228        Ok(())
1229    }
1230
1231    pub fn signal(&mut self, request: proto::FDomainSignalRequest) -> Result<()> {
1232        let set = fidl::Signals::from_bits_retain(request.set);
1233        let clear = fidl::Signals::from_bits_retain(request.clear);
1234
1235        self.using_handle(request.handle, |h| h.handle.signal(clear, set))
1236    }
1237
1238    pub fn signal_peer(&mut self, request: proto::FDomainSignalPeerRequest) -> Result<()> {
1239        let set = fidl::Signals::from_bits_retain(request.set);
1240        let clear = fidl::Signals::from_bits_retain(request.clear);
1241
1242        self.using_handle(request.handle, |h| h.handle.signal_peer(clear, set))
1243    }
1244
1245    pub fn get_koid(
1246        &mut self,
1247        request: proto::FDomainGetKoidRequest,
1248    ) -> Result<proto::FDomainGetKoidResponse> {
1249        self.using_handle(request.handle, |h| {
1250            h.handle
1251                .as_handle_ref()
1252                .koid()
1253                .map(|k| proto::FDomainGetKoidResponse { koid: k.raw_koid() })
1254                .map_err(|e| proto::Error::TargetError(e.into_raw()))
1255        })
1256    }
1257
1258    pub fn read_channel_streaming_start(
1259        &mut self,
1260        tid: NonZeroU32,
1261        request: proto::ChannelReadChannelStreamingStartRequest,
1262    ) {
1263        if let Err(err) = self.using_handle(request.handle, |h| {
1264            h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1265            h.read_queue.push_back(ReadOp::StreamingChannel(tid, true));
1266            Ok(())
1267        }) {
1268            self.event_queue
1269                .push_back(FDomainEvent::ChannelStreamingReadStart(tid, Err(err)).into())
1270        }
1271    }
1272
1273    pub fn read_channel_streaming_stop(
1274        &mut self,
1275        tid: NonZeroU32,
1276        request: proto::ChannelReadChannelStreamingStopRequest,
1277    ) {
1278        if let Err(err) = self.using_handle(request.handle, |h| {
1279            h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1280            h.read_queue.push_back(ReadOp::StreamingChannel(tid, false));
1281            Ok(())
1282        }) {
1283            self.event_queue.push_back(FDomainEvent::ChannelStreamingReadStop(tid, Err(err)).into())
1284        }
1285    }
1286
1287    pub fn read_socket_streaming_start(
1288        &mut self,
1289        tid: NonZeroU32,
1290        request: proto::SocketReadSocketStreamingStartRequest,
1291    ) {
1292        if let Err(err) = self.using_handle(request.handle, |h| {
1293            h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1294            h.read_queue.push_back(ReadOp::StreamingSocket(tid, true));
1295            Ok(())
1296        }) {
1297            self.event_queue.push_back(FDomainEvent::SocketStreamingReadStart(tid, Err(err)).into())
1298        }
1299    }
1300
1301    pub fn read_socket_streaming_stop(
1302        &mut self,
1303        tid: NonZeroU32,
1304        request: proto::SocketReadSocketStreamingStopRequest,
1305    ) {
1306        if let Err(err) = self.using_handle(request.handle, |h| {
1307            h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1308            h.read_queue.push_back(ReadOp::StreamingSocket(tid, false));
1309            Ok(())
1310        }) {
1311            self.event_queue.push_back(FDomainEvent::SocketStreamingReadStop(tid, Err(err)).into())
1312        }
1313    }
1314}
1315
1316/// [`FDomain`] implements a stream of events, for protocol events and for
1317/// replies to long-running methods.
1318impl futures::Stream for FDomain {
1319    type Item = FDomainEvent;
1320
1321    fn poll_next(
1322        mut self: std::pin::Pin<&mut Self>,
1323        ctx: &mut Context<'_>,
1324    ) -> Poll<Option<Self::Item>> {
1325        let this = &mut *self;
1326
1327        let mut closing_handles = std::mem::replace(&mut this.closing_handles, Vec::new());
1328        closing_handles.retain_mut(|x| x.poll_ready(this, ctx).is_pending());
1329        this.closing_handles = closing_handles;
1330
1331        let handles = &mut this.handles;
1332        let event_queue = &mut this.event_queue;
1333        for state in handles.values_mut() {
1334            state.poll(event_queue, ctx);
1335        }
1336
1337        if let Some(event) = self.event_queue.pop_front() {
1338            match event {
1339                UnprocessedFDomainEvent::Ready(event) => Poll::Ready(Some(event)),
1340                UnprocessedFDomainEvent::ChannelData(tid, message) => {
1341                    Poll::Ready(Some(FDomainEvent::ChannelData(tid, self.process_message(message))))
1342                }
1343                UnprocessedFDomainEvent::ChannelStreamingData(hid, message) => {
1344                    match self.process_message(message) {
1345                        Ok(message) => Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1346                            proto::ChannelOnChannelStreamingDataRequest {
1347                                handle: hid,
1348                                channel_sent: proto::ChannelSent::Message(message),
1349                            },
1350                        ))),
1351                        Err(e) => {
1352                            self.handles.get_mut(&hid).unwrap().async_read_in_progress = false;
1353                            Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1354                                proto::ChannelOnChannelStreamingDataRequest {
1355                                    handle: hid,
1356                                    channel_sent: proto::ChannelSent::Stopped(proto::AioStopped {
1357                                        error: Some(Box::new(e)),
1358                                    }),
1359                                },
1360                            )))
1361                        }
1362                    }
1363                }
1364            }
1365        } else {
1366            self.waker = Some(ctx.waker().clone());
1367            Poll::Pending
1368        }
1369    }
1370}