fdomain_container/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::endpoints::ClientEnd;
6use fidl::{AsHandleRef, HandleBased};
7use futures::prelude::*;
8use replace_with::replace_with;
9use std::collections::hash_map::Entry;
10use std::collections::{HashMap, VecDeque};
11use std::num::NonZeroU32;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicU32, Ordering};
14use std::task::{Context, Poll, Waker};
15use {fidl_fuchsia_fdomain as proto, fidl_fuchsia_io as fio, fuchsia_async as fasync};
16
17mod handles;
18pub mod wire;
19
20#[cfg(test)]
21mod test;
22
/// Result alias used throughout this file; the error type defaults to
/// [`proto::Error`] but can be overridden per use.
pub type Result<T, E = proto::Error> = std::result::Result<T, E>;
24
25use handles::{AnyHandle, HandleType as _, IsDatagramSocket};
26
/// A queue. Basically just a `VecDeque` except we can asynchronously wait for
/// an element to pop if it is empty.
///
/// Field `0` holds the queued elements. Field `1` holds the waker of the most
/// recent caller that found the queue empty; only one waker is remembered at a
/// time, and it is consumed by the next [`Queue::push_back`].
struct Queue<T>(VecDeque<T>, Option<Waker>);

impl<T> Queue<T> {
    /// Create a new, empty queue with no registered waiter.
    fn new() -> Self {
        Queue(VecDeque::new(), None)
    }

    /// Whether the queue is empty.
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Removes and discards the first element in the queue.
    ///
    /// # Panics
    /// There *must* be a first element or this will panic.
    fn destroy_front(&mut self) {
        let removed = self.0.pop_front();
        assert!(removed.is_some(), "Expected to find a value!");
    }

