fdomain_client/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_message::TransactionHeader;
6use fuchsia_sync::Mutex;
7use futures::FutureExt;
8use futures::channel::oneshot::Sender as OneshotSender;
9use futures::stream::Stream as StreamTrait;
10use std::collections::{HashMap, VecDeque};
11use std::convert::Infallible;
12use std::future::Future;
13use std::num::NonZeroU32;
14use std::pin::Pin;
15use std::sync::{Arc, LazyLock};
16use std::task::{Context, Poll, Waker, ready};
17use {fidl_fuchsia_fdomain as proto, fuchsia_async as _};
18
19mod channel;
20mod event;
21mod event_pair;
22mod handle;
23mod responder;
24mod socket;
25
26#[cfg(test)]
27mod test;
28
29pub mod fidl;
30pub mod fidl_next;
31
32use responder::Responder;
33
34pub use channel::{
35    AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
36};
37pub use event::Event;
38pub use event_pair::Eventpair as EventPair;
39pub use handle::unowned::Unowned;
40pub use handle::{
41    AsHandleRef, Handle, HandleBased, HandleRef, NullableHandle, OnFDomainSignals, Peered,
42};
43pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
44pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
45
46// Unsupported handle types.
47#[rustfmt::skip]
48pub use Handle as Fifo;
49#[rustfmt::skip]
50pub use Handle as Job;
51#[rustfmt::skip]
52pub use Handle as Process;
53#[rustfmt::skip]
54pub use Handle as Resource;
55#[rustfmt::skip]
56pub use Handle as Stream;
57#[rustfmt::skip]
58pub use Handle as Thread;
59#[rustfmt::skip]
60pub use Handle as Vmar;
61#[rustfmt::skip]
62pub use Handle as Vmo;
63
64use proto::f_domain_ordinals as ordinals;
65
66fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67    match error {
68        FDomainError::TargetError(e) => {
69            let e = zx_status::Status::from_raw(*e);
70            write!(f, "Target-side error {e}")
71        }
72        FDomainError::BadHandleId(proto::BadHandleId { id }) => {
73            write!(f, "Tried to use invalid handle id {id}")
74        }
75        FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
76            f,
77            "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
78        ),
79        FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
80            write!(f, "Handle is occupied delivering streaming reads")
81        }
82        FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
83            write!(f, "No streaming read was in progress")
84        }
85        FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
86            write!(
87                f,
88                "Tried to create a handle with id {id}, which is outside the valid range for client handles"
89            )
90        }
91        FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
92            if *same_call {
93                write!(f, "Tried to create two or more new handles with the same id {id}")
94            } else {
95                write!(
96                    f,
97                    "Tried to create a new handle with id {id}, which is already the id of an existing handle"
98                )
99            }
100        }
101        FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
102            write!(f, "Tried to write a channel into itself")
103        }
104        FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
105            write!(f, "Handle closed while being read")
106        }
107        _ => todo!(),
108    }
109}
110
/// Convenience alias for [`std::result::Result`] whose error type defaults to
/// this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
113
/// Error type emitted by FDomain operations.
#[derive(Clone)]
pub enum Error {
    /// Writing to a socket failed. Carries the FDomain error and the number of
    /// bytes that were written successfully before the failure.
    SocketWrite(WriteSocketError),
    /// Writing to a channel failed, either as a whole or for individual
    /// handles in the message.
    ChannelWrite(WriteChannelError),
    /// An error defined by the FDomain protocol itself.
    FDomain(FDomainError),
    /// A FIDL-level error; displayed as "Protocol error".
    Protocol(::fidl::Error),
    /// The FDomain protocol does not recognize an object type.
    ProtocolObjectTypeIncompatible,
    /// The FDomain protocol does not recognize some rights.
    ProtocolRightsIncompatible,
    /// The FDomain protocol does not recognize some signals.
    ProtocolSignalsIncompatible,
    /// The FDomain protocol does not recognize a received streaming IO event.
    ProtocolStreamEventIncompatible,
    /// The transport failed. `None` is displayed as "Transport closed";
    /// `Some` carries the underlying I/O error.
    Transport(Option<Arc<std::io::Error>>),
    /// An FDomain handle was used with a different connection than its own.
    ConnectionMismatch,
    /// The channel is no longer streaming.
    StreamingAborted,
}
129
130impl std::fmt::Display for Error {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        match self {
133            Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
134                write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
135                write_fdomain_error(error, f)
136            }
137            Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
138                write!(f, "While writing channel: ")?;
139                write_fdomain_error(error, f)
140            }
141            Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
142                write!(f, "Couldn't write all handles into a channel:")?;
143                for (pos, error) in
144                    errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
145                {
146                    write!(f, "\n  Handle in position {pos}: ")?;
147                    write_fdomain_error(error, f)?;
148                }
149                Ok(())
150            }
151            Self::ProtocolObjectTypeIncompatible => {
152                write!(f, "The FDomain protocol does not recognize an object type")
153            }
154            Self::ProtocolRightsIncompatible => {
155                write!(f, "The FDomain protocol does not recognize some rights")
156            }
157            Self::ProtocolSignalsIncompatible => {
158                write!(f, "The FDomain protocol does not recognize some signals")
159            }
160            Self::ProtocolStreamEventIncompatible => {
161                write!(f, "The FDomain protocol does not recognize a received streaming IO event")
162            }
163            Self::FDomain(e) => write_fdomain_error(e, f),
164            Self::Protocol(e) => write!(f, "Protocol error: {e}"),
165            Self::Transport(Some(e)) => write!(f, "Transport error: {e:?}"),
166            Self::Transport(None) => write!(f, "Transport closed"),
167            Self::ConnectionMismatch => {
168                write!(f, "Tried to use an FDomain handle from a different connection")
169            }
170            Self::StreamingAborted => write!(f, "This channel is no longer streaming"),
171        }
172    }
173}
174
175impl std::fmt::Debug for Error {
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177        match self {
178            Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
179            Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
180            Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
181            Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
182            Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
183            Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
184            Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
185            Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
186            Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
187            Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
188            Self::StreamingAborted => write!(f, "StreamingAborted"),
189        }
190    }
191}
192
// Marks `Error` as a standard error type. No methods are overridden, so the
// trait's default implementations apply.
impl std::error::Error for Error {}
194
195impl From<FDomainError> for Error {
196    fn from(other: FDomainError) -> Self {
197        Self::FDomain(other)
198    }
199}
200
201impl From<::fidl::Error> for Error {
202    fn from(other: ::fidl::Error) -> Self {
203        Self::Protocol(other)
204    }
205}
206
207impl From<WriteSocketError> for Error {
208    fn from(other: WriteSocketError) -> Self {
209        Self::SocketWrite(other)
210    }
211}
212
213impl From<WriteChannelError> for Error {
214    fn from(other: WriteChannelError) -> Self {
215        Self::ChannelWrite(other)
216    }
217}
218
/// An error emitted internally by the client. Similar to [`Error`] but does not
/// contain several variants which are irrelevant in the contexts where it is
/// used.
///
/// Every variant maps onto the [`Error`] variant of the same name via the
/// `From<InnerError> for Error` conversion.
#[derive(Clone)]
enum InnerError {
    /// A FIDL-level error, e.g. a message that failed to decode.
    Protocol(::fidl::Error),
    /// A streaming event carried a variant this client does not recognize.
    ProtocolStreamEventIncompatible,
    /// The transport failed; `None` means no underlying I/O error is available.
    Transport(Option<Arc<std::io::Error>>),
}
228
229impl From<InnerError> for Error {
230    fn from(other: InnerError) -> Self {
231        match other {
232            InnerError::Protocol(p) => Error::Protocol(p),
233            InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
234            InnerError::Transport(t) => Error::Transport(t),
235        }
236    }
237}
238
239impl From<::fidl::Error> for InnerError {
240    fn from(other: ::fidl::Error) -> Self {
241        InnerError::Protocol(other)
242    }
243}
244
// TODO(399717689) Figure out if we could just use AsyncRead/Write instead of a special trait.
/// Implemented by objects which provide a transport over which we can speak the
/// FDomain protocol.
///
/// The implementer must provide two things:
/// 1) An incoming stream of messages, each presented as
///    `Result<Box<[u8]>, std::io::Error>`. This is provided via the
///    `futures` `Stream` trait, which this trait requires.
/// 2) A way to send messages. This is provided by implementing the
///    `poll_send_message` method.
pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
    /// Attempt to send a message asynchronously. Messages should be sent so
    /// that they arrive at the target in order.
    ///
    /// Returns `Poll::Ready(Ok(()))` once `msg` has been accepted, and
    /// `Poll::Pending` if the message could not be sent yet. On failure the
    /// error is optional: the client surfaces `Err(None)` as a transport
    /// error without an underlying I/O error (displayed as
    /// "Transport closed").
    fn poll_send_message(
        self: Pin<&mut Self>,
        msg: &[u8],
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Option<std::io::Error>>>;
}
263
/// Wrapper for an `FDomainTransport` implementer that:
/// 1) Provides a queue for outgoing messages so we need not have an await point
///    when we submit a message.
/// 2) Drops the transport on error, then returns the last observed error for
///    all future operations.
enum Transport {
    /// A live transport. The fields are, in order:
    /// - the transport itself,
    /// - the queue of encoded messages waiting to be sent,
    /// - wakers registered by `poll_send_messages` while the queue was empty,
    ///   which `push_msg` wakes when a new message is enqueued.
    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
    /// The transport has failed; holds the error reported for it.
    Error(InnerError),
}
273
274impl Transport {
275    /// Get the failure mode of the transport if it has failed.
276    fn error(&self) -> Option<InnerError> {
277        match self {
278            Transport::Transport(_, _, _) => None,
279            Transport::Error(inner_error) => Some(inner_error.clone()),
280        }
281    }
282
283    /// Enqueue a message to be sent on this transport.
284    fn push_msg(&mut self, msg: Box<[u8]>) {
285        if let Transport::Transport(_, v, w) = self {
286            v.push_back(msg);
287            w.drain(..).for_each(Waker::wake);
288        }
289    }
290
291    /// Push messages in the send queue out through the transport.
292    fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
293        match self {
294            Transport::Error(e) => Poll::Ready(e.clone()),
295            Transport::Transport(t, v, w) => {
296                while let Some(msg) = v.front() {
297                    match t.as_mut().poll_send_message(msg, ctx) {
298                        Poll::Ready(Ok(())) => {
299                            v.pop_front();
300                        }
301                        Poll::Ready(Err(e)) => {
302                            let e = e.map(Arc::new);
303                            *self = Transport::Error(InnerError::Transport(e.clone()));
304                            return Poll::Ready(InnerError::Transport(e));
305                        }
306                        Poll::Pending => return Poll::Pending,
307                    }
308                }
309
310                if v.is_empty() {
311                    w.push(ctx.waker().clone());
312                } else {
313                    ctx.waker().wake_by_ref();
314                }
315                Poll::Pending
316            }
317        }
318    }
319
320    /// Get the next incoming message from the transport.
321    fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
322        match self {
323            Transport::Error(e) => Poll::Ready(Err(e.clone())),
324            Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
325                Some(Ok(x)) => Poll::Ready(Ok(x)),
326                Some(Err(e)) => {
327                    let e = Arc::new(e);
328                    *self = Transport::Error(InnerError::Transport(Some(Arc::clone(&e))));
329                    Poll::Ready(Err(InnerError::Transport(Some(e))))
330                }
331                Option::None => Poll::Ready(Err(InnerError::Transport(None))),
332            },
333        }
334    }
335}
336
/// State of a socket that is or has been read from.
struct SocketReadState {
    /// Wakers to notify whenever a new item is pushed onto `queued`.
    wakers: Vec<Waker>,
    /// Data and errors received for this socket, in arrival order.
    queued: VecDeque<Result<proto::SocketData, Error>>,
    /// Cleared when the response to a `SocketRead` request is handled.
    /// Presumably set while such a request is outstanding — the setter is
    /// outside this part of the file.
    read_request_pending: bool,
    /// Set when streaming reads are started for the socket and cleared when a
    /// `Stopped` streaming event arrives.
    is_streaming: bool,
}
344
345impl SocketReadState {
346    /// Handle an incoming message, which is either a channel streaming event or
347    /// response to a `ChannelRead` request.
348    fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) {
349        self.queued.push_back(msg);
350        self.wakers.drain(..).for_each(Waker::wake);
351    }
352}
353
/// State of a channel that is or has been read from.
struct ChannelReadState {
    /// Wakers to notify whenever a new item is pushed onto `queued`.
    wakers: Vec<Waker>,
    /// Messages and errors received for this channel, in arrival order.
    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
    /// Cleared when the response to a `ChannelRead` request is handled.
    /// Presumably set while such a request is outstanding — the setter is
    /// outside this part of the file.
    read_request_pending: bool,
    /// Cleared when a `Stopped` streaming event arrives. Presumably set when
    /// streaming reads are started — the setter is outside this part of the
    /// file.
    is_streaming: bool,
}
361
362impl ChannelReadState {
363    /// Handle an incoming message, which is either a channel streaming event or
364    /// response to a `ChannelRead` request.
365    fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) {
366        self.queued.push_back(msg);
367        self.wakers.drain(..).for_each(Waker::wake);
368    }
369}
370
/// Lock-protected interior of `Client`
struct ClientInner {
    /// Connection to the FDomain, or the error it failed with.
    transport: Transport,
    /// Responders for requests that have been sent but not yet answered,
    /// keyed by transaction id.
    transactions: HashMap<NonZeroU32, responder::Responder>,
    /// Per-handle read state for channels.
    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
    /// Per-handle read state for sockets.
    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
    /// Transaction id to use for the next request.
    next_tx_id: u32,
    /// Handles that will be closed, in a single `Close` request, the next
    /// time the transport is polled.
    waiting_to_close: Vec<proto::HandleId>,
    /// Waker of the task that most recently polled the transport.
    /// Presumably woken when a handle is added to `waiting_to_close` — the
    /// code doing so is outside this part of the file.
    waiting_to_close_waker: Waker,
}
381
382impl ClientInner {
383    /// Serialize and enqueue a new transaction, including header and transaction ID.
384    fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
385        let tx_id = self.next_tx_id;
386
387        let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
388        let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
389        self.next_tx_id += 1;
390        assert!(
391            self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
392            "Allocated same tx id twice!"
393        );
394        self.transport.push_msg(msg.into());
395    }
396
397    /// Polls the underlying transport to ensure any incoming or outgoing
398    /// messages are processed as far as possible. Errors if the transport has failed.
399    fn try_poll_transport(
400        &mut self,
401        ctx: &mut Context<'_>,
402    ) -> Poll<Result<Infallible, InnerError>> {
403        if !self.waiting_to_close.is_empty() {
404            let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
405            // We've dropped the handle object. Nobody is going to wait to read
406            // the buffers anymore. This is a safe time to drop the read state.
407            for handle in &handles {
408                let _ = self.channel_read_states.remove(handle);
409                let _ = self.socket_read_states.remove(handle);
410            }
411            self.request(
412                ordinals::CLOSE,
413                proto::FDomainCloseRequest { handles },
414                Responder::Ignore,
415            );
416        }
417
418        self.waiting_to_close_waker = ctx.waker().clone();
419
420        loop {
421            if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
422                for state in std::mem::take(&mut self.socket_read_states).into_values() {
423                    state.wakers.into_iter().for_each(Waker::wake);
424                }
425                for (_, state) in self.channel_read_states.drain() {
426                    state.wakers.into_iter().for_each(Waker::wake);
427                }
428                return Poll::Ready(Err(e));
429            }
430            let Poll::Ready(result) = self.transport.poll_next(ctx) else {
431                return Poll::Pending;
432            };
433            let data = result?;
434            let (header, data) = match fidl_message::decode_transaction_header(&data) {
435                Ok(x) => x,
436                Err(e) => {
437                    self.transport = Transport::Error(InnerError::Protocol(e));
438                    continue;
439                }
440            };
441
442            let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
443                if let Err(e) = self.process_event(header, data) {
444                    self.transport = Transport::Error(e);
445                }
446                continue;
447            };
448
449            let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
450            match tx.handle(self, Ok((header, data))) {
451                Ok(x) => x,
452                Err(e) => {
453                    self.transport = Transport::Error(InnerError::Protocol(e));
454                    continue;
455                }
456            }
457        }
458    }
459
460    /// Process an incoming message that arose from an event rather than a transaction reply.
461    fn process_event(&mut self, header: TransactionHeader, data: &[u8]) -> Result<(), InnerError> {
462        match header.ordinal {
463            ordinals::ON_SOCKET_STREAMING_DATA => {
464                let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
465                    header, data,
466                )?;
467                let o =
468                    self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
469                        wakers: Vec::new(),
470                        queued: VecDeque::new(),
471                        is_streaming: false,
472                        read_request_pending: false,
473                    });
474                match msg.socket_message {
475                    proto::SocketMessage::Data(data) => {
476                        o.handle_incoming_message(Ok(data));
477                        Ok(())
478                    }
479                    proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
480                        if let Some(error) = error {
481                            o.handle_incoming_message(Err(Error::FDomain(*error)));
482                        }
483                        o.is_streaming = false;
484                        Ok(())
485                    }
486                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
487                }
488            }
489            ordinals::ON_CHANNEL_STREAMING_DATA => {
490                let msg = fidl_message::decode_message::<
491                    proto::ChannelOnChannelStreamingDataRequest,
492                >(header, data)?;
493                let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
494                    ChannelReadState {
495                        wakers: Vec::new(),
496                        queued: VecDeque::new(),
497                        is_streaming: false,
498                        read_request_pending: false,
499                    }
500                });
501                match msg.channel_sent {
502                    proto::ChannelSent::Message(data) => {
503                        o.handle_incoming_message(Ok(data));
504                        Ok(())
505                    }
506                    proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
507                        if let Some(error) = error {
508                            o.handle_incoming_message(Err(Error::FDomain(*error)));
509                        }
510                        o.is_streaming = false;
511                        Ok(())
512                    }
513                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
514                }
515            }
516            _ => Err(::fidl::Error::UnknownOrdinal {
517                ordinal: header.ordinal,
518                protocol_name:
519                    <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
520            }
521            .into()),
522        }
523    }
524
525    /// Polls the underlying transport to ensure any incoming or outgoing
526    /// messages are processed as far as possible. If a failure occurs, puts the
527    /// transport into an error state and fails all pending transactions.
528    fn poll_transport(&mut self, ctx: &mut Context<'_>) {
529        if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
530            for (_, v) in std::mem::take(&mut self.transactions) {
531                let _ = v.handle(self, Err(e.clone()));
532            }
533        }
534    }
535
536    /// Handles the response to a `SocketRead` protocol message.
537    pub(crate) fn handle_socket_read_response(
538        &mut self,
539        msg: Result<proto::SocketData, Error>,
540        id: proto::HandleId,
541    ) {
542        let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
543            wakers: Vec::new(),
544            queued: VecDeque::new(),
545            is_streaming: false,
546            read_request_pending: false,
547        });
548        state.handle_incoming_message(msg);
549        state.read_request_pending = false;
550    }
551
552    /// Handles the response to a `ChannelRead` protocol message.
553    pub(crate) fn handle_channel_read_response(
554        &mut self,
555        msg: Result<proto::ChannelMessage, Error>,
556        id: proto::HandleId,
557    ) {
558        let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
559            wakers: Vec::new(),
560            queued: VecDeque::new(),
561            is_streaming: false,
562            read_request_pending: false,
563        });
564        state.handle_incoming_message(msg);
565        state.read_request_pending = false;
566    }
567}
568
569impl Drop for ClientInner {
570    fn drop(&mut self) {
571        let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
572        for responder in responders {
573            let _ = responder.handle(self, Err(InnerError::Transport(None)));
574        }
575        for state in self.channel_read_states.values_mut() {
576            state.wakers.drain(..).for_each(Waker::wake);
577        }
578        for state in self.socket_read_states.values_mut() {
579            state.wakers.drain(..).for_each(Waker::wake);
580        }
581    }
582}
583
/// Represents a connection to an FDomain.
///
/// The client is constructed by passing it a transport object which represents
/// the raw connection to the remote FDomain. The `Client` wrapper then allows
/// us to construct and use handles which behave similarly to their counterparts
/// on a Fuchsia device.
///
/// All of the client's state lives in a `ClientInner` behind a mutex.
pub struct Client(pub(crate) Mutex<ClientInner>);
591
592impl std::fmt::Debug for Client {
593    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
594        f.debug_tuple("Client").field(&"...").finish()
595    }
596}
597
/// A client which is always disconnected. Handles that lose their clients
/// connect to this client instead. Its transport is permanently in the
/// `Transport(None)` error state, which is displayed as "Transport closed".
pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
    Arc::new(Client(Mutex::new(ClientInner {
        transport: Transport::Error(InnerError::Transport(None)),
        transactions: HashMap::new(),
        channel_read_states: HashMap::new(),
        socket_read_states: HashMap::new(),
        next_tx_id: 1,
        waiting_to_close: Vec::new(),
        // Nothing ever drives this client, so there is no task to wake.
        waiting_to_close_waker: futures::task::noop_waker(),
    })))
});
612
613impl Client {
614    /// Create a new FDomain client. The `transport` argument should contain the
615    /// established connection to the target, ready to communicate the FDomain
616    /// protocol.
617    ///
618    /// The second return item is a future that must be polled to keep
619    /// transactions running.
620    pub fn new(
621        transport: impl FDomainTransport + 'static,
622    ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
623        let ret = Arc::new(Client(Mutex::new(ClientInner {
624            transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
625            transactions: HashMap::new(),
626            socket_read_states: HashMap::new(),
627            channel_read_states: HashMap::new(),
628            next_tx_id: 1,
629            waiting_to_close: Vec::new(),
630            waiting_to_close_waker: futures::task::noop_waker(),
631        })));
632
633        let client_weak = Arc::downgrade(&ret);
634        let fut = futures::future::poll_fn(move |ctx| {
635            let Some(client) = client_weak.upgrade() else {
636                return Poll::Ready(());
637            };
638
639            client.0.lock().poll_transport(ctx);
640            Poll::Pending
641        });
642
643        (ret, fut)
644    }
645
646    /// Get the namespace for the connected FDomain. Calling this more than once is an error.
647    pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
648        let new_handle = self.new_hid();
649        self.transaction(
650            ordinals::GET_NAMESPACE,
651            proto::FDomainGetNamespaceRequest { new_handle },
652            Responder::Namespace,
653        )
654        .await?;
655        Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
656    }
657
658    /// Create a new channel in the connected FDomain.
659    pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
660        let id_a = self.new_hid();
661        let id_b = self.new_hid();
662        let fut = self.transaction(
663            ordinals::CREATE_CHANNEL,
664            proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
665            Responder::CreateChannel,
666        );
667
668        fuchsia_async::Task::spawn(async move {
669            if let Err(e) = fut.await {
670                log::debug!("FDomain channel creation failed: {e}");
671            }
672        })
673        .detach();
674
675        (
676            Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
677            Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
678        )
679    }
680
681    /// Creates client and server endpoints connected to by a channel.
682    pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
683        self: &Arc<Self>,
684    ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
685        let (client, server) = self.create_channel();
686        let client_end = crate::fidl::ClientEnd::<F>::new(client);
687        let server_end = crate::fidl::ServerEnd::new(server);
688        (client_end, server_end)
689    }
690
691    /// Creates a client proxy and a server endpoint connected by a channel.
692    pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
693        self: &Arc<Self>,
694    ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
695        let (client_end, server_end) = self.create_endpoints::<F>();
696        (client_end.into_proxy(), server_end)
697    }
698
699    /// Creates a client proxy and a server request stream connected by a channel.
700    pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
701        self: &Arc<Self>,
702    ) -> (F::Proxy, F::RequestStream) {
703        let (client_end, server_end) = self.create_endpoints::<F>();
704        (client_end.into_proxy(), server_end.into_stream())
705    }
706
707    /// Create a new socket in the connected FDomain.
708    fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
709        let id_a = self.new_hid();
710        let id_b = self.new_hid();
711        let fut = self.transaction(
712            ordinals::CREATE_SOCKET,
713            proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
714            Responder::CreateSocket,
715        );
716
717        fuchsia_async::Task::spawn(async move {
718            if let Err(e) = fut.await {
719                log::debug!("FDomain socket creation failed: {e}");
720            }
721        })
722        .detach();
723
724        (
725            Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
726            Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
727        )
728    }
729
730    /// Create a new streaming socket in the connected FDomain.
731    pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
732        self.create_socket(proto::SocketType::Stream)
733    }
734
735    /// Create a new datagram socket in the connected FDomain.
736    pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
737        self.create_socket(proto::SocketType::Datagram)
738    }
739
740    /// Create a new event pair in the connected FDomain.
741    pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
742        let id_a = self.new_hid();
743        let id_b = self.new_hid();
744        let fut = self.transaction(
745            ordinals::CREATE_EVENT_PAIR,
746            proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
747            Responder::CreateEventPair,
748        );
749
750        fuchsia_async::Task::spawn(async move {
751            if let Err(e) = fut.await {
752                log::debug!("FDomain event pair creation failed: {e}");
753            }
754        })
755        .detach();
756
757        (
758            EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
759            EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
760        )
761    }
762
763    /// Create a new event handle in the connected FDomain.
764    pub fn create_event(self: &Arc<Self>) -> Event {
765        let id = self.new_hid();
766        let fut = self.transaction(
767            ordinals::CREATE_EVENT,
768            proto::EventCreateEventRequest { handle: id },
769            Responder::CreateEvent,
770        );
771
772        fuchsia_async::Task::spawn(async move {
773            if let Err(e) = fut.await {
774                log::debug!("FDomain event creation failed: {e}");
775            }
776        })
777        .detach();
778
779        Event(Handle { id: id.id, client: Arc::downgrade(self) })
780    }
781
782    /// Allocate a new HID, which should be suitable for use with the connected FDomain.
783    pub(crate) fn new_hid(&self) -> proto::NewHandleId {
784        // TODO: On the target side we have to keep a table of these which means
785        // we can automatically detect collisions in the random value. On the
786        // client side we'd have to add a whole data structure just for that
787        // purpose. Should we?
788        proto::NewHandleId { id: rand::random::<u32>() >> 1 }
789    }
790
791    /// Create a future which sends a FIDL message to the connected FDomain and
792    /// waits for a response.
793    ///
794    /// Calling this method queues the transaction synchronously. Awaiting is
795    /// only necessary to wait for the response.
796    pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
797        self: &Arc<Self>,
798        ordinal: u64,
799        request: S,
800        f: F,
801    ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
802    where
803        F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
804    {
805        let mut inner = self.0.lock();
806
807        let (sender, receiver) = futures::channel::oneshot::channel();
808        inner.request(ordinal, request, f(sender));
809        receiver.map(|x| x.expect("Oneshot went away without reply!"))
810    }
811
812    /// Start getting streaming events for socket reads.
813    pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
814        let mut inner = self.0.lock();
815        if let Some(e) = inner.transport.error() {
816            return Err(e.into());
817        }
818
819        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
820            wakers: Vec::new(),
821            queued: VecDeque::new(),
822            is_streaming: false,
823            read_request_pending: false,
824        });
825
826        assert!(!state.is_streaming, "Initiated streaming twice!");
827        state.is_streaming = true;
828
829        inner.request(
830            ordinals::READ_SOCKET_STREAMING_START,
831            proto::SocketReadSocketStreamingStartRequest { handle: id },
832            Responder::Ignore,
833        );
834        Ok(())
835    }
836
837    /// Stop getting streaming events for socket reads. Doesn't return errors
838    /// because it's exclusively called in destructors where we have nothing to
839    /// do with them.
840    pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
841        let mut inner = self.0.lock();
842        if let Some(state) = inner.socket_read_states.get_mut(&id) {
843            if state.is_streaming {
844                state.is_streaming = false;
845                // TODO: Log?
846                let _ = inner.request(
847                    ordinals::READ_SOCKET_STREAMING_STOP,
848                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
849                    Responder::Ignore,
850                );
851            }
852        }
853    }
854
855    /// Start getting streaming events for socket reads.
856    pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
857        let mut inner = self.0.lock();
858        if let Some(e) = inner.transport.error() {
859            return Err(e.into());
860        }
861        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
862            wakers: Vec::new(),
863            queued: VecDeque::new(),
864            is_streaming: false,
865            read_request_pending: false,
866        });
867
868        assert!(!state.is_streaming, "Initiated streaming twice!");
869        state.is_streaming = true;
870
871        inner.request(
872            ordinals::READ_CHANNEL_STREAMING_START,
873            proto::ChannelReadChannelStreamingStartRequest { handle: id },
874            Responder::Ignore,
875        );
876
877        Ok(())
878    }
879
880    /// Stop getting streaming events for socket reads. Doesn't return errors
881    /// because it's exclusively called in destructors where we have nothing to
882    /// do with them.
883    pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
884        let mut inner = self.0.lock();
885        if let Some(state) = inner.channel_read_states.get_mut(&id) {
886            if state.is_streaming {
887                state.is_streaming = false;
888                // TODO: Log?
889                let _ = inner.request(
890                    ordinals::READ_CHANNEL_STREAMING_STOP,
891                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
892                    Responder::Ignore,
893                );
894            }
895        }
896    }
897
898    /// Execute a read from a channel.
899    pub(crate) fn poll_socket(
900        &self,
901        id: proto::HandleId,
902        ctx: &mut Context<'_>,
903        out: &mut [u8],
904    ) -> Poll<Result<usize, Error>> {
905        let mut inner = self.0.lock();
906        if let Some(error) = inner.transport.error() {
907            return Poll::Ready(Err(error.into()));
908        }
909
910        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
911            wakers: Vec::new(),
912            queued: VecDeque::new(),
913            is_streaming: false,
914            read_request_pending: false,
915        });
916
917        if let Some(got) = state.queued.front_mut() {
918            match got.as_mut() {
919                Ok(data) => {
920                    let read_size = std::cmp::min(data.data.len(), out.len());
921                    out[..read_size].copy_from_slice(&data.data[..read_size]);
922
923                    if data.data.len() > read_size && !data.is_datagram {
924                        let _ = data.data.drain(..read_size);
925                    } else {
926                        let _ = state.queued.pop_front();
927                    }
928
929                    return Poll::Ready(Ok(read_size));
930                }
931                Err(_) => {
932                    let err = state.queued.pop_front().unwrap().unwrap_err();
933                    return Poll::Ready(Err(err));
934                }
935            }
936        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
937            state.wakers.push(ctx.waker().clone());
938        }
939
940        if !state.read_request_pending && !state.is_streaming {
941            inner.request(
942                ordinals::READ_SOCKET,
943                proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
944                Responder::ReadSocket(id),
945            );
946        }
947
948        Poll::Pending
949    }
950
951    /// Execute a read from a channel.
952    pub(crate) fn poll_channel(
953        &self,
954        id: proto::HandleId,
955        ctx: &mut Context<'_>,
956        for_stream: bool,
957    ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
958        let mut inner = self.0.lock();
959        if let Some(error) = inner.transport.error() {
960            return Poll::Ready(Some(Err(error.into())));
961        }
962
963        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
964            wakers: Vec::new(),
965            queued: VecDeque::new(),
966            is_streaming: false,
967            read_request_pending: false,
968        });
969
970        if let Some(got) = state.queued.pop_front() {
971            return Poll::Ready(Some(got));
972        } else if for_stream && !state.is_streaming {
973            return Poll::Ready(None);
974        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
975            state.wakers.push(ctx.waker().clone());
976        }
977
978        if !state.read_request_pending && !state.is_streaming {
979            inner.request(
980                ordinals::READ_CHANNEL,
981                proto::ChannelReadChannelRequest { handle: id },
982                Responder::ReadChannel(id),
983            );
984        }
985
986        Poll::Pending
987    }
988
989    /// Check whether this channel is streaming
990    pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
991        let inner = self.0.lock();
992        let Some(state) = inner.channel_read_states.get(&id) else {
993            return false;
994        };
995        state.is_streaming
996    }
997
998    /// Check that all the given handles are safe to transfer through a channel
999    /// e.g. that there's no chance of in-flight reads getting dropped.
1000    pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
1001        let inner = self.0.lock();
1002        match handles {
1003            proto::Handles::Handles(handles) => {
1004                for handle in handles {
1005                    assert!(
1006                        !(inner.channel_read_states.contains_key(handle)
1007                            || inner.socket_read_states.contains_key(handle)),
1008                        "Tried to transfer handle after reading"
1009                    );
1010                }
1011            }
1012            proto::Handles::Dispositions(dispositions) => {
1013                for disposition in dispositions {
1014                    match &disposition.handle {
1015                        proto::HandleOp::Move_(handle) => assert!(
1016                            !(inner.channel_read_states.contains_key(handle)
1017                                || inner.socket_read_states.contains_key(handle)),
1018                            "Tried to transfer handle after reading"
1019                        ),
1020                        // Pretty sure this should be fine regardless of read state.
1021                        proto::HandleOp::Duplicate(_) => (),
1022                    }
1023                }
1024            }
1025        }
1026    }
1027}