Skip to main content

fdomain_client/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_message::TransactionHeader;
6use fuchsia_sync::Mutex;
7use futures::FutureExt;
8use futures::channel::oneshot::Sender as OneshotSender;
9use futures::stream::Stream as StreamTrait;
10use std::collections::{HashMap, VecDeque};
11use std::convert::Infallible;
12use std::future::Future;
13use std::num::NonZeroU32;
14use std::pin::Pin;
15use std::sync::{Arc, LazyLock};
16use std::task::{Context, Poll, Waker, ready};
17use {fidl_fuchsia_fdomain as proto, fuchsia_async as _};
18
19mod channel;
20mod event;
21mod event_pair;
22mod handle;
23mod responder;
24mod socket;
25
26#[cfg(test)]
27mod test;
28
29pub mod fidl;
30pub mod fidl_next;
31
32use responder::Responder;
33
34pub use channel::{
35    AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
36};
37pub use event::Event;
38pub use event_pair::Eventpair as EventPair;
39pub use handle::unowned::Unowned;
40pub use handle::{
41    AsHandleRef, Handle, HandleBased, HandleRef, NullableHandle, OnFDomainSignals, Peered,
42};
43pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
44pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
45
46// Unsupported handle types.
47#[rustfmt::skip]
48pub use Handle as Clock;
49#[rustfmt::skip]
50pub use Handle as Fifo;
51#[rustfmt::skip]
52pub use Handle as Job;
53#[rustfmt::skip]
54pub use Handle as Process;
55#[rustfmt::skip]
56pub use Handle as Resource;
57#[rustfmt::skip]
58pub use Handle as Stream;
59#[rustfmt::skip]
60pub use Handle as Thread;
61#[rustfmt::skip]
62pub use Handle as Vmar;
63#[rustfmt::skip]
64pub use Handle as Vmo;
65
66use proto::f_domain_ordinals as ordinals;
67
68fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69    match error {
70        FDomainError::TargetError(e) => {
71            let e = zx_status::Status::from_raw(*e);
72            write!(f, "Target-side error {e}")
73        }
74        FDomainError::BadHandleId(proto::BadHandleId { id }) => {
75            write!(f, "Tried to use invalid handle id {id}")
76        }
77        FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
78            f,
79            "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
80        ),
81        FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
82            write!(f, "Handle is occupied delivering streaming reads")
83        }
84        FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
85            write!(f, "No streaming read was in progress")
86        }
87        FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
88            write!(
89                f,
90                "Tried to create a handle with id {id}, which is outside the valid range for client handles"
91            )
92        }
93        FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
94            if *same_call {
95                write!(f, "Tried to create two or more new handles with the same id {id}")
96            } else {
97                write!(
98                    f,
99                    "Tried to create a new handle with id {id}, which is already the id of an existing handle"
100                )
101            }
102        }
103        FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
104            write!(f, "Tried to write a channel into itself")
105        }
106        FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
107            write!(f, "Handle closed while being read")
108        }
109        _ => todo!(),
110    }
111}
112
113/// Result type alias.
114pub type Result<T, E = Error> = std::result::Result<T, E>;
115
116/// Error type emitted by FDomain operations.
117#[derive(Clone)]
118pub enum Error {
119    SocketWrite(WriteSocketError),
120    ChannelWrite(WriteChannelError),
121    FDomain(FDomainError),
122    Protocol(::fidl::Error),
123    ProtocolObjectTypeIncompatible,
124    ProtocolRightsIncompatible,
125    ProtocolSignalsIncompatible,
126    ProtocolStreamEventIncompatible,
127    Transport(Option<Arc<std::io::Error>>),
128    ConnectionMismatch,
129    StreamingAborted,
130}
131
132impl std::fmt::Display for Error {
133    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134        match self {
135            Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
136                write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
137                write_fdomain_error(error, f)
138            }
139            Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
140                write!(f, "While writing channel: ")?;
141                write_fdomain_error(error, f)
142            }
143            Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
144                write!(f, "Couldn't write all handles into a channel:")?;
145                for (pos, error) in
146                    errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
147                {
148                    write!(f, "\n  Handle in position {pos}: ")?;
149                    write_fdomain_error(error, f)?;
150                }
151                Ok(())
152            }
153            Self::ProtocolObjectTypeIncompatible => {
154                write!(f, "The FDomain protocol does not recognize an object type")
155            }
156            Self::ProtocolRightsIncompatible => {
157                write!(f, "The FDomain protocol does not recognize some rights")
158            }
159            Self::ProtocolSignalsIncompatible => {
160                write!(f, "The FDomain protocol does not recognize some signals")
161            }
162            Self::ProtocolStreamEventIncompatible => {
163                write!(f, "The FDomain protocol does not recognize a received streaming IO event")
164            }
165            Self::FDomain(e) => write_fdomain_error(e, f),
166            Self::Protocol(e) => write!(f, "Protocol error: {e}"),
167            Self::Transport(Some(e)) => write!(f, "Transport error: {e:?}"),
168            Self::Transport(None) => write!(f, "Transport closed"),
169            Self::ConnectionMismatch => {
170                write!(f, "Tried to use an FDomain handle from a different connection")
171            }
172            Self::StreamingAborted => write!(f, "This channel is no longer streaming"),
173        }
174    }
175}
176
177impl std::fmt::Debug for Error {
178    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179        match self {
180            Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
181            Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
182            Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
183            Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
184            Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
185            Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
186            Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
187            Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
188            Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
189            Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
190            Self::StreamingAborted => write!(f, "StreamingAborted"),
191        }
192    }
193}
194
195impl std::error::Error for Error {}
196
197impl From<FDomainError> for Error {
198    fn from(other: FDomainError) -> Self {
199        Self::FDomain(other)
200    }
201}
202
203impl From<::fidl::Error> for Error {
204    fn from(other: ::fidl::Error) -> Self {
205        Self::Protocol(other)
206    }
207}
208
209impl From<WriteSocketError> for Error {
210    fn from(other: WriteSocketError) -> Self {
211        Self::SocketWrite(other)
212    }
213}
214
215impl From<WriteChannelError> for Error {
216    fn from(other: WriteChannelError) -> Self {
217        Self::ChannelWrite(other)
218    }
219}
220
221/// An error emitted internally by the client. Similar to [`Error`] but does not
222/// contain several variants which are irrelevant in the contexts where it is
223/// used.
224#[derive(Clone)]
225enum InnerError {
226    Protocol(::fidl::Error),
227    ProtocolStreamEventIncompatible,
228    Transport(Option<Arc<std::io::Error>>),
229}
230
231impl From<InnerError> for Error {
232    fn from(other: InnerError) -> Self {
233        match other {
234            InnerError::Protocol(p) => Error::Protocol(p),
235            InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
236            InnerError::Transport(t) => Error::Transport(t),
237        }
238    }
239}
240
241impl From<::fidl::Error> for InnerError {
242    fn from(other: ::fidl::Error) -> Self {
243        InnerError::Protocol(other)
244    }
245}
246
247// TODO(399717689) Figure out if we could just use AsyncRead/Write instead of a special trait.
248/// Implemented by objects which provide a transport over which we can speak the
249/// FDomain protocol.
250///
251/// The implementer must provide two things:
252/// 1) An incoming stream of messages presented as `Vec<u8>`. This is provided
253///    via the `Stream` trait, which this trait requires.
254/// 2) A way to send messages. This is provided by implementing the
255///    `poll_send_message` method.
256pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
257    /// Attempt to send a message asynchronously. Messages should be sent so
258    /// that they arrive at the target in order.
259    fn poll_send_message(
260        self: Pin<&mut Self>,
261        msg: &[u8],
262        ctx: &mut Context<'_>,
263    ) -> Poll<Result<(), Option<std::io::Error>>>;
264}
265
266/// Wrapper for an `FDomainTransport` implementer that:
267/// 1) Provides a queue for outgoing messages so we need not have an await point
268///    when we submit a message.
269/// 2) Drops the transport on error, then returns the last observed error for
270///    all future operations.
271enum Transport {
272    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
273    Error(InnerError),
274}
275
276impl Transport {
277    /// Get the failure mode of the transport if it has failed.
278    fn error(&self) -> Option<InnerError> {
279        match self {
280            Transport::Transport(_, _, _) => None,
281            Transport::Error(inner_error) => Some(inner_error.clone()),
282        }
283    }
284
285    /// Enqueue a message to be sent on this transport.
286    fn push_msg(&mut self, msg: Box<[u8]>) {
287        if let Transport::Transport(_, v, w) = self {
288            v.push_back(msg);
289            w.drain(..).for_each(Waker::wake);
290        }
291    }
292
293    /// Push messages in the send queue out through the transport.
294    fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
295        match self {
296            Transport::Error(e) => Poll::Ready(e.clone()),
297            Transport::Transport(t, v, w) => {
298                while let Some(msg) = v.front() {
299                    match t.as_mut().poll_send_message(msg, ctx) {
300                        Poll::Ready(Ok(())) => {
301                            v.pop_front();
302                        }
303                        Poll::Ready(Err(e)) => {
304                            let e = e.map(Arc::new);
305                            *self = Transport::Error(InnerError::Transport(e.clone()));
306                            return Poll::Ready(InnerError::Transport(e));
307                        }
308                        Poll::Pending => return Poll::Pending,
309                    }
310                }
311
312                if v.is_empty() {
313                    w.push(ctx.waker().clone());
314                } else {
315                    ctx.waker().wake_by_ref();
316                }
317                Poll::Pending
318            }
319        }
320    }
321
322    /// Get the next incoming message from the transport.
323    fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
324        match self {
325            Transport::Error(e) => Poll::Ready(Err(e.clone())),
326            Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
327                Some(Ok(x)) => Poll::Ready(Ok(x)),
328                Some(Err(e)) => {
329                    let e = Arc::new(e);
330                    *self = Transport::Error(InnerError::Transport(Some(Arc::clone(&e))));
331                    Poll::Ready(Err(InnerError::Transport(Some(e))))
332                }
333                Option::None => Poll::Ready(Err(InnerError::Transport(None))),
334            },
335        }
336    }
337}
338
339/// State of a socket that is or has been read from.
340struct SocketReadState {
341    wakers: Vec<Waker>,
342    queued: VecDeque<Result<proto::SocketData, Error>>,
343    read_request_pending: bool,
344    is_streaming: bool,
345}
346
347impl SocketReadState {
348    /// Handle an incoming message, which is either a channel streaming event or
349    /// response to a `ChannelRead` request.
350    fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) {
351        self.queued.push_back(msg);
352        self.wakers.drain(..).for_each(Waker::wake);
353    }
354}
355
356/// State of a channel that is or has been read from.
357struct ChannelReadState {
358    wakers: Vec<Waker>,
359    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
360    read_request_pending: bool,
361    is_streaming: bool,
362}
363
364impl ChannelReadState {
365    /// Handle an incoming message, which is either a channel streaming event or
366    /// response to a `ChannelRead` request.
367    fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) {
368        self.queued.push_back(msg);
369        self.wakers.drain(..).for_each(Waker::wake);
370    }
371}
372
373/// Lock-protected interior of `Client`
374struct ClientInner {
375    transport: Transport,
376    transactions: HashMap<NonZeroU32, responder::Responder>,
377    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
378    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
379    next_tx_id: u32,
380    waiting_to_close: Vec<proto::HandleId>,
381    waiting_to_close_waker: Waker,
382}
383
384impl ClientInner {
385    /// Serialize and enqueue a new transaction, including header and transaction ID.
386    fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
387        let tx_id = self.next_tx_id;
388
389        let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
390        let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
391        self.next_tx_id += 1;
392        assert!(
393            self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
394            "Allocated same tx id twice!"
395        );
396        self.transport.push_msg(msg.into());
397    }
398
399    /// Polls the underlying transport to ensure any incoming or outgoing
400    /// messages are processed as far as possible. Errors if the transport has failed.
401    fn try_poll_transport(
402        &mut self,
403        ctx: &mut Context<'_>,
404    ) -> Poll<Result<Infallible, InnerError>> {
405        if !self.waiting_to_close.is_empty() {
406            let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
407            // We've dropped the handle object. Nobody is going to wait to read
408            // the buffers anymore. This is a safe time to drop the read state.
409            for handle in &handles {
410                let _ = self.channel_read_states.remove(handle);
411                let _ = self.socket_read_states.remove(handle);
412            }
413            self.request(
414                ordinals::CLOSE,
415                proto::FDomainCloseRequest { handles },
416                Responder::Ignore,
417            );
418        }
419
420        self.waiting_to_close_waker = ctx.waker().clone();
421
422        loop {
423            if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
424                for state in std::mem::take(&mut self.socket_read_states).into_values() {
425                    state.wakers.into_iter().for_each(Waker::wake);
426                }
427                for (_, state) in self.channel_read_states.drain() {
428                    state.wakers.into_iter().for_each(Waker::wake);
429                }
430                return Poll::Ready(Err(e));
431            }
432            let Poll::Ready(result) = self.transport.poll_next(ctx) else {
433                return Poll::Pending;
434            };
435            let data = result?;
436            let (header, data) = match fidl_message::decode_transaction_header(&data) {
437                Ok(x) => x,
438                Err(e) => {
439                    self.transport = Transport::Error(InnerError::Protocol(e));
440                    continue;
441                }
442            };
443
444            let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
445                if let Err(e) = self.process_event(header, data) {
446                    self.transport = Transport::Error(e);
447                }
448                continue;
449            };
450
451            let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
452            match tx.handle(self, Ok((header, data))) {
453                Ok(x) => x,
454                Err(e) => {
455                    self.transport = Transport::Error(InnerError::Protocol(e));
456                    continue;
457                }
458            }
459        }
460    }
461
462    /// Process an incoming message that arose from an event rather than a transaction reply.
463    fn process_event(&mut self, header: TransactionHeader, data: &[u8]) -> Result<(), InnerError> {
464        match header.ordinal {
465            ordinals::ON_SOCKET_STREAMING_DATA => {
466                let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
467                    header, data,
468                )?;
469                let o =
470                    self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
471                        wakers: Vec::new(),
472                        queued: VecDeque::new(),
473                        is_streaming: false,
474                        read_request_pending: false,
475                    });
476                match msg.socket_message {
477                    proto::SocketMessage::Data(data) => {
478                        o.handle_incoming_message(Ok(data));
479                        Ok(())
480                    }
481                    proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
482                        if let Some(error) = error {
483                            o.handle_incoming_message(Err(Error::FDomain(*error)));
484                        }
485                        o.is_streaming = false;
486                        Ok(())
487                    }
488                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
489                }
490            }
491            ordinals::ON_CHANNEL_STREAMING_DATA => {
492                let msg = fidl_message::decode_message::<
493                    proto::ChannelOnChannelStreamingDataRequest,
494                >(header, data)?;
495                let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
496                    ChannelReadState {
497                        wakers: Vec::new(),
498                        queued: VecDeque::new(),
499                        is_streaming: false,
500                        read_request_pending: false,
501                    }
502                });
503                match msg.channel_sent {
504                    proto::ChannelSent::Message(data) => {
505                        o.handle_incoming_message(Ok(data));
506                        Ok(())
507                    }
508                    proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
509                        if let Some(error) = error {
510                            o.handle_incoming_message(Err(Error::FDomain(*error)));
511                        }
512                        o.is_streaming = false;
513                        Ok(())
514                    }
515                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
516                }
517            }
518            _ => Err(::fidl::Error::UnknownOrdinal {
519                ordinal: header.ordinal,
520                protocol_name:
521                    <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
522            }
523            .into()),
524        }
525    }
526
527    /// Polls the underlying transport to ensure any incoming or outgoing
528    /// messages are processed as far as possible. If a failure occurs, puts the
529    /// transport into an error state and fails all pending transactions.
530    fn poll_transport(&mut self, ctx: &mut Context<'_>) -> Poll<()> {
531        if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
532            for (_, v) in std::mem::take(&mut self.transactions) {
533                let _ = v.handle(self, Err(e.clone()));
534            }
535
536            Poll::Ready(())
537        } else {
538            Poll::Pending
539        }
540    }
541
542    /// Handles the response to a `SocketRead` protocol message.
543    pub(crate) fn handle_socket_read_response(
544        &mut self,
545        msg: Result<proto::SocketData, Error>,
546        id: proto::HandleId,
547    ) {
548        let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
549            wakers: Vec::new(),
550            queued: VecDeque::new(),
551            is_streaming: false,
552            read_request_pending: false,
553        });
554        state.handle_incoming_message(msg);
555        state.read_request_pending = false;
556    }
557
558    /// Handles the response to a `ChannelRead` protocol message.
559    pub(crate) fn handle_channel_read_response(
560        &mut self,
561        msg: Result<proto::ChannelMessage, Error>,
562        id: proto::HandleId,
563    ) {
564        let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
565            wakers: Vec::new(),
566            queued: VecDeque::new(),
567            is_streaming: false,
568            read_request_pending: false,
569        });
570        state.handle_incoming_message(msg);
571        state.read_request_pending = false;
572    }
573}
574
575impl Drop for ClientInner {
576    fn drop(&mut self) {
577        let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
578        for responder in responders {
579            let _ = responder.handle(self, Err(InnerError::Transport(None)));
580        }
581        for state in self.channel_read_states.values_mut() {
582            state.wakers.drain(..).for_each(Waker::wake);
583        }
584        for state in self.socket_read_states.values_mut() {
585            state.wakers.drain(..).for_each(Waker::wake);
586        }
587    }
588}
589
590/// Represents a connection to an FDomain.
591///
592/// The client is constructed by passing it a transport object which represents
593/// the raw connection to the remote FDomain. The `Client` wrapper then allows
594/// us to construct and use handles which behave similarly to their counterparts
595/// on a Fuchsia device.
596pub struct Client(pub(crate) Mutex<ClientInner>);
597
598impl std::fmt::Debug for Client {
599    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
600        f.debug_tuple("Client").field(&"...").finish()
601    }
602}
603
604/// A client which is always disconnected. Handles that lose their clients
605/// connect to this client instead, which always returns a "Client Lost"
606/// transport failure.
607pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
608    Arc::new(Client(Mutex::new(ClientInner {
609        transport: Transport::Error(InnerError::Transport(None)),
610        transactions: HashMap::new(),
611        channel_read_states: HashMap::new(),
612        socket_read_states: HashMap::new(),
613        next_tx_id: 1,
614        waiting_to_close: Vec::new(),
615        waiting_to_close_waker: std::task::Waker::noop().clone(),
616    })))
617});
618
619impl Client {
620    /// Create a new FDomain client. The `transport` argument should contain the
621    /// established connection to the target, ready to communicate the FDomain
622    /// protocol.
623    ///
624    /// The second return item is a future that must be polled to keep
625    /// transactions running.
626    pub fn new(
627        transport: impl FDomainTransport + 'static,
628    ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
629        let ret = Arc::new(Client(Mutex::new(ClientInner {
630            transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
631            transactions: HashMap::new(),
632            socket_read_states: HashMap::new(),
633            channel_read_states: HashMap::new(),
634            next_tx_id: 1,
635            waiting_to_close: Vec::new(),
636            waiting_to_close_waker: std::task::Waker::noop().clone(),
637        })));
638
639        let client_weak = Arc::downgrade(&ret);
640        let fut = futures::future::poll_fn(move |ctx| {
641            let Some(client) = client_weak.upgrade() else {
642                return Poll::Ready(());
643            };
644
645            client.0.lock().poll_transport(ctx)
646        });
647
648        (ret, fut)
649    }
650
651    /// Get the namespace for the connected FDomain. Calling this more than once is an error.
652    pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
653        let new_handle = self.new_hid();
654        self.transaction(
655            ordinals::GET_NAMESPACE,
656            proto::FDomainGetNamespaceRequest { new_handle },
657            Responder::Namespace,
658        )
659        .await?;
660        Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
661    }
662
663    /// Create a new channel in the connected FDomain.
664    pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
665        let id_a = self.new_hid();
666        let id_b = self.new_hid();
667        let fut = self.transaction(
668            ordinals::CREATE_CHANNEL,
669            proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
670            Responder::CreateChannel,
671        );
672
673        fuchsia_async::Task::spawn(async move {
674            if let Err(e) = fut.await {
675                log::debug!("FDomain channel creation failed: {e}");
676            }
677        })
678        .detach();
679
680        (
681            Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
682            Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
683        )
684    }
685
686    /// Creates client and server endpoints connected to by a channel.
687    pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
688        self: &Arc<Self>,
689    ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
690        let (client, server) = self.create_channel();
691        let client_end = crate::fidl::ClientEnd::<F>::new(client);
692        let server_end = crate::fidl::ServerEnd::new(server);
693        (client_end, server_end)
694    }
695
696    /// Creates a client proxy and a server endpoint connected by a channel.
697    pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
698        self: &Arc<Self>,
699    ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
700        let (client_end, server_end) = self.create_endpoints::<F>();
701        (client_end.into_proxy(), server_end)
702    }
703
704    /// Creates a client proxy and a server request stream connected by a channel.
705    pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
706        self: &Arc<Self>,
707    ) -> (F::Proxy, F::RequestStream) {
708        let (client_end, server_end) = self.create_endpoints::<F>();
709        (client_end.into_proxy(), server_end.into_stream())
710    }
711
712    /// Creates a client end and a server request stream connected by a channel.
713    pub fn create_request_stream<F: crate::fidl::ProtocolMarker>(
714        self: &Arc<Self>,
715    ) -> (crate::fidl::ClientEnd<F>, F::RequestStream) {
716        let (client_end, server_end) = self.create_endpoints::<F>();
717        (client_end, server_end.into_stream())
718    }
719
720    /// Create a new socket in the connected FDomain.
721    fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
722        let id_a = self.new_hid();
723        let id_b = self.new_hid();
724        let fut = self.transaction(
725            ordinals::CREATE_SOCKET,
726            proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
727            Responder::CreateSocket,
728        );
729
730        fuchsia_async::Task::spawn(async move {
731            if let Err(e) = fut.await {
732                log::debug!("FDomain socket creation failed: {e}");
733            }
734        })
735        .detach();
736
737        (
738            Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
739            Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
740        )
741    }
742
743    /// Create a new streaming socket in the connected FDomain.
744    pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
745        self.create_socket(proto::SocketType::Stream)
746    }
747
748    /// Create a new datagram socket in the connected FDomain.
749    pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
750        self.create_socket(proto::SocketType::Datagram)
751    }
752
753    /// Create a new event pair in the connected FDomain.
754    pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
755        let id_a = self.new_hid();
756        let id_b = self.new_hid();
757        let fut = self.transaction(
758            ordinals::CREATE_EVENT_PAIR,
759            proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
760            Responder::CreateEventPair,
761        );
762
763        fuchsia_async::Task::spawn(async move {
764            if let Err(e) = fut.await {
765                log::debug!("FDomain event pair creation failed: {e}");
766            }
767        })
768        .detach();
769
770        (
771            EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
772            EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
773        )
774    }
775
776    /// Create a new event handle in the connected FDomain.
777    pub fn create_event(self: &Arc<Self>) -> Event {
778        let id = self.new_hid();
779        let fut = self.transaction(
780            ordinals::CREATE_EVENT,
781            proto::EventCreateEventRequest { handle: id },
782            Responder::CreateEvent,
783        );
784
785        fuchsia_async::Task::spawn(async move {
786            if let Err(e) = fut.await {
787                log::debug!("FDomain event creation failed: {e}");
788            }
789        })
790        .detach();
791
792        Event(Handle { id: id.id, client: Arc::downgrade(self) })
793    }
794
795    /// Allocate a new HID, which should be suitable for use with the connected FDomain.
796    pub(crate) fn new_hid(&self) -> proto::NewHandleId {
797        // TODO: On the target side we have to keep a table of these which means
798        // we can automatically detect collisions in the random value. On the
799        // client side we'd have to add a whole data structure just for that
800        // purpose. Should we?
801        proto::NewHandleId { id: rand::random::<u32>() >> 1 }
802    }
803
804    /// Create a future which sends a FIDL message to the connected FDomain and
805    /// waits for a response.
806    ///
807    /// Calling this method queues the transaction synchronously. Awaiting is
808    /// only necessary to wait for the response.
809    pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
810        self: &Arc<Self>,
811        ordinal: u64,
812        request: S,
813        f: F,
814    ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
815    where
816        F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
817    {
818        let mut inner = self.0.lock();
819
820        let (sender, receiver) = futures::channel::oneshot::channel();
821        inner.request(ordinal, request, f(sender));
822        receiver.map(|x| x.expect("Oneshot went away without reply!"))
823    }
824
825    /// Start getting streaming events for socket reads.
826    pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
827        let mut inner = self.0.lock();
828        if let Some(e) = inner.transport.error() {
829            return Err(e.into());
830        }
831
832        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
833            wakers: Vec::new(),
834            queued: VecDeque::new(),
835            is_streaming: false,
836            read_request_pending: false,
837        });
838
839        assert!(!state.is_streaming, "Initiated streaming twice!");
840        state.is_streaming = true;
841
842        inner.request(
843            ordinals::READ_SOCKET_STREAMING_START,
844            proto::SocketReadSocketStreamingStartRequest { handle: id },
845            Responder::Ignore,
846        );
847        Ok(())
848    }
849
850    /// Stop getting streaming events for socket reads. Doesn't return errors
851    /// because it's exclusively called in destructors where we have nothing to
852    /// do with them.
853    pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
854        let mut inner = self.0.lock();
855        if let Some(state) = inner.socket_read_states.get_mut(&id) {
856            if state.is_streaming {
857                state.is_streaming = false;
858                // TODO: Log?
859                let _ = inner.request(
860                    ordinals::READ_SOCKET_STREAMING_STOP,
861                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
862                    Responder::Ignore,
863                );
864            }
865        }
866    }
867
868    /// Start getting streaming events for socket reads.
869    pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
870        let mut inner = self.0.lock();
871        if let Some(e) = inner.transport.error() {
872            return Err(e.into());
873        }
874        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
875            wakers: Vec::new(),
876            queued: VecDeque::new(),
877            is_streaming: false,
878            read_request_pending: false,
879        });
880
881        assert!(!state.is_streaming, "Initiated streaming twice!");
882        state.is_streaming = true;
883
884        inner.request(
885            ordinals::READ_CHANNEL_STREAMING_START,
886            proto::ChannelReadChannelStreamingStartRequest { handle: id },
887            Responder::Ignore,
888        );
889
890        Ok(())
891    }
892
893    /// Stop getting streaming events for socket reads. Doesn't return errors
894    /// because it's exclusively called in destructors where we have nothing to
895    /// do with them.
896    pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
897        let mut inner = self.0.lock();
898        if let Some(state) = inner.channel_read_states.get_mut(&id) {
899            if state.is_streaming {
900                state.is_streaming = false;
901                // TODO: Log?
902                let _ = inner.request(
903                    ordinals::READ_CHANNEL_STREAMING_STOP,
904                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
905                    Responder::Ignore,
906                );
907            }
908        }
909    }
910
911    /// Execute a read from a channel.
912    pub(crate) fn poll_socket(
913        &self,
914        id: proto::HandleId,
915        ctx: &mut Context<'_>,
916        out: &mut [u8],
917    ) -> Poll<Result<usize, Error>> {
918        let mut inner = self.0.lock();
919        if let Some(error) = inner.transport.error() {
920            return Poll::Ready(Err(error.into()));
921        }
922
923        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
924            wakers: Vec::new(),
925            queued: VecDeque::new(),
926            is_streaming: false,
927            read_request_pending: false,
928        });
929
930        if let Some(got) = state.queued.front_mut() {
931            match got.as_mut() {
932                Ok(data) => {
933                    let read_size = std::cmp::min(data.data.len(), out.len());
934                    out[..read_size].copy_from_slice(&data.data[..read_size]);
935
936                    if data.data.len() > read_size && !data.is_datagram {
937                        let _ = data.data.drain(..read_size);
938                    } else {
939                        let _ = state.queued.pop_front();
940                    }
941
942                    return Poll::Ready(Ok(read_size));
943                }
944                Err(_) => {
945                    let err = state.queued.pop_front().unwrap().unwrap_err();
946                    return Poll::Ready(Err(err));
947                }
948            }
949        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
950            state.wakers.push(ctx.waker().clone());
951        }
952
953        if !state.read_request_pending && !state.is_streaming {
954            inner.request(
955                ordinals::READ_SOCKET,
956                proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
957                Responder::ReadSocket(id),
958            );
959        }
960
961        Poll::Pending
962    }
963
964    /// Execute a read from a channel.
965    pub(crate) fn poll_channel(
966        &self,
967        id: proto::HandleId,
968        ctx: &mut Context<'_>,
969        for_stream: bool,
970    ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
971        let mut inner = self.0.lock();
972        if let Some(error) = inner.transport.error() {
973            return Poll::Ready(Some(Err(error.into())));
974        }
975
976        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
977            wakers: Vec::new(),
978            queued: VecDeque::new(),
979            is_streaming: false,
980            read_request_pending: false,
981        });
982
983        if let Some(got) = state.queued.pop_front() {
984            return Poll::Ready(Some(got));
985        } else if for_stream && !state.is_streaming {
986            return Poll::Ready(None);
987        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
988            state.wakers.push(ctx.waker().clone());
989        }
990
991        if !state.read_request_pending && !state.is_streaming {
992            inner.request(
993                ordinals::READ_CHANNEL,
994                proto::ChannelReadChannelRequest { handle: id },
995                Responder::ReadChannel(id),
996            );
997        }
998
999        Poll::Pending
1000    }
1001
1002    /// Check whether this channel is streaming
1003    pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
1004        let inner = self.0.lock();
1005        let Some(state) = inner.channel_read_states.get(&id) else {
1006            return false;
1007        };
1008        state.is_streaming
1009    }
1010
1011    /// Check that all the given handles are safe to transfer through a channel
1012    /// e.g. that there's no chance of in-flight reads getting dropped.
1013    pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
1014        let inner = self.0.lock();
1015        match handles {
1016            proto::Handles::Handles(handles) => {
1017                for handle in handles {
1018                    assert!(
1019                        !(inner.channel_read_states.contains_key(handle)
1020                            || inner.socket_read_states.contains_key(handle)),
1021                        "Tried to transfer handle after reading"
1022                    );
1023                }
1024            }
1025            proto::Handles::Dispositions(dispositions) => {
1026                for disposition in dispositions {
1027                    match &disposition.handle {
1028                        proto::HandleOp::Move_(handle) => assert!(
1029                            !(inner.channel_read_states.contains_key(handle)
1030                                || inner.socket_read_states.contains_key(handle)),
1031                            "Tried to transfer handle after reading"
1032                        ),
1033                        // Pretty sure this should be fine regardless of read state.
1034                        proto::HandleOp::Duplicate(_) => (),
1035                    }
1036                }
1037            }
1038        }
1039    }
1040}