    /// Pop the first element from the queue if available.
    ///
    /// When the queue is empty the waker from `ctx` is stored (replacing any
    /// previously stored waker) and `Poll::Pending` is returned.
    fn pop_front(&mut self, ctx: &mut Context<'_>) -> Poll<T> {
        match self.0.pop_front() {
            Some(t) => Poll::Ready(t),
            None => {
                self.1 = Some(ctx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Return an element to the front of the queue. Does not wake any waiters
    /// as it is assumed the waiter is the one who popped it to begin with.
    ///
    /// This is used when we'd *like* to use `front_mut` but we can't borrow the
    /// source of `self` for that long without giving ourselves lifetime
    /// headaches.
    fn push_front_no_wake(&mut self, t: T) {
        self.0.push_front(t)
    }

    /// Push a new element to the back of the queue and wake the stored
    /// waiter, if there is one. The stored waker is consumed, so each
    /// registration is woken at most once.
    fn push_back(&mut self, t: T) {
        self.0.push_back(t);
        if let Some(waker) = self.1.take() {
            waker.wake();
        }
    }

    /// Get a mutable reference to the first element in the queue.
    ///
    /// When the queue is empty the waker from `ctx` is stored (replacing any
    /// previously stored waker) and `Poll::Pending` is returned.
    fn front_mut(&mut self, ctx: &mut Context<'_>) -> Poll<&mut T> {
        // Assign the waker field directly rather than via a `&mut self`
        // helper: the `Some` arm returns a borrow of field `0`, and only a
        // disjoint-field write is accepted by the borrow checker here.
        if let Some(t) = self.0.front_mut() {
            Poll::Ready(t)
        } else {
            self.1 = Some(ctx.waker().clone());
            Poll::Pending
        }
    }
}
86
/// Maximum amount to read for an async socket read.
///
/// Passed as the size argument to the handle's `read_socket` for each read
/// performed while streaming reads are in progress (presumably a byte count;
/// confirm against `read_socket`).
const ASYNC_READ_BUFSIZE: u64 = 40960;
89
/// Wraps the various FIDL Event types that can be produced by an FDomain.
///
/// In the variants that carry a `NonZeroU32`, that value is the transaction
/// ID (`tid`) of the client request being answered and the `Result` is the
/// outcome of that request.
#[derive(Debug)]
pub enum FDomainEvent {
    /// Outcome of a request to start streaming reads on a channel.
    ChannelStreamingReadStart(NonZeroU32, Result<()>),
    /// Outcome of a request to stop streaming reads on a channel.
    ChannelStreamingReadStop(NonZeroU32, Result<()>),
    /// Outcome of a request to start streaming reads on a socket.
    SocketStreamingReadStart(NonZeroU32, Result<()>),
    /// Outcome of a request to stop streaming reads on a socket.
    SocketStreamingReadStop(NonZeroU32, Result<()>),
    /// A `WaitForSignals` transaction completed, with the observed signal
    /// bits or the error from the wait.
    WaitForSignals(NonZeroU32, Result<proto::FDomainWaitForSignalsResponse>),
    /// Outcome of a one-shot socket read.
    SocketData(NonZeroU32, Result<proto::SocketData>),
    /// Data read from a socket while streaming reads are enabled, or a
    /// notification that streaming has stopped.
    SocketStreamingData(proto::SocketOnSocketStreamingDataRequest),
    /// Outcome of a request to set a socket's disposition.
    SocketDispositionSet(NonZeroU32, Result<()>),
    /// Outcome of a socket write. Both the success and the error payload
    /// carry the number of bytes written.
    WroteSocket(NonZeroU32, Result<proto::SocketWriteSocketResponse, proto::WriteSocketError>),
    /// Outcome of a one-shot channel read.
    ChannelData(NonZeroU32, Result<proto::ChannelMessage>),
    /// A message read from a channel while streaming reads are enabled, or a
    /// notification that streaming has stopped.
    ChannelStreamingData(proto::ChannelOnChannelStreamingDataRequest),
    /// Outcome of a channel write.
    WroteChannel(NonZeroU32, Result<(), proto::WriteChannelError>),
    /// Not constructed in this part of the file; presumably the outcome of a
    /// request to close a handle (TODO confirm).
    ClosedHandle(NonZeroU32, Result<()>),
    /// Not constructed in this part of the file; presumably the outcome of a
    /// request to replace a handle (TODO confirm).
    ReplacedHandle(NonZeroU32, Result<()>),
}
108
/// An [`FDomainEvent`] that needs a bit more processing before it can be sent.
/// I.e. it still contains `fidl::Handle` objects that need to be replaced with
/// FDomain IDs.
enum UnprocessedFDomainEvent {
    /// An event that needs no further processing.
    Ready(FDomainEvent),
    /// A message read from a channel in response to the one-shot read with
    /// the given transaction ID.
    ChannelData(NonZeroU32, fidl::MessageBufEtc),
    /// A message read from the channel with the given handle ID while
    /// streaming reads were in progress.
    ChannelStreamingData(proto::HandleId, fidl::MessageBufEtc),
}
117
118impl From<FDomainEvent> for UnprocessedFDomainEvent {
119    fn from(other: FDomainEvent) -> UnprocessedFDomainEvent {
120        UnprocessedFDomainEvent::Ready(other)
121    }
122}
123
/// Operations on a handle which are processed from the read queue.
///
/// The `NonZeroU32` in every variant is the transaction ID of the request.
enum ReadOp {
    /// Enable or disable async reads on a channel (`true` enables, `false`
    /// disables).
    StreamingChannel(NonZeroU32, bool),
    /// Enable or disable async reads on a socket (`true` enables, `false`
    /// disables).
    StreamingSocket(NonZeroU32, bool),
    /// One-shot read from a socket. The `u64` is the caller's `max_bytes`
    /// limit for the read.
    Socket(NonZeroU32, u64),
    /// One-shot read of a message from a channel.
    Channel(NonZeroU32),
}
133
/// An in-progress socket write. It may take multiple syscalls to write to a
/// socket, so this tracks how many bytes were written already and how many
/// remain to be written.
struct SocketWrite {
    /// Transaction ID reported in the resulting `WroteSocket` event.
    tid: NonZeroU32,
    /// Running total of bytes written so far for this transaction.
    wrote: usize,
    /// Bytes still to be written. Bytes are removed from the front as they
    /// are written, so the write is complete when this is empty.
    to_write: Vec<u8>,
}
142
/// Operations on a handle which are processed from the write queue.
enum WriteOp {
    /// Write bytes to a socket; see [`SocketWrite`].
    Socket(SocketWrite),
    /// Write a message to a channel: transaction ID, message bytes, and the
    /// handles to send along with it.
    Channel(NonZeroU32, Vec<u8>, HandlesToWrite),
    /// Set a socket's disposition: transaction ID, then the two dispositions
    /// passed, in this order, to the handle's `socket_disposition` (the
    /// second is named `disposition_peer` where it is consumed).
    SetDisposition(NonZeroU32, proto::SocketDisposition, proto::SocketDisposition),
}
149
/// A handle which is being moved out of the FDomain by a channel write call or
/// closure/replacement.  There may still be operations to perform on this
/// handle, so the write should not proceed while the handle is in the `InUse`
/// state.
enum ShuttingDownHandle {
    /// The handle may still have queued operations. Holds the handle's ID
    /// (used in the events emitted while shutting down) and its full state.
    InUse(proto::HandleId, HandleState),
    /// All queued work has been flushed or failed and the handle has been
    /// extracted from its state.
    Ready(AnyHandle),
}
158
159impl ShuttingDownHandle {
160    fn poll_ready(
161        &mut self,
162        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
163        ctx: &mut Context<'_>,
164    ) -> Poll<()> {
165        replace_with(self, |this| match this {
166            this @ ShuttingDownHandle::Ready(_) => this,
167            ShuttingDownHandle::InUse(hid, mut state) => {
168                state.poll(event_queue, ctx);
169
170                if state.write_queue.is_empty() {
171                    while let Poll::Ready(op) = state.read_queue.pop_front(ctx) {
172                        match op {
173                            ReadOp::StreamingChannel(tid, start) => {
174                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
175                                    id: hid.id,
176                                }));
177                                let event = if start {
178                                    FDomainEvent::ChannelStreamingReadStart(tid, err)
179                                } else {
180                                    FDomainEvent::ChannelStreamingReadStop(tid, err)
181                                };
182                                event_queue.push_back(event.into());
183                            }
184                            ReadOp::StreamingSocket(tid, start) => {
185                                let err = Err(proto::Error::BadHandleId(proto::BadHandleId {
186                                    id: hid.id,
187                                }));
188                                let event = if start {
189                                    FDomainEvent::SocketStreamingReadStart(tid, err)
190                                } else {
191                                    FDomainEvent::SocketStreamingReadStop(tid, err)
192                                };
193                                event_queue.push_back(event.into());
194                            }
195                            ReadOp::Channel(tid) => {
196                                let err = state
197                                    .handle
198                                    .expected_type(fidl::ObjectType::CHANNEL)
199                                    .err()
200                                    .unwrap_or(proto::Error::ClosedDuringRead(
201                                        proto::ClosedDuringRead,
202                                    ));
203                                event_queue
204                                    .push_back(FDomainEvent::ChannelData(tid, Err(err)).into());
205                            }
206                            ReadOp::Socket(tid, _max_bytes) => {
207                                let err = state
208                                    .handle
209                                    .expected_type(fidl::ObjectType::SOCKET)
210                                    .err()
211                                    .unwrap_or(proto::Error::ClosedDuringRead(
212                                        proto::ClosedDuringRead,
213                                    ));
214                                event_queue
215                                    .push_back(FDomainEvent::SocketData(tid, Err(err)).into());
216                            }
217                        }
218                    }
219
220                    if state.async_read_in_progress {
221                        match &*state.handle {
222                            AnyHandle::Channel(_) => event_queue.push_back(
223                                FDomainEvent::ChannelStreamingData(
224                                    proto::ChannelOnChannelStreamingDataRequest {
225                                        handle: hid,
226                                        channel_sent: proto::ChannelSent::Stopped(
227                                            proto::AioStopped { error: None },
228                                        ),
229                                    },
230                                )
231                                .into(),
232                            ),
233                            AnyHandle::Socket(_) => event_queue.push_back(
234                                FDomainEvent::SocketStreamingData(
235                                    proto::SocketOnSocketStreamingDataRequest {
236                                        handle: hid,
237                                        socket_message: proto::SocketMessage::Stopped(
238                                            proto::AioStopped { error: None },
239                                        ),
240                                    },
241                                )
242                                .into(),
243                            ),
244                            AnyHandle::EventPair(_)
245                            | AnyHandle::Event(_)
246                            | AnyHandle::Unknown(_) => unreachable!(),
247                        }
248                    }
249
250                    state.signal_waiters.clear();
251                    state.io_waiter = None;
252
253                    ShuttingDownHandle::Ready(
254                        Arc::into_inner(state.handle).expect("Unaccounted-for handle reference!"),
255                    )
256                } else {
257                    ShuttingDownHandle::InUse(hid, state)
258                }
259            }
260        });
261
262        if matches!(self, ShuttingDownHandle::Ready(_)) { Poll::Ready(()) } else { Poll::Pending }
263    }
264}
265
/// A vector of [`ShuttingDownHandle`] paired with rights for the new handles, which
/// can transition into being a vector of [`fidl::HandleDisposition`] when all the
/// handles are ready.
enum HandlesToWrite {
    /// At least one handle may still be in use. Each entry pairs a handle
    /// with the rights to request for it when it is written.
    SomeInUse(Vec<(ShuttingDownHandle, fidl::Rights)>),
    /// Every handle has finished shutting down and has been converted into a
    /// disposition.
    AllReady(Vec<fidl::HandleDisposition<'static>>),
}
273
274impl HandlesToWrite {
275    fn poll_ready(
276        &mut self,
277        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
278        ctx: &mut Context<'_>,
279    ) -> Poll<&mut Vec<fidl::HandleDisposition<'static>>> {
280        match self {
281            HandlesToWrite::AllReady(s) => Poll::Ready(s),
282            HandlesToWrite::SomeInUse(handles) => {
283                let mut ready = true;
284                for (handle, _) in handles.iter_mut() {
285                    ready = ready && handle.poll_ready(event_queue, ctx).is_ready();
286                }
287
288                if !ready {
289                    return Poll::Pending;
290                }
291
292                *self = HandlesToWrite::AllReady(
293                    handles
294                        .drain(..)
295                        .map(|(handle, rights)| {
296                            let ShuttingDownHandle::Ready(handle) = handle else { unreachable!() };
297
298                            fidl::HandleDisposition::new(
299                                fidl::HandleOp::Move(handle.into()),
300                                fidl::ObjectType::NONE,
301                                rights,
302                                fidl::Status::OK,
303                            )
304                        })
305                        .collect(),
306                );
307
308                let HandlesToWrite::AllReady(s) = self else { unreachable!() };
309                Poll::Ready(s)
310            }
311        }
312    }
313}
314
/// Newtype around a shared [`AnyHandle`]. It implements [`AsHandleRef`] by
/// delegating to the inner handle, so a reference-counted handle can be handed
/// to `OnSignals::new`.
struct AnyHandleRef(Arc<AnyHandle>);

impl AsHandleRef for AnyHandleRef {
    /// Borrow the underlying handle of the wrapped [`AnyHandle`].
    fn as_handle_ref(&self) -> fidl::HandleRef<'_> {
        self.0.as_handle_ref()
    }
}
322
/// Signal-wait future used for handles in this file. On Fuchsia it is
/// `fasync::OnSignals` parameterized over [`AnyHandleRef`].
#[cfg(target_os = "fuchsia")]
type OnSignals = fasync::OnSignals<'static, AnyHandleRef>;

/// Signal-wait future used for handles in this file. Off Fuchsia it is
/// `fasync::OnSignalsRef`.
#[cfg(not(target_os = "fuchsia"))]
type OnSignals = fasync::OnSignalsRef<'static>;
328
/// Represents a `WaitForSignals` transaction from a client. When the contained
/// `OnSignals` polls to completion we can reply to the transaction.
struct SignalWaiter {
    /// Transaction ID to report in the `WaitForSignals` event.
    tid: NonZeroU32,
    /// Future that resolves with the observed signals or a wait error.
    waiter: OnSignals,
}
335
/// Information about a single handle within the [`FDomain`].
struct HandleState {
    /// The handle itself. Shared via `Arc` so that signal waiters can hold a
    /// reference to it while they are pending.
    handle: Arc<AnyHandle>,
    /// Our handle ID. This is the ID reported in streaming-data events for
    /// this handle.
    hid: proto::HandleId,
    /// Whether this is a datagram socket. We have to handle data coming out of
    /// datagram sockets a bit differently to preserve their semantics from the
    /// perspective of the host and avoid data loss.
    is_datagram_socket: bool,
    /// Indicates we are sending `On*StreamingData` events to the client
    /// presently. It is an error for the user to try to move the handle out of
    /// the FDomain (e.g. send it through a channel or close it) until after
    /// they request that streaming events stop.
    async_read_in_progress: bool,
    /// Queue of client requests to read from the handle. We have to queue read
    /// requests because they may block, and we don't want to block the event
    /// loop or be unable to handle further requests while a long read request
    /// is blocking. Also we want to retire read requests in the order they were
    /// submitted; otherwise pipelined reads could return data in a strange order.
    read_queue: Queue<ReadOp>,
    /// Queue of client requests to write to the handle. We have to queue write
    /// requests for the same reason we have to queue read requests. Since we
    /// process the queue one at a time, we need a separate queue for writes;
    /// otherwise we'd effectively make handles half-duplex, with read requests
    /// unable to proceed if a write request is blocked at the head of the
    /// queue.
    write_queue: Queue<WriteOp>,
    /// List of outstanding `WaitForSignals` transactions.
    signal_waiters: Vec<SignalWaiter>,
    /// Contains a waiter on this handle for IO reading and writing. Populated
    /// whenever we need to block on IO to service a request.
    io_waiter: Option<OnSignals>,
}
370
371impl HandleState {
372    fn new(handle: AnyHandle, hid: proto::HandleId) -> Result<Self, proto::Error> {
373        let is_datagram_socket = match handle.is_datagram_socket() {
374            IsDatagramSocket::Unknown => {
375                return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown {
376                    type_: proto::SocketType::unknown(),
377                }));
378            }
379            other => other.is_datagram(),
380        };
381        Ok(HandleState {
382            handle: Arc::new(handle),
383            hid,
384            async_read_in_progress: false,
385            is_datagram_socket,
386            read_queue: Queue::new(),
387            write_queue: Queue::new(),
388            signal_waiters: Vec::new(),
389            io_waiter: None,
390        })
391    }
392
    /// Poll this handle state. Lets us handle our IO queues and wait for the
    /// next IO event.
    ///
    /// Completed `WaitForSignals` transactions and the results of any read or
    /// write ops that could be serviced are appended to `event_queue`. On
    /// return, `io_waiter` is either `None` (nothing to wait for) or a waiter
    /// for the signals the remaining queued work needs.
    fn poll(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>, ctx: &mut Context<'_>) {
        // Reply to every `WaitForSignals` transaction whose waiter has
        // finished, and keep only the ones that are still pending.
        self.signal_waiters.retain_mut(|x| {
            let Poll::Ready(result) = x.waiter.poll_unpin(ctx) else {
                return true;
            };

            event_queue.push_back(
                FDomainEvent::WaitForSignals(
                    x.tid,
                    result
                        .map(|x| proto::FDomainWaitForSignalsResponse { signals: x.bits() })
                        .map_err(|e| proto::Error::TargetError(e.into_raw())),
                )
                .into(),
            );

            false
        });

        let read_signals = self.handle.read_signals();
        let write_signals = self.handle.write_signals();

        loop {
            if let Some(signal_waiter) = self.io_waiter.as_mut() {
                if let Poll::Ready(sigs) = signal_waiter.poll_unpin(ctx) {
                    // The waiter fired: service whichever queue the observed
                    // signals apply to. A wait error services neither queue.
                    if let Ok(sigs) = sigs {
                        if sigs.intersects(read_signals) {
                            self.process_read_queue(event_queue, ctx);
                        }
                        if sigs.intersects(write_signals) {
                            self.process_write_queue(event_queue, ctx);
                        }
                    }
                } else {
                    // The waiter is still pending. Note, before processing,
                    // whether the head of either queue is an op that does not
                    // wait on IO (streaming start/stop, set-disposition).
                    let need_read = matches!(
                        self.read_queue.front_mut(ctx),
                        Poll::Ready(ReadOp::StreamingChannel(_, _) | ReadOp::StreamingSocket(_, _))
                    );
                    let need_write = matches!(
                        self.write_queue.front_mut(ctx),
                        Poll::Ready(WriteOp::SetDisposition(_, _, _))
                    );

                    self.process_read_queue(event_queue, ctx);
                    self.process_write_queue(event_queue, ctx);

                    // If neither head was such an op, keep the existing
                    // waiter and stop. Otherwise fall through and
                    // re-subscribe, since the set of needed signals may have
                    // changed.
                    if !(need_read || need_write) {
                        break;
                    }
                }
            }

            // Work out which signals the remaining work needs: read signals
            // if a streaming read is active or reads are queued, write
            // signals if writes are queued.
            let subscribed_signals =
                if self.async_read_in_progress || !self.read_queue.is_empty() {
                    read_signals
                } else {
                    fidl::Signals::NONE
                } | if !self.write_queue.is_empty() { write_signals } else { fidl::Signals::NONE };

            if !subscribed_signals.is_empty() {
                // Install a fresh waiter; the next loop iteration polls it,
                // which registers `ctx`'s waker or handles signals that are
                // already asserted.
                self.io_waiter = Some(OnSignals::new(
                    AnyHandleRef(Arc::clone(&self.handle)),
                    subscribed_signals,
                ));
            } else {
                self.io_waiter = None;
                break;
            }
        }
    }
465
466    /// Set `async_read_in_progress` to `true`. Return an error if it was already `true`.
467    fn try_enable_async_read(&mut self) -> Result<()> {
468        if self.async_read_in_progress {
469            Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress))
470        } else {
471            self.async_read_in_progress = true;
472            Ok(())
473        }
474    }
475
476    /// Set `async_read_in_progress` to `false`. Return an error if it was already `false`.
477    fn try_disable_async_read(&mut self) -> Result<()> {
478        if !self.async_read_in_progress {
479            Err(proto::Error::NoReadInProgress(proto::NoReadInProgress))
480        } else {
481            self.async_read_in_progress = false;
482            Ok(())
483        }
484    }
485
486    /// Handle events from the front of the read queue.
487    fn process_read_queue(
488        &mut self,
489        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
490        ctx: &mut Context<'_>,
491    ) {
492        while let Poll::Ready(op) = self.read_queue.front_mut(ctx) {
493            match op {
494                ReadOp::StreamingChannel(tid, true) => {
495                    let tid = *tid;
496                    let result = self.try_enable_async_read();
497                    event_queue
498                        .push_back(FDomainEvent::ChannelStreamingReadStart(tid, result).into());
499                    self.read_queue.destroy_front();
500                }
501                ReadOp::StreamingChannel(tid, false) => {
502                    let tid = *tid;
503                    let result = self.try_disable_async_read();
504                    event_queue
505                        .push_back(FDomainEvent::ChannelStreamingReadStop(tid, result).into());
506                    self.read_queue.destroy_front();
507                }
508                ReadOp::StreamingSocket(tid, true) => {
509                    let tid = *tid;
510                    let result = self.try_enable_async_read();
511                    event_queue
512                        .push_back(FDomainEvent::SocketStreamingReadStart(tid, result).into());
513                    self.read_queue.destroy_front();
514                }
515                ReadOp::StreamingSocket(tid, false) => {
516                    let tid = *tid;
517                    let result = self.try_disable_async_read();
518                    event_queue
519                        .push_back(FDomainEvent::SocketStreamingReadStop(tid, result).into());
520                    self.read_queue.destroy_front();
521                }
522                ReadOp::Socket(tid, max_bytes) => {
523                    let (tid, max_bytes) = (*tid, *max_bytes);
524                    if let Some(event) = self.do_read_socket(tid, max_bytes) {
525                        let _ = self.read_queue.pop_front(ctx);
526                        event_queue.push_back(event.into());
527                    } else {
528                        break;
529                    }
530                }
531                ReadOp::Channel(tid) => {
532                    let tid = *tid;
533                    if let Some(event) = self.do_read_channel(tid) {
534                        let _ = self.read_queue.pop_front(ctx);
535                        event_queue.push_back(event.into());
536                    } else {
537                        break;
538                    }
539                }
540            }
541        }
542
543        if self.async_read_in_progress {
544            // We should have error'd out of any blocking operations if we had a
545            // read in progress.
546            assert!(self.read_queue.is_empty());
547            self.process_async_read(event_queue);
548        }
549    }
550
551    fn process_async_read(&mut self, event_queue: &mut VecDeque<UnprocessedFDomainEvent>) {
552        assert!(self.async_read_in_progress);
553
554        match &*self.handle {
555            AnyHandle::Channel(_) => {
556                'read_loop: while let Some(result) = self.handle.read_channel().transpose() {
557                    match result {
558                        Ok(msg) => event_queue.push_back(
559                            UnprocessedFDomainEvent::ChannelStreamingData(self.hid, msg),
560                        ),
561                        Err(e) => {
562                            event_queue.push_back(
563                                FDomainEvent::ChannelStreamingData(
564                                    proto::ChannelOnChannelStreamingDataRequest {
565                                        handle: self.hid,
566                                        channel_sent: proto::ChannelSent::Stopped(
567                                            proto::AioStopped { error: Some(Box::new(e)) },
568                                        ),
569                                    },
570                                )
571                                .into(),
572                            );
573                            self.async_read_in_progress = false;
574                            break 'read_loop;
575                        }
576                    }
577                }
578            }
579
580            AnyHandle::Socket(_) => {
581                'read_loop: while let Some(result) =
582                    self.handle.read_socket(ASYNC_READ_BUFSIZE).transpose()
583                {
584                    match result {
585                        Ok(data) => {
586                            event_queue.push_back(
587                                FDomainEvent::SocketStreamingData(
588                                    proto::SocketOnSocketStreamingDataRequest {
589                                        handle: self.hid,
590                                        socket_message: proto::SocketMessage::Data(
591                                            proto::SocketData {
592                                                data,
593                                                is_datagram: self.is_datagram_socket,
594                                            },
595                                        ),
596                                    },
597                                )
598                                .into(),
599                            );
600                        }
601                        Err(e) => {
602                            event_queue.push_back(
603                                FDomainEvent::SocketStreamingData(
604                                    proto::SocketOnSocketStreamingDataRequest {
605                                        handle: self.hid,
606                                        socket_message: proto::SocketMessage::Stopped(
607                                            proto::AioStopped { error: Some(Box::new(e)) },
608                                        ),
609                                    },
610                                )
611                                .into(),
612                            );
613                            self.async_read_in_progress = false;
614                            break 'read_loop;
615                        }
616                    }
617                }
618            }
619
620            _ => unreachable!("Processed async read for unreadable handle type!"),
621        }
622    }
623
624    /// Handle events from the front of the write queue.
625    fn process_write_queue(
626        &mut self,
627        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
628        ctx: &mut Context<'_>,
629    ) {
630        // We want to mutate and *maybe* pop the front of the write queue, but
631        // lifetime shenanigans mean we can't do that and also access `self`,
632        // which we need. So we pop the item always, and then maybe push it to
633        // the front again if we didn't actually want to pop it.
634        while let Poll::Ready(op) = self.write_queue.pop_front(ctx) {
635            match op {
636                WriteOp::Socket(mut op) => {
637                    if let Some(event) = self.do_write_socket(&mut op) {
638                        event_queue.push_back(event.into());
639                    } else {
640                        self.write_queue.push_front_no_wake(WriteOp::Socket(op));
641                        break;
642                    }
643                }
644                WriteOp::SetDisposition(tid, disposition, disposition_peer) => {
645                    let result = { self.handle.socket_disposition(disposition, disposition_peer) };
646                    event_queue.push_back(FDomainEvent::SocketDispositionSet(tid, result).into())
647                }
648                WriteOp::Channel(tid, data, mut handles) => {
649                    if self
650                        .do_write_channel(tid, &data, &mut handles, event_queue, ctx)
651                        .is_pending()
652                    {
653                        self.write_queue.push_front_no_wake(WriteOp::Channel(tid, data, handles));
654                        break;
655                    }
656                }
657            }
658        }
659    }
660
661    /// Attempt to read from the handle in this [`HandleState`] as if it were a
662    /// socket. If the read succeeds or produces an error that should not be
663    /// retried, produce an [`FDomainEvent`] containing the result.
664    fn do_read_socket(&mut self, tid: NonZeroU32, max_bytes: u64) -> Option<FDomainEvent> {
665        if self.async_read_in_progress {
666            return Some(
667                FDomainEvent::SocketData(
668                    tid,
669                    Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
670                )
671                .into(),
672            );
673        }
674
675        let max_bytes = if self.is_datagram_socket {
676            let AnyHandle::Socket(s) = &*self.handle else {
677                unreachable!("Read socket from state that wasn't for a socket!");
678            };
679            match s.info() {
680                Ok(x) => x.rx_buf_available as u64,
681                // We should always succeed. The only failures are if we don't
682                // have the rights or something's screwed up with the handle. We
683                // know we have the rights because figuring out this was a
684                // datagram socket to begin with meant calling the same call on
685                // the same handle earlier.
686                Err(e) => {
687                    return Some(FDomainEvent::SocketData(
688                        tid,
689                        Err(proto::Error::TargetError(e.into_raw())),
690                    ));
691                }
692            }
693        } else {
694            max_bytes
695        };
696        self.handle.read_socket(max_bytes).transpose().map(|x| {
697            FDomainEvent::SocketData(
698                tid,
699                x.map(|data| proto::SocketData { data, is_datagram: self.is_datagram_socket }),
700            )
701        })
702    }
703
704    /// Attempt to write to the handle in this [`HandleState`] as if it were a
705    /// socket. If the write succeeds or produces an error that should not be
706    /// retried, produce an [`FDomainEvent`] containing the result.
707    fn do_write_socket(&mut self, op: &mut SocketWrite) -> Option<FDomainEvent> {
708        match self.handle.write_socket(&op.to_write) {
709            Ok(wrote) => {
710                op.wrote += wrote;
711                op.to_write.drain(..wrote);
712
713                if op.to_write.is_empty() {
714                    Some(FDomainEvent::WroteSocket(
715                        op.tid,
716                        Ok(proto::SocketWriteSocketResponse {
717                            wrote: op.wrote.try_into().unwrap(),
718                        }),
719                    ))
720                } else {
721                    None
722                }
723            }
724            Err(error) => Some(FDomainEvent::WroteSocket(
725                op.tid,
726                Err(proto::WriteSocketError { error, wrote: op.wrote.try_into().unwrap() }),
727            )),
728        }
729    }
730
731    /// Attempt to write to the handle in this [`HandleState`] as if it were a
732    /// channel. If the write succeeds or produces an error that should not be
733    /// retried, produce an [`FDomainEvent`] containing the result.
734    fn do_write_channel(
735        &mut self,
736        tid: NonZeroU32,
737        data: &[u8],
738        handles: &mut HandlesToWrite,
739        event_queue: &mut VecDeque<UnprocessedFDomainEvent>,
740        ctx: &mut Context<'_>,
741    ) -> Poll<()> {
742        let Poll::Ready(handles) = handles.poll_ready(event_queue, ctx) else {
743            return Poll::Pending;
744        };
745
746        let ret = self.handle.write_channel(data, handles);
747        if let Some(ret) = ret {
748            event_queue.push_back(FDomainEvent::WroteChannel(tid, ret).into())
749        }
750        Poll::Ready(())
751    }
752
753    /// Attempt to read from the handle in this [`HandleState`] as if it were a
754    /// channel. If the read succeeds or produces an error that should not be
755    /// retried, produce an [`FDomainEvent`] containing the result.
756    fn do_read_channel(&mut self, tid: NonZeroU32) -> Option<UnprocessedFDomainEvent> {
757        if self.async_read_in_progress {
758            return Some(
759                FDomainEvent::ChannelData(
760                    tid,
761                    Err(proto::Error::StreamingReadInProgress(proto::StreamingReadInProgress)),
762                )
763                .into(),
764            );
765        }
766        match self.handle.read_channel() {
767            Ok(x) => x.map(|x| UnprocessedFDomainEvent::ChannelData(tid, x)),
768            Err(e) => Some(FDomainEvent::ChannelData(tid, Err(e)).into()),
769        }
770    }
771}
772
/// State for a handle which is closing, but which needs its read and write
/// queues flushed first.
struct ClosingHandle {
    /// What to do with the handle once it has finished shutting down. Held in
    /// an `Arc` because one close request covering several handles hands the
    /// same action to each of them.
    action: Arc<CloseAction>,
    /// The handle being shut down. Becomes `None` once `action` has been
    /// performed, so the action is never run a second time.
    state: Option<ShuttingDownHandle>,
}
779
780impl ClosingHandle {
781    fn poll_ready(&mut self, fdomain: &mut FDomain, ctx: &mut Context<'_>) -> Poll<()> {
782        if let Some(state) = self.state.as_mut() {
783            if state.poll_ready(&mut fdomain.event_queue, ctx).is_ready() {
784                let state = self.state.take().unwrap();
785                let ShuttingDownHandle::Ready(handle) = state else {
786                    unreachable!();
787                };
788                self.action.perform(fdomain, handle);
789                Poll::Ready(())
790            } else {
791                Poll::Pending
792            }
793        } else {
794            Poll::Ready(())
795        }
796    }
797}
798
/// When the client requests a handle to be closed or moved or otherwise
/// destroyed, it goes into limbo for a bit while pending read and write actions
/// are flushed. This is how we mark what should happen to the handle after that
/// period ends.
enum CloseAction {
    /// Drop the handle. `count` is the number of handles sharing this action
    /// that have not yet been performed; the one that takes it from 1 to 0
    /// emits the `ClosedHandle` reply for transaction `tid`, carrying
    /// `result`.
    Close { tid: NonZeroU32, count: AtomicU32, result: Result<()> },
    /// Replace the handle with one carrying `rights`, store the replacement
    /// under `new_hid`, and emit the `ReplacedHandle` reply for transaction
    /// `tid`.
    Replace { tid: NonZeroU32, new_hid: proto::NewHandleId, rights: fidl::Rights },
}
807
808impl CloseAction {
809    fn perform(&self, fdomain: &mut FDomain, handle: AnyHandle) {
810        match self {
811            CloseAction::Close { tid, count, result } => {
812                if count.fetch_sub(1, Ordering::Relaxed) == 1 {
813                    fdomain
814                        .event_queue
815                        .push_back(FDomainEvent::ClosedHandle(*tid, result.clone()).into());
816                }
817            }
818            CloseAction::Replace { tid, new_hid, rights } => {
819                let result = handle
820                    .replace(*rights)
821                    .and_then(|handle| fdomain.alloc_client_handles([*new_hid], [handle]));
822                fdomain.event_queue.push_back(FDomainEvent::ReplacedHandle(*tid, result).into());
823            }
824        }
825    }
826}
827
/// This is a container of handles that is manipulable via the FDomain protocol.
/// See [RFC-0228].
///
/// Most of the methods simply handle FIDL requests from the FDomain protocol.
#[pin_project::pin_project]
pub struct FDomain {
    /// Called for each namespace request to obtain the directory channel that
    /// is handed to the client.
    namespace: Box<dyn Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send>,
    /// Every live handle in this FDomain, keyed by its handle ID.
    handles: HashMap<proto::HandleId, HandleState>,
    /// Handles that were removed from `handles` by a close or replace request
    /// and are still shutting down.
    closing_handles: Vec<ClosingHandle>,
    /// Events waiting to be yielded by the `Stream` implementation.
    event_queue: VecDeque<UnprocessedFDomainEvent>,
    /// Waker stored by `poll_next` when it had no event to yield; taken and
    /// woken when new work is queued.
    waker: Option<Waker>,
}
840
841impl FDomain {
842    /// Create a new FDomain. The new FDomain is empty and ready to be connected
843    /// to by a client.
844    pub fn new_empty() -> Self {
845        Self::new(|| Err(fidl::Status::NOT_FOUND))
846    }
847
848    /// Create a new FDomain populated with the given namespace entries.
849    pub fn new(
850        namespace: impl Fn() -> Result<ClientEnd<fio::DirectoryMarker>, fidl::Status> + Send + 'static,
851    ) -> Self {
852        FDomain {
853            namespace: Box::new(namespace),
854            handles: HashMap::new(),
855            closing_handles: Vec::new(),
856            event_queue: VecDeque::new(),
857            waker: None,
858        }
859    }
860
861    /// Add an event to be emitted by this FDomain.
862    fn push_event(&mut self, event: impl Into<UnprocessedFDomainEvent>) {
863        self.event_queue.push_back(event.into());
864        self.waker.take().map(Waker::wake);
865    }
866
867    /// Given a [`fidl::MessageBufEtc`], load all of the handles from it into this
868    /// FDomain and return a [`ReadChannelPayload`](proto::ReadChannelPayload)
869    /// with the same data and the IDs for the handles.
870    fn process_message(
871        &mut self,
872        message: fidl::MessageBufEtc,
873    ) -> Result<proto::ChannelMessage, proto::Error> {
874        let (data, handles) = message.split();
875        let handles = handles
876            .into_iter()
877            .map(|info| {
878                let type_ = info.object_type;
879
880                let handle = match info.object_type {
881                    fidl::ObjectType::CHANNEL => {
882                        AnyHandle::Channel(fidl::Channel::from_handle(info.handle))
883                    }
884                    fidl::ObjectType::SOCKET => {
885                        AnyHandle::Socket(fidl::Socket::from_handle(info.handle))
886                    }
887                    fidl::ObjectType::EVENTPAIR => {
888                        AnyHandle::EventPair(fidl::EventPair::from_handle(info.handle))
889                    }
890                    fidl::ObjectType::EVENT => {
891                        AnyHandle::Event(fidl::Event::from_handle(info.handle))
892                    }
893                    _ => AnyHandle::Unknown(handles::Unknown(info.handle, info.object_type)),
894                };
895
896                Ok(proto::HandleInfo {
897                    rights: info.rights,
898                    handle: self.alloc_fdomain_handle(handle)?,
899                    type_,
900                })
901            })
902            .collect::<Result<Vec<_>, proto::Error>>()?;
903
904        Ok(proto::ChannelMessage { data, handles })
905    }
906
907    /// Allocate `N` new handle IDs. These are allocated from
908    /// [`NewHandleId`](proto::NewHandleId) and are expected to follow the protocol
909    /// rules for client-allocated handle IDs.
910    ///
911    /// If any of the handles passed fail to allocate, none of the handles will
912    /// be allocated.
913    fn alloc_client_handles<const N: usize>(
914        &mut self,
915        ids: [proto::NewHandleId; N],
916        handles: [AnyHandle; N],
917    ) -> Result<(), proto::Error> {
918        for id in ids {
919            if id.id & (1 << 31) != 0 {
920                return Err(proto::Error::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange {
921                    id: id.id,
922                }));
923            }
924
925            if self.handles.contains_key(&proto::HandleId { id: id.id }) {
926                return Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
927                    id: id.id,
928                    same_call: false,
929                }));
930            }
931        }
932
933        let mut sorted_ids = ids;
934        sorted_ids.sort();
935
936        if let Some(a) = sorted_ids.windows(2).find(|x| x[0] == x[1]) {
937            Err(proto::Error::NewHandleIdReused(proto::NewHandleIdReused {
938                id: a[0].id,
939                same_call: true,
940            }))
941        } else {
942            let ids = ids.into_iter().map(|id| proto::HandleId { id: id.id });
943            let handles = ids
944                .zip(handles.into_iter())
945                .map(|(id, h)| HandleState::new(h, id).map(|x| (id, x)))
946                .collect::<Result<Vec<_>, proto::Error>>()?;
947
948            self.handles.extend(handles);
949
950            Ok(())
951        }
952    }
953
954    /// Allocate a new handle ID. These are allocated internally and are
955    /// expected to follow the protocol rules for FDomain-allocated handle IDs.
956    fn alloc_fdomain_handle(&mut self, handle: AnyHandle) -> Result<proto::HandleId, proto::Error> {
957        loop {
958            let id = proto::HandleId { id: rand::random::<u32>() | (1u32 << 31) };
959            if let Entry::Vacant(v) = self.handles.entry(id) {
960                v.insert(HandleState::new(handle, id)?);
961                break Ok(id);
962            }
963        }
964    }
965
966    /// If a handle exists in this FDomain, remove it.
967    fn take_handle(&mut self, handle: proto::HandleId) -> Result<HandleState, proto::Error> {
968        self.handles
969            .remove(&handle)
970            .ok_or(proto::Error::BadHandleId(proto::BadHandleId { id: handle.id }))
971    }
972
973    /// Use a handle in our handle table, if it exists.
974    fn using_handle<T>(
975        &mut self,
976        id: proto::HandleId,
977        f: impl FnOnce(&mut HandleState) -> Result<T, proto::Error>,
978    ) -> Result<T, proto::Error> {
979        if let Some(s) = self.handles.get_mut(&id) {
980            f(s)
981        } else {
982            Err(proto::Error::BadHandleId(proto::BadHandleId { id: id.id }))
983        }
984    }
985
986    pub fn get_namespace(&mut self, request: proto::FDomainGetNamespaceRequest) -> Result<()> {
987        match (self.namespace)() {
988            Ok(endpoint) => self.alloc_client_handles(
989                [request.new_handle],
990                [AnyHandle::Channel(endpoint.into_channel())],
991            ),
992            Err(e) => Err(proto::Error::TargetError(e.into_raw())),
993        }
994    }
995
996    pub fn create_channel(&mut self, request: proto::ChannelCreateChannelRequest) -> Result<()> {
997        let (a, b) = fidl::Channel::create();
998        self.alloc_client_handles(request.handles, [AnyHandle::Channel(a), AnyHandle::Channel(b)])
999    }
1000
1001    pub fn create_socket(&mut self, request: proto::SocketCreateSocketRequest) -> Result<()> {
1002        let (a, b) = match request.options {
1003            proto::SocketType::Stream => fidl::Socket::create_stream(),
1004            proto::SocketType::Datagram => fidl::Socket::create_datagram(),
1005            type_ => {
1006                return Err(proto::Error::SocketTypeUnknown(proto::SocketTypeUnknown { type_ }));
1007            }
1008        };
1009
1010        self.alloc_client_handles(request.handles, [AnyHandle::Socket(a), AnyHandle::Socket(b)])
1011    }
1012
1013    pub fn create_event_pair(
1014        &mut self,
1015        request: proto::EventPairCreateEventPairRequest,
1016    ) -> Result<()> {
1017        let (a, b) = fidl::EventPair::create();
1018        self.alloc_client_handles(
1019            request.handles,
1020            [AnyHandle::EventPair(a), AnyHandle::EventPair(b)],
1021        )
1022    }
1023
1024    pub fn create_event(&mut self, request: proto::EventCreateEventRequest) -> Result<()> {
1025        let a = fidl::Event::create();
1026        self.alloc_client_handles([request.handle], [AnyHandle::Event(a)])
1027    }
1028
1029    pub fn set_socket_disposition(
1030        &mut self,
1031        tid: NonZeroU32,
1032        request: proto::SocketSetSocketDispositionRequest,
1033    ) {
1034        if let Err(err) = self.using_handle(request.handle, |h| {
1035            h.write_queue.push_back(WriteOp::SetDisposition(
1036                tid,
1037                request.disposition,
1038                request.disposition_peer,
1039            ));
1040            Ok(())
1041        }) {
1042            self.push_event(FDomainEvent::SocketDispositionSet(tid, Err(err)));
1043        }
1044    }
1045
1046    pub fn read_socket(&mut self, tid: NonZeroU32, request: proto::SocketReadSocketRequest) {
1047        if let Err(e) = self.using_handle(request.handle, |h| {
1048            h.read_queue.push_back(ReadOp::Socket(tid, request.max_bytes));
1049            Ok(())
1050        }) {
1051            self.push_event(FDomainEvent::SocketData(tid, Err(e)));
1052        }
1053    }
1054
1055    pub fn read_channel(&mut self, tid: NonZeroU32, request: proto::ChannelReadChannelRequest) {
1056        if let Err(e) = self.using_handle(request.handle, |h| {
1057            h.read_queue.push_back(ReadOp::Channel(tid));
1058            Ok(())
1059        }) {
1060            self.push_event(FDomainEvent::ChannelData(tid, Err(e)));
1061        }
1062    }
1063
1064    pub fn write_socket(&mut self, tid: NonZeroU32, request: proto::SocketWriteSocketRequest) {
1065        if let Err(error) = self.using_handle(request.handle, |h| {
1066            h.write_queue.push_back(WriteOp::Socket(SocketWrite {
1067                tid,
1068                wrote: 0,
1069                to_write: request.data,
1070            }));
1071            Ok(())
1072        }) {
1073            self.push_event(FDomainEvent::WroteSocket(
1074                tid,
1075                Err(proto::WriteSocketError { error, wrote: 0 }),
1076            ));
1077        }
1078    }
1079
1080    pub fn write_channel(&mut self, tid: NonZeroU32, request: proto::ChannelWriteChannelRequest) {
1081        // Go through the list of handles in the requests (which will either be
1082        // a simple list of handles or a list of HandleDispositions) and obtain
1083        // for each a `ShuttingDownHandle` which contains our handle state (the
1084        // "Shutting down" refers to the fact that we're pulling the handle out
1085        // of the FDomain in order to send it) and the rights the requester
1086        // would like the handle to have upon arrival at the other end of the
1087        // channel.
1088        let handles: Vec<Result<(ShuttingDownHandle, fidl::Rights)>> = match request.handles {
1089            proto::Handles::Handles(h) => h
1090                .into_iter()
1091                .map(|h| {
1092                    if h != request.handle {
1093                        self.take_handle(h).map(|handle_state| {
1094                            (ShuttingDownHandle::InUse(h, handle_state), fidl::Rights::SAME_RIGHTS)
1095                        })
1096                    } else {
1097                        Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1098                    }
1099                })
1100                .collect(),
1101            proto::Handles::Dispositions(d) => d
1102                .into_iter()
1103                .map(|d| {
1104                    let res = match d.handle {
1105                        proto::HandleOp::Move_(h) => {
1106                            if h != request.handle {
1107                                self.take_handle(h).map(|x| ShuttingDownHandle::InUse(h, x))
1108                            } else {
1109                                Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1110                            }
1111                        }
1112                        proto::HandleOp::Duplicate(h) => {
1113                            if h != request.handle {
1114                                // If the requester wants us to duplicate the
1115                                // handle, we do so now rather than letting
1116                                // `write_etc` do it. Otherwise we have to use a
1117                                // reference to the handle, and we get lifetime
1118                                // hell.
1119                                self.using_handle(h, |h| {
1120                                    h.handle.duplicate(fidl::Rights::SAME_RIGHTS)
1121                                })
1122                                .map(ShuttingDownHandle::Ready)
1123                            } else {
1124                                Err(proto::Error::WroteToSelf(proto::WroteToSelf))
1125                            }
1126                        }
1127                    };
1128
1129                    res.and_then(|x| Ok((x, d.rights)))
1130                })
1131                .collect(),
1132        };
1133
1134        if handles.iter().any(|x| x.is_err()) {
1135            let e = handles.into_iter().map(|x| x.err().map(Box::new)).collect();
1136
1137            self.push_event(FDomainEvent::WroteChannel(
1138                tid,
1139                Err(proto::WriteChannelError::OpErrors(e)),
1140            ));
1141            return;
1142        }
1143
1144        let handles = handles.into_iter().map(|x| x.unwrap()).collect::<Vec<_>>();
1145
1146        if let Err(e) = self.using_handle(request.handle, |h| {
1147            h.write_queue.push_back(WriteOp::Channel(
1148                tid,
1149                request.data,
1150                HandlesToWrite::SomeInUse(handles),
1151            ));
1152            Ok(())
1153        }) {
1154            self.push_event(FDomainEvent::WroteChannel(
1155                tid,
1156                Err(proto::WriteChannelError::Error(e)),
1157            ));
1158        }
1159    }
1160
1161    pub fn wait_for_signals(
1162        &mut self,
1163        tid: NonZeroU32,
1164        request: proto::FDomainWaitForSignalsRequest,
1165    ) {
1166        let result = self.using_handle(request.handle, |h| {
1167            let signals = fidl::Signals::from_bits_retain(request.signals);
1168            h.signal_waiters.push(SignalWaiter {
1169                tid,
1170                waiter: OnSignals::new(AnyHandleRef(Arc::clone(&h.handle)), signals),
1171            });
1172            Ok(())
1173        });
1174
1175        if let Err(e) = result {
1176            self.push_event(FDomainEvent::WaitForSignals(tid, Err(e)));
1177        } else {
1178            self.waker.take().map(Waker::wake);
1179        }
1180    }
1181
1182    pub fn close(&mut self, tid: NonZeroU32, request: proto::FDomainCloseRequest) {
1183        let mut states = Vec::with_capacity(request.handles.len());
1184        let mut result = Ok(());
1185        for hid in request.handles {
1186            match self.take_handle(hid) {
1187                Ok(state) => states.push((hid, state)),
1188
1189                Err(e) => {
1190                    result = result.and(Err(e));
1191                }
1192            }
1193        }
1194
1195        let action = Arc::new(CloseAction::Close {
1196            tid,
1197            count: AtomicU32::new(states.len().try_into().unwrap()),
1198            result,
1199        });
1200
1201        for (hid, state) in states {
1202            self.closing_handles.push(ClosingHandle {
1203                action: Arc::clone(&action),
1204                state: Some(ShuttingDownHandle::InUse(hid, state)),
1205            });
1206        }
1207    }
1208
1209    pub fn duplicate(&mut self, request: proto::FDomainDuplicateRequest) -> Result<()> {
1210        let rights = request.rights;
1211        let handle = self.using_handle(request.handle, |h| h.handle.duplicate(rights));
1212        handle.and_then(|h| self.alloc_client_handles([request.new_handle], [h]))
1213    }
1214
1215    pub fn replace(
1216        &mut self,
1217        tid: NonZeroU32,
1218        request: proto::FDomainReplaceRequest,
1219    ) -> Result<()> {
1220        let rights = request.rights;
1221        let new_hid = request.new_handle;
1222        match self.take_handle(request.handle) {
1223            Ok(state) => self.closing_handles.push(ClosingHandle {
1224                action: Arc::new(CloseAction::Replace { tid, new_hid, rights }),
1225                state: Some(ShuttingDownHandle::InUse(request.handle, state)),
1226            }),
1227            Err(e) => self.event_queue.push_back(UnprocessedFDomainEvent::Ready(
1228                FDomainEvent::ReplacedHandle(tid, Err(e)),
1229            )),
1230        }
1231
1232        Ok(())
1233    }
1234
1235    pub fn signal(&mut self, request: proto::FDomainSignalRequest) -> Result<()> {
1236        let set = fidl::Signals::from_bits_retain(request.set);
1237        let clear = fidl::Signals::from_bits_retain(request.clear);
1238
1239        self.using_handle(request.handle, |h| {
1240            h.handle.signal_handle(clear, set).map_err(|e| proto::Error::TargetError(e.into_raw()))
1241        })
1242    }
1243
1244    pub fn signal_peer(&mut self, request: proto::FDomainSignalPeerRequest) -> Result<()> {
1245        let set = fidl::Signals::from_bits_retain(request.set);
1246        let clear = fidl::Signals::from_bits_retain(request.clear);
1247
1248        self.using_handle(request.handle, |h| h.handle.signal_peer(clear, set))
1249    }
1250
1251    pub fn read_channel_streaming_start(
1252        &mut self,
1253        tid: NonZeroU32,
1254        request: proto::ChannelReadChannelStreamingStartRequest,
1255    ) {
1256        if let Err(err) = self.using_handle(request.handle, |h| {
1257            h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1258            h.read_queue.push_back(ReadOp::StreamingChannel(tid, true));
1259            Ok(())
1260        }) {
1261            self.event_queue
1262                .push_back(FDomainEvent::ChannelStreamingReadStart(tid, Err(err)).into())
1263        }
1264    }
1265
1266    pub fn read_channel_streaming_stop(
1267        &mut self,
1268        tid: NonZeroU32,
1269        request: proto::ChannelReadChannelStreamingStopRequest,
1270    ) {
1271        if let Err(err) = self.using_handle(request.handle, |h| {
1272            h.handle.expected_type(fidl::ObjectType::CHANNEL)?;
1273            h.read_queue.push_back(ReadOp::StreamingChannel(tid, false));
1274            Ok(())
1275        }) {
1276            self.event_queue.push_back(FDomainEvent::ChannelStreamingReadStop(tid, Err(err)).into())
1277        }
1278    }
1279
1280    pub fn read_socket_streaming_start(
1281        &mut self,
1282        tid: NonZeroU32,
1283        request: proto::SocketReadSocketStreamingStartRequest,
1284    ) {
1285        if let Err(err) = self.using_handle(request.handle, |h| {
1286            h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1287            h.read_queue.push_back(ReadOp::StreamingSocket(tid, true));
1288            Ok(())
1289        }) {
1290            self.event_queue.push_back(FDomainEvent::SocketStreamingReadStart(tid, Err(err)).into())
1291        }
1292    }
1293
1294    pub fn read_socket_streaming_stop(
1295        &mut self,
1296        tid: NonZeroU32,
1297        request: proto::SocketReadSocketStreamingStopRequest,
1298    ) {
1299        if let Err(err) = self.using_handle(request.handle, |h| {
1300            h.handle.expected_type(fidl::ObjectType::SOCKET)?;
1301            h.read_queue.push_back(ReadOp::StreamingSocket(tid, false));
1302            Ok(())
1303        }) {
1304            self.event_queue.push_back(FDomainEvent::SocketStreamingReadStop(tid, Err(err)).into())
1305        }
1306    }
1307}
1308
1309/// [`FDomain`] implements a stream of events, for protocol events and for
1310/// replies to long-running methods.
1311impl futures::Stream for FDomain {
1312    type Item = FDomainEvent;
1313
1314    fn poll_next(
1315        mut self: std::pin::Pin<&mut Self>,
1316        ctx: &mut Context<'_>,
1317    ) -> Poll<Option<Self::Item>> {
1318        let this = &mut *self;
1319
1320        let mut closing_handles = std::mem::replace(&mut this.closing_handles, Vec::new());
1321        closing_handles.retain_mut(|x| x.poll_ready(this, ctx).is_pending());
1322        this.closing_handles = closing_handles;
1323
1324        let handles = &mut this.handles;
1325        let event_queue = &mut this.event_queue;
1326        for state in handles.values_mut() {
1327            state.poll(event_queue, ctx);
1328        }
1329
1330        if let Some(event) = self.event_queue.pop_front() {
1331            match event {
1332                UnprocessedFDomainEvent::Ready(event) => Poll::Ready(Some(event)),
1333                UnprocessedFDomainEvent::ChannelData(tid, message) => {
1334                    Poll::Ready(Some(FDomainEvent::ChannelData(tid, self.process_message(message))))
1335                }
1336                UnprocessedFDomainEvent::ChannelStreamingData(hid, message) => {
1337                    match self.process_message(message) {
1338                        Ok(message) => Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1339                            proto::ChannelOnChannelStreamingDataRequest {
1340                                handle: hid,
1341                                channel_sent: proto::ChannelSent::Message(message),
1342                            },
1343                        ))),
1344                        Err(e) => {
1345                            self.handles.get_mut(&hid).unwrap().async_read_in_progress = false;
1346                            Poll::Ready(Some(FDomainEvent::ChannelStreamingData(
1347                                proto::ChannelOnChannelStreamingDataRequest {
1348                                    handle: hid,
1349                                    channel_sent: proto::ChannelSent::Stopped(proto::AioStopped {
1350                                        error: Some(Box::new(e)),
1351                                    }),
1352                                },
1353                            )))
1354                        }
1355                    }
1356                }
1357            }
1358        } else {
1359            self.waker = Some(ctx.waker().clone());
1360            Poll::Pending
1361        }
1362    }
1363}