fdomain_client/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_message::TransactionHeader;
6use futures::FutureExt;
7use futures::channel::oneshot::Sender as OneshotSender;
8use futures::stream::Stream as StreamTrait;
9use std::collections::{HashMap, VecDeque};
10use std::convert::Infallible;
11use std::future::Future;
12use std::num::NonZeroU32;
13use std::pin::Pin;
14use std::sync::{Arc, LazyLock, Mutex};
15use std::task::{Context, Poll, Waker, ready};
16use {fidl_fuchsia_fdomain as proto, fuchsia_async as _};
17
18mod channel;
19mod event;
20mod event_pair;
21mod handle;
22mod responder;
23mod socket;
24
25#[cfg(test)]
26mod test;
27
28pub mod fidl;
29
30use responder::Responder;
31
32pub use channel::{
33    AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, MessageBuf,
34};
35pub use event::Event;
36pub use event_pair::Eventpair as EventPair;
37pub use handle::{AsHandleRef, Handle, HandleBased, HandleRef, OnFDomainSignals, Peered};
38pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
39pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
40
41// Unsupported handle types.
42#[rustfmt::skip]
43pub use Handle as Fifo;
44#[rustfmt::skip]
45pub use Handle as Job;
46#[rustfmt::skip]
47pub use Handle as Process;
48#[rustfmt::skip]
49pub use Handle as Resource;
50#[rustfmt::skip]
51pub use Handle as Stream;
52#[rustfmt::skip]
53pub use Handle as Thread;
54#[rustfmt::skip]
55pub use Handle as Vmar;
56#[rustfmt::skip]
57pub use Handle as Vmo;
58
59use proto::f_domain_ordinals as ordinals;
60
61fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62    match error {
63        FDomainError::TargetError(e) => write!(f, "Target-side error {e}"),
64        FDomainError::BadHandleId(proto::BadHandleId { id }) => {
65            write!(f, "Tried to use invalid handle id {id}")
66        }
67        FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
68            f,
69            "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
70        ),
71        FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
72            write!(f, "Handle is occupied delivering streaming reads")
73        }
74        FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
75            write!(f, "No streaming read was in progress")
76        }
77        FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
78            write!(
79                f,
80                "Tried to create a handle with id {id}, which is outside the valid range for client handles"
81            )
82        }
83        FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
84            if *same_call {
85                write!(f, "Tried to create two or more new handles with the same id {id}")
86            } else {
87                write!(
88                    f,
89                    "Tried to create a new handle with id {id}, which is already the id of an existing handle"
90                )
91            }
92        }
93        FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
94            write!(f, "Tried to write a channel into itself")
95        }
96        FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
97            write!(f, "Handle closed while being read")
98        }
99        _ => todo!(),
100    }
101}
102
103/// Result type alias.
104pub type Result<T, E = Error> = std::result::Result<T, E>;
105
106/// Error type emitted by FDomain operations.
107#[derive(Clone)]
108pub enum Error {
109    SocketWrite(WriteSocketError),
110    ChannelWrite(WriteChannelError),
111    FDomain(FDomainError),
112    Protocol(::fidl::Error),
113    ProtocolObjectTypeIncompatible,
114    ProtocolRightsIncompatible,
115    ProtocolSignalsIncompatible,
116    ProtocolStreamEventIncompatible,
117    Transport(Arc<std::io::Error>),
118    ConnectionMismatch,
119    StreamingAborted,
120}
121
122impl std::fmt::Display for Error {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        match self {
125            Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
126                write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
127                write_fdomain_error(error, f)
128            }
129            Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
130                write!(f, "While writing channel: ")?;
131                write_fdomain_error(error, f)
132            }
133            Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
134                write!(f, "Couldn't write all handles into a channel:")?;
135                for (pos, error) in
136                    errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
137                {
138                    write!(f, "\n  Handle in position {pos}: ")?;
139                    write_fdomain_error(error, f)?;
140                }
141                Ok(())
142            }
143            Self::ProtocolObjectTypeIncompatible => {
144                write!(f, "The FDomain protocol does not recognize an object type")
145            }
146            Self::ProtocolRightsIncompatible => {
147                write!(f, "The FDomain protocol does not recognize some rights")
148            }
149            Self::ProtocolSignalsIncompatible => {
150                write!(f, "The FDomain protocol does not recognize some signals")
151            }
152            Self::ProtocolStreamEventIncompatible => {
153                write!(f, "The FDomain protocol does not recognize a received streaming IO event")
154            }
155            Self::FDomain(e) => write_fdomain_error(e, f),
156            Self::Protocol(e) => write!(f, "Protocol error: {e}"),
157            Self::Transport(e) => write!(f, "Transport error: {e:?}"),
158            Self::ConnectionMismatch => {
159                write!(f, "Tried to use an FDomain handle from a different connection")
160            }
161            Self::StreamingAborted => write!(f, "This channel is no longer streaming"),
162        }
163    }
164}
165
166impl std::fmt::Debug for Error {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        match self {
169            Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
170            Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
171            Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
172            Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
173            Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
174            Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
175            Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
176            Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
177            Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
178            Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
179            Self::StreamingAborted => write!(f, "StreamingAborted"),
180        }
181    }
182}
183
184impl std::error::Error for Error {}
185
186impl From<FDomainError> for Error {
187    fn from(other: FDomainError) -> Self {
188        Self::FDomain(other)
189    }
190}
191
192impl From<::fidl::Error> for Error {
193    fn from(other: ::fidl::Error) -> Self {
194        Self::Protocol(other)
195    }
196}
197
198impl From<WriteSocketError> for Error {
199    fn from(other: WriteSocketError) -> Self {
200        Self::SocketWrite(other)
201    }
202}
203
204impl From<WriteChannelError> for Error {
205    fn from(other: WriteChannelError) -> Self {
206        Self::ChannelWrite(other)
207    }
208}
209
210/// An error emitted internally by the client. Similar to [`Error`] but does not
211/// contain several variants which are irrelevant in the contexts where it is
212/// used.
213enum InnerError {
214    Protocol(::fidl::Error),
215    ProtocolStreamEventIncompatible,
216    Transport(Arc<std::io::Error>),
217}
218
219impl Clone for InnerError {
220    fn clone(&self) -> Self {
221        match self {
222            InnerError::Protocol(a) => InnerError::Protocol(a.clone()),
223            InnerError::ProtocolStreamEventIncompatible => {
224                InnerError::ProtocolStreamEventIncompatible
225            }
226            InnerError::Transport(a) => InnerError::Transport(Arc::clone(a)),
227        }
228    }
229}
230
231impl From<InnerError> for Error {
232    fn from(other: InnerError) -> Self {
233        match other {
234            InnerError::Protocol(p) => Error::Protocol(p),
235            InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
236            InnerError::Transport(t) => Error::Transport(t),
237        }
238    }
239}
240
241impl From<::fidl::Error> for InnerError {
242    fn from(other: ::fidl::Error) -> Self {
243        InnerError::Protocol(other)
244    }
245}
246
247// TODO(399717689) Figure out if we could just use AsyncRead/Write instead of a special trait.
248/// Implemented by objects which provide a transport over which we can speak the
249/// FDomain protocol.
250///
251/// The implementer must provide two things:
252/// 1) An incoming stream of messages presented as `Vec<u8>`. This is provided
253///    via the `Stream` trait, which this trait requires.
254/// 2) A way to send messages. This is provided by implementing the
255///    `poll_send_message` method.
256pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
257    /// Attempt to send a message asynchronously. Messages should be sent so
258    /// that they arrive at the target in order.
259    fn poll_send_message(
260        self: Pin<&mut Self>,
261        msg: &[u8],
262        ctx: &mut Context<'_>,
263    ) -> Poll<Result<(), std::io::Error>>;
264}
265
266/// Wrapper for an `FDomainTransport` implementer that:
267/// 1) Provides a queue for outgoing messages so we need not have an await point
268///    when we submit a message.
269/// 2) Drops the transport on error, then returns the last observed error for
270///    all future operations.
271enum Transport {
272    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
273    Error(InnerError),
274}
275
276impl Transport {
277    /// Get the failure mode of the transport if it has failed.
278    fn error(&self) -> Option<InnerError> {
279        match self {
280            Transport::Transport(_, _, _) => None,
281            Transport::Error(inner_error) => Some(inner_error.clone()),
282        }
283    }
284
285    /// Enqueue a message to be sent on this transport.
286    fn push_msg(&mut self, msg: Box<[u8]>) {
287        if let Transport::Transport(_, v, w) = self {
288            v.push_back(msg);
289            w.drain(..).for_each(Waker::wake);
290        }
291    }
292
293    /// Push messages in the send queue out through the transport.
294    fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
295        match self {
296            Transport::Error(e) => Poll::Ready(e.clone()),
297            Transport::Transport(t, v, w) => {
298                while let Some(msg) = v.front() {
299                    match t.as_mut().poll_send_message(msg, ctx) {
300                        Poll::Ready(Ok(())) => {
301                            v.pop_front();
302                        }
303                        Poll::Ready(Err(e)) => {
304                            let e = Arc::new(e);
305                            *self = Transport::Error(InnerError::Transport(Arc::clone(&e)));
306                            return Poll::Ready(InnerError::Transport(e));
307                        }
308                        Poll::Pending => return Poll::Pending,
309                    }
310                }
311
312                if v.is_empty() {
313                    w.push(ctx.waker().clone());
314                } else {
315                    ctx.waker().wake_by_ref();
316                }
317                Poll::Pending
318            }
319        }
320    }
321
322    /// Get the next incoming message from the transport.
323    fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Option<Result<Box<[u8]>, InnerError>>> {
324        match self {
325            Transport::Error(e) => Poll::Ready(Some(Err(e.clone()))),
326            Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
327                Some(Ok(x)) => Poll::Ready(Some(Ok(x))),
328                Some(Err(e)) => {
329                    let e = Arc::new(e);
330                    *self = Transport::Error(InnerError::Transport(Arc::clone(&e)));
331                    Poll::Ready(Some(Err(InnerError::Transport(e))))
332                }
333                Option::None => Poll::Ready(None),
334            },
335        }
336    }
337}
338
339/// State of a socket that is or has been read from.
340struct SocketReadState {
341    wakers: Vec<Waker>,
342    queued: VecDeque<Result<proto::SocketData, Error>>,
343    read_request_pending: bool,
344    is_streaming: bool,
345}
346
347impl SocketReadState {
348    /// Handle an incoming message, which is either a channel streaming event or
349    /// response to a `ChannelRead` request.
350    fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) {
351        self.queued.push_back(msg);
352        self.wakers.drain(..).for_each(Waker::wake);
353    }
354}
355
356/// State of a channel that is or has been read from.
357struct ChannelReadState {
358    wakers: Vec<Waker>,
359    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
360    read_request_pending: bool,
361    is_streaming: bool,
362}
363
364impl ChannelReadState {
365    /// Handle an incoming message, which is either a channel streaming event or
366    /// response to a `ChannelRead` request.
367    fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) {
368        self.queued.push_back(msg);
369        self.wakers.drain(..).for_each(Waker::wake);
370    }
371}
372
373/// Lock-protected interior of `Client`
374struct ClientInner {
375    transport: Transport,
376    transactions: HashMap<NonZeroU32, responder::Responder>,
377    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
378    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
379    next_tx_id: u32,
380    waiting_to_close: Vec<proto::HandleId>,
381    waiting_to_close_waker: Waker,
382}
383
384impl ClientInner {
385    /// Serialize and enqueue a new transaction, including header and transaction ID.
386    fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
387        let tx_id = self.next_tx_id;
388
389        let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
390        let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
391        self.next_tx_id += 1;
392        assert!(
393            self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
394            "Allocated same tx id twice!"
395        );
396        self.transport.push_msg(msg.into());
397    }
398
399    /// Polls the underlying transport to ensure any incoming or outgoing
400    /// messages are processed as far as possible. Errors if the transport has failed.
401    fn try_poll_transport(
402        &mut self,
403        ctx: &mut Context<'_>,
404    ) -> Poll<Result<Infallible, InnerError>> {
405        if !self.waiting_to_close.is_empty() {
406            let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
407            // We've dropped the handle object. Nobody is going to wait to read
408            // the buffers anymore. This is a safe time to drop the read state.
409            for handle in &handles {
410                let _ = self.channel_read_states.remove(handle);
411                let _ = self.socket_read_states.remove(handle);
412            }
413            self.request(
414                ordinals::CLOSE,
415                proto::FDomainCloseRequest { handles },
416                Responder::Ignore,
417            );
418        }
419
420        self.waiting_to_close_waker = ctx.waker().clone();
421
422        loop {
423            if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
424                for state in std::mem::take(&mut self.socket_read_states).into_values() {
425                    state.wakers.into_iter().for_each(Waker::wake);
426                }
427                for (_, state) in self.channel_read_states.drain() {
428                    state.wakers.into_iter().for_each(Waker::wake);
429                }
430                return Poll::Ready(Err(e));
431            }
432            let Poll::Ready(Some(result)) = self.transport.poll_next(ctx) else {
433                return Poll::Pending;
434            };
435            let data = result?;
436            let (header, data) = match fidl_message::decode_transaction_header(&data) {
437                Ok(x) => x,
438                Err(e) => {
439                    self.transport = Transport::Error(InnerError::Protocol(e));
440                    continue;
441                }
442            };
443
444            let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
445                if let Err(e) = self.process_event(header, data) {
446                    self.transport = Transport::Error(e);
447                }
448                continue;
449            };
450
451            let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
452            match tx.handle(self, Ok((header, data))) {
453                Ok(x) => x,
454                Err(e) => {
455                    self.transport = Transport::Error(InnerError::Protocol(e));
456                    continue;
457                }
458            }
459        }
460    }
461
462    /// Process an incoming message that arose from an event rather than a transaction reply.
463    fn process_event(&mut self, header: TransactionHeader, data: &[u8]) -> Result<(), InnerError> {
464        match header.ordinal {
465            ordinals::ON_SOCKET_STREAMING_DATA => {
466                let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
467                    header, data,
468                )?;
469                let o =
470                    self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
471                        wakers: Vec::new(),
472                        queued: VecDeque::new(),
473                        is_streaming: false,
474                        read_request_pending: false,
475                    });
476                match msg.socket_message {
477                    proto::SocketMessage::Data(data) => {
478                        o.handle_incoming_message(Ok(data));
479                        Ok(())
480                    }
481                    proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
482                        if let Some(error) = error {
483                            o.handle_incoming_message(Err(Error::FDomain(*error)));
484                        }
485                        o.is_streaming = false;
486                        Ok(())
487                    }
488                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
489                }
490            }
491            ordinals::ON_CHANNEL_STREAMING_DATA => {
492                let msg = fidl_message::decode_message::<
493                    proto::ChannelOnChannelStreamingDataRequest,
494                >(header, data)?;
495                let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
496                    ChannelReadState {
497                        wakers: Vec::new(),
498                        queued: VecDeque::new(),
499                        is_streaming: false,
500                        read_request_pending: false,
501                    }
502                });
503                match msg.channel_sent {
504                    proto::ChannelSent::Message(data) => {
505                        o.handle_incoming_message(Ok(data));
506                        Ok(())
507                    }
508                    proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
509                        if let Some(error) = error {
510                            o.handle_incoming_message(Err(Error::FDomain(*error)));
511                        }
512                        o.is_streaming = false;
513                        Ok(())
514                    }
515                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
516                }
517            }
518            _ => Err(::fidl::Error::UnknownOrdinal {
519                ordinal: header.ordinal,
520                protocol_name:
521                    <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
522            }
523            .into()),
524        }
525    }
526
527    /// Polls the underlying transport to ensure any incoming or outgoing
528    /// messages are processed as far as possible. If a failure occurs, puts the
529    /// transport into an error state and fails all pending transactions.
530    fn poll_transport(&mut self, ctx: &mut Context<'_>) {
531        if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
532            for (_, v) in std::mem::take(&mut self.transactions) {
533                let _ = v.handle(self, Err(e.clone()));
534            }
535        }
536    }
537
538    /// Handles the response to a `SocketRead` protocol message.
539    pub(crate) fn handle_socket_read_response(
540        &mut self,
541        msg: Result<proto::SocketData, Error>,
542        id: proto::HandleId,
543    ) {
544        let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
545            wakers: Vec::new(),
546            queued: VecDeque::new(),
547            is_streaming: false,
548            read_request_pending: false,
549        });
550        state.handle_incoming_message(msg);
551        state.read_request_pending = false;
552    }
553
554    /// Handles the response to a `ChannelRead` protocol message.
555    pub(crate) fn handle_channel_read_response(
556        &mut self,
557        msg: Result<proto::ChannelMessage, Error>,
558        id: proto::HandleId,
559    ) {
560        let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
561            wakers: Vec::new(),
562            queued: VecDeque::new(),
563            is_streaming: false,
564            read_request_pending: false,
565        });
566        state.handle_incoming_message(msg);
567        state.read_request_pending = false;
568    }
569}
570
571/// Represents a connection to an FDomain.
572///
573/// The client is constructed by passing it a transport object which represents
574/// the raw connection to the remote FDomain. The `Client` wrapper then allows
575/// us to construct and use handles which behave similarly to their counterparts
576/// on a Fuchsia device.
577pub struct Client(Mutex<ClientInner>);
578
579impl std::fmt::Debug for Client {
580    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
581        f.debug_tuple("Client").field(&"...").finish()
582    }
583}
584
585/// A client which is always disconnected. Handles that lose their clients
586/// connect to this client instead, which always returns a "Client Lost"
587/// transport failure.
588pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
589    Arc::new(Client(Mutex::new(ClientInner {
590        transport: Transport::Error(InnerError::Transport(Arc::new(std::io::Error::other(
591            "Client Lost",
592        )))),
593        transactions: HashMap::new(),
594        channel_read_states: HashMap::new(),
595        socket_read_states: HashMap::new(),
596        next_tx_id: 1,
597        waiting_to_close: Vec::new(),
598        waiting_to_close_waker: futures::task::noop_waker(),
599    })))
600});
601
602impl Client {
603    /// Create a new FDomain client. The `transport` argument should contain the
604    /// established connection to the target, ready to communicate the FDomain
605    /// protocol.
606    ///
607    /// The second return item is a future that must be polled to keep
608    /// transactions running.
609    pub fn new(
610        transport: impl FDomainTransport + 'static,
611    ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
612        let ret = Arc::new(Client(Mutex::new(ClientInner {
613            transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
614            transactions: HashMap::new(),
615            socket_read_states: HashMap::new(),
616            channel_read_states: HashMap::new(),
617            next_tx_id: 1,
618            waiting_to_close: Vec::new(),
619            waiting_to_close_waker: futures::task::noop_waker(),
620        })));
621
622        let client_weak = Arc::downgrade(&ret);
623        let fut = futures::future::poll_fn(move |ctx| {
624            let Some(client) = client_weak.upgrade() else {
625                return Poll::Ready(());
626            };
627
628            client.0.lock().unwrap().poll_transport(ctx);
629            Poll::Pending
630        });
631
632        (ret, fut)
633    }
634
635    /// Get the namespace for the connected FDomain. Calling this more than once is an error.
636    pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
637        let new_handle = self.new_hid();
638        self.transaction(
639            ordinals::GET_NAMESPACE,
640            proto::FDomainGetNamespaceRequest { new_handle },
641            Responder::Namespace,
642        )
643        .await?;
644        Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
645    }
646
647    /// Create a new channel in the connected FDomain.
648    pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
649        let id_a = self.new_hid();
650        let id_b = self.new_hid();
651        let fut = self.transaction(
652            ordinals::CREATE_CHANNEL,
653            proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
654            Responder::CreateChannel,
655        );
656
657        fuchsia_async::Task::spawn(async move {
658            if let Err(e) = fut.await {
659                log::debug!("FDomain channel creation failed: {e}");
660            }
661        })
662        .detach();
663
664        (
665            Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
666            Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
667        )
668    }
669
670    /// Creates client and server endpoints connected to by a channel.
671    pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
672        self: &Arc<Self>,
673    ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
674        let (client, server) = self.create_channel();
675        let client_end = crate::fidl::ClientEnd::<F>::new(client);
676        let server_end = crate::fidl::ServerEnd::new(server);
677        (client_end, server_end)
678    }
679
680    /// Creates a client proxy and a server endpoint connected by a channel.
681    pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
682        self: &Arc<Self>,
683    ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
684        let (client_end, server_end) = self.create_endpoints::<F>();
685        (client_end.into_proxy(), server_end)
686    }
687
688    /// Creates a client proxy and a server request stream connected by a channel.
689    pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
690        self: &Arc<Self>,
691    ) -> (F::Proxy, F::RequestStream) {
692        let (client_end, server_end) = self.create_endpoints::<F>();
693        (client_end.into_proxy(), server_end.into_stream())
694    }
695
696    /// Create a new socket in the connected FDomain.
697    fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
698        let id_a = self.new_hid();
699        let id_b = self.new_hid();
700        let fut = self.transaction(
701            ordinals::CREATE_SOCKET,
702            proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
703            Responder::CreateSocket,
704        );
705
706        fuchsia_async::Task::spawn(async move {
707            if let Err(e) = fut.await {
708                log::debug!("FDomain socket creation failed: {e}");
709            }
710        })
711        .detach();
712
713        (
714            Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
715            Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
716        )
717    }
718
719    /// Create a new streaming socket in the connected FDomain.
720    pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
721        self.create_socket(proto::SocketType::Stream)
722    }
723
724    /// Create a new datagram socket in the connected FDomain.
725    pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
726        self.create_socket(proto::SocketType::Datagram)
727    }
728
729    /// Create a new event pair in the connected FDomain.
730    pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
731        let id_a = self.new_hid();
732        let id_b = self.new_hid();
733        let fut = self.transaction(
734            ordinals::CREATE_EVENT_PAIR,
735            proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
736            Responder::CreateEventPair,
737        );
738
739        fuchsia_async::Task::spawn(async move {
740            if let Err(e) = fut.await {
741                log::debug!("FDomain event pair creation failed: {e}");
742            }
743        })
744        .detach();
745
746        (
747            EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
748            EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
749        )
750    }
751
752    /// Create a new event handle in the connected FDomain.
753    pub fn create_event(self: &Arc<Self>) -> Event {
754        let id = self.new_hid();
755        let fut = self.transaction(
756            ordinals::CREATE_EVENT,
757            proto::EventCreateEventRequest { handle: id },
758            Responder::CreateEvent,
759        );
760
761        fuchsia_async::Task::spawn(async move {
762            if let Err(e) = fut.await {
763                log::debug!("FDomain event creation failed: {e}");
764            }
765        })
766        .detach();
767
768        Event(Handle { id: id.id, client: Arc::downgrade(self) })
769    }
770
771    /// Allocate a new HID, which should be suitable for use with the connected FDomain.
772    pub(crate) fn new_hid(&self) -> proto::NewHandleId {
773        // TODO: On the target side we have to keep a table of these which means
774        // we can automatically detect collisions in the random value. On the
775        // client side we'd have to add a whole data structure just for that
776        // purpose. Should we?
777        proto::NewHandleId { id: rand::random::<u32>() >> 1 }
778    }
779
780    /// Create a future which sends a FIDL message to the connected FDomain and
781    /// waits for a response.
782    ///
783    /// Calling this method queues the transaction synchronously. Awaiting is
784    /// only necessary to wait for the response.
785    pub(crate) fn transaction<S: fidl_message::Body, R: 'static>(
786        self: &Arc<Self>,
787        ordinal: u64,
788        request: S,
789        f: impl Fn(OneshotSender<Result<R, Error>>) -> Responder,
790    ) -> impl Future<Output = Result<R, Error>> + 'static {
791        let mut inner = self.0.lock().unwrap();
792
793        let (sender, receiver) = futures::channel::oneshot::channel();
794        inner.request(ordinal, request, f(sender));
795        receiver.map(|x| x.expect("Oneshot went away without reply!"))
796    }
797
798    /// Start getting streaming events for socket reads.
799    pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
800        let mut inner = self.0.lock().unwrap();
801        if let Some(e) = inner.transport.error() {
802            return Err(e.into());
803        }
804
805        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
806            wakers: Vec::new(),
807            queued: VecDeque::new(),
808            is_streaming: false,
809            read_request_pending: false,
810        });
811
812        assert!(!state.is_streaming, "Initiated streaming twice!");
813        state.is_streaming = true;
814
815        inner.request(
816            ordinals::READ_SOCKET_STREAMING_START,
817            proto::SocketReadSocketStreamingStartRequest { handle: id },
818            Responder::Ignore,
819        );
820        Ok(())
821    }
822
823    /// Stop getting streaming events for socket reads. Doesn't return errors
824    /// because it's exclusively called in destructors where we have nothing to
825    /// do with them.
826    pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
827        let mut inner = self.0.lock().unwrap();
828        if let Some(state) = inner.socket_read_states.get_mut(&id) {
829            if state.is_streaming {
830                state.is_streaming = false;
831                // TODO: Log?
832                let _ = inner.request(
833                    ordinals::READ_SOCKET_STREAMING_STOP,
834                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
835                    Responder::Ignore,
836                );
837            }
838        }
839    }
840
841    /// Start getting streaming events for socket reads.
842    pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
843        let mut inner = self.0.lock().unwrap();
844        if let Some(e) = inner.transport.error() {
845            return Err(e.into());
846        }
847        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
848            wakers: Vec::new(),
849            queued: VecDeque::new(),
850            is_streaming: false,
851            read_request_pending: false,
852        });
853
854        assert!(!state.is_streaming, "Initiated streaming twice!");
855        state.is_streaming = true;
856
857        inner.request(
858            ordinals::READ_CHANNEL_STREAMING_START,
859            proto::ChannelReadChannelStreamingStartRequest { handle: id },
860            Responder::Ignore,
861        );
862
863        Ok(())
864    }
865
866    /// Stop getting streaming events for socket reads. Doesn't return errors
867    /// because it's exclusively called in destructors where we have nothing to
868    /// do with them.
869    pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
870        let mut inner = self.0.lock().unwrap();
871        if let Some(state) = inner.channel_read_states.get_mut(&id) {
872            if state.is_streaming {
873                state.is_streaming = false;
874                // TODO: Log?
875                let _ = inner.request(
876                    ordinals::READ_CHANNEL_STREAMING_STOP,
877                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
878                    Responder::Ignore,
879                );
880            }
881        }
882    }
883
884    /// Execute a read from a channel.
885    pub(crate) fn poll_socket(
886        &self,
887        id: proto::HandleId,
888        ctx: &mut Context<'_>,
889        out: &mut [u8],
890    ) -> Poll<Result<usize, Error>> {
891        let mut inner = self.0.lock().unwrap();
892        if let Some(error) = inner.transport.error() {
893            return Poll::Ready(Err(error.into()));
894        }
895
896        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
897            wakers: Vec::new(),
898            queued: VecDeque::new(),
899            is_streaming: false,
900            read_request_pending: false,
901        });
902
903        if let Some(got) = state.queued.front_mut() {
904            match got.as_mut() {
905                Ok(data) => {
906                    let read_size = std::cmp::min(data.data.len(), out.len());
907                    out[..read_size].copy_from_slice(&data.data[..read_size]);
908
909                    if data.data.len() > read_size && !data.is_datagram {
910                        let _ = data.data.drain(..read_size);
911                    } else {
912                        let _ = state.queued.pop_front();
913                    }
914
915                    return Poll::Ready(Ok(read_size));
916                }
917                Err(_) => {
918                    let err = state.queued.pop_front().unwrap().unwrap_err();
919                    return Poll::Ready(Err(err));
920                }
921            }
922        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
923            state.wakers.push(ctx.waker().clone());
924        }
925
926        if !state.read_request_pending && !state.is_streaming {
927            inner.request(
928                ordinals::READ_SOCKET,
929                proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
930                Responder::ReadSocket(id),
931            );
932        }
933
934        Poll::Pending
935    }
936
937    /// Execute a read from a channel.
938    pub(crate) fn poll_channel(
939        &self,
940        id: proto::HandleId,
941        ctx: &mut Context<'_>,
942        for_stream: bool,
943    ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
944        let mut inner = self.0.lock().unwrap();
945        if let Some(error) = inner.transport.error() {
946            return Poll::Ready(Some(Err(error.into())));
947        }
948
949        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
950            wakers: Vec::new(),
951            queued: VecDeque::new(),
952            is_streaming: false,
953            read_request_pending: false,
954        });
955
956        if let Some(got) = state.queued.pop_front() {
957            return Poll::Ready(Some(got));
958        } else if for_stream && !state.is_streaming {
959            return Poll::Ready(None);
960        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
961            state.wakers.push(ctx.waker().clone());
962        }
963
964        if !state.read_request_pending && !state.is_streaming {
965            inner.request(
966                ordinals::READ_CHANNEL,
967                proto::ChannelReadChannelRequest { handle: id },
968                Responder::ReadChannel(id),
969            );
970        }
971
972        Poll::Pending
973    }
974
975    /// Check whether this channel is streaming
976    pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
977        let inner = self.0.lock().unwrap();
978        let Some(state) = inner.channel_read_states.get(&id) else {
979            return false;
980        };
981        state.is_streaming
982    }
983
984    /// Check that all the given handles are safe to transfer through a channel
985    /// e.g. that there's no chance of in-flight reads getting dropped.
986    pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
987        let inner = self.0.lock().unwrap();
988        match handles {
989            proto::Handles::Handles(handles) => {
990                for handle in handles {
991                    assert!(
992                        !(inner.channel_read_states.contains_key(handle)
993                            || inner.socket_read_states.contains_key(handle)),
994                        "Tried to transfer handle after reading"
995                    );
996                }
997            }
998            proto::Handles::Dispositions(dispositions) => {
999                for disposition in dispositions {
1000                    match &disposition.handle {
1001                        proto::HandleOp::Move_(handle) => assert!(
1002                            !(inner.channel_read_states.contains_key(handle)
1003                                || inner.socket_read_states.contains_key(handle)),
1004                            "Tried to transfer handle after reading"
1005                        ),
1006                        // Pretty sure this should be fine regardless of read state.
1007                        proto::HandleOp::Duplicate(_) => (),
1008                    }
1009                }
1010            }
1011        }
1012    }
1013}