Skip to main content

fdomain_client/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_fdomain as proto;
6use fidl_message::TransactionHeader;
7use fuchsia_async as _;
8use fuchsia_sync::Mutex;
9use futures::FutureExt;
10use futures::channel::oneshot::Sender as OneshotSender;
11use futures::stream::Stream as StreamTrait;
12use std::collections::{HashMap, VecDeque};
13use std::convert::Infallible;
14use std::future::Future;
15use std::num::NonZeroU32;
16use std::pin::Pin;
17use std::sync::{Arc, LazyLock, Weak};
18use std::task::{Context, Poll, Waker, ready};
19
20mod channel;
21mod event;
22mod event_pair;
23mod handle;
24mod responder;
25mod socket;
26
27#[cfg(test)]
28mod test;
29
30pub mod fidl;
31pub mod fidl_next;
32
33use responder::Responder;
34
35pub use channel::{
36    AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
37};
38pub use event::Event;
39pub use event_pair::Eventpair as EventPair;
40pub use handle::unowned::Unowned;
41pub use handle::{
42    AsHandleRef, Handle, HandleBased, HandleRef, NullableHandle, OnFDomainSignals, Peered,
43};
44pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
45pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
46
47// Unsupported handle types.
48#[rustfmt::skip]
49pub use Handle as Clock;
50#[rustfmt::skip]
51pub use Handle as Exception;
52#[rustfmt::skip]
53pub use Handle as Fifo;
54#[rustfmt::skip]
55pub use Handle as Iob;
56#[rustfmt::skip]
57pub use Handle as Job;
58#[rustfmt::skip]
59pub use Handle as Process;
60#[rustfmt::skip]
61pub use Handle as Resource;
62#[rustfmt::skip]
63pub use Handle as Stream;
64#[rustfmt::skip]
65pub use Handle as Thread;
66#[rustfmt::skip]
67pub use Handle as Vmar;
68#[rustfmt::skip]
69pub use Handle as Vmo;
70#[rustfmt::skip]
71pub use Handle as Counter;
72
73use proto::f_domain_ordinals as ordinals;
74
75fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76    match error {
77        FDomainError::TargetError(e) => {
78            let e = zx_status::Status::from_raw(*e);
79            write!(f, "Target-side error {e}")
80        }
81        FDomainError::BadHandleId(proto::BadHandleId { id }) => {
82            write!(f, "Tried to use invalid handle id {id}")
83        }
84        FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
85            f,
86            "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
87        ),
88        FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
89            write!(f, "Handle is occupied delivering streaming reads")
90        }
91        FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
92            write!(f, "No streaming read was in progress")
93        }
94        FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
95            write!(
96                f,
97                "Tried to create a handle with id {id}, which is outside the valid range for client handles"
98            )
99        }
100        FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
101            if *same_call {
102                write!(f, "Tried to create two or more new handles with the same id {id}")
103            } else {
104                write!(
105                    f,
106                    "Tried to create a new handle with id {id}, which is already the id of an existing handle"
107                )
108            }
109        }
110        FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
111            write!(f, "Tried to write a channel into itself")
112        }
113        FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
114            write!(f, "Handle closed while being read")
115        }
116        _ => todo!(),
117    }
118}
119
/// Result type alias used throughout this crate; the error type defaults to
/// the crate's own [`Error`] but may be overridden per use.
pub type Result<T, E = Error> = std::result::Result<T, E>;
122
/// Error type emitted by FDomain operations.
#[derive(Clone)]
pub enum Error {
    /// A socket write failed; carries how much was written first.
    SocketWrite(WriteSocketError),
    /// A channel write failed, possibly per-handle.
    ChannelWrite(WriteChannelError),
    /// The target reported a protocol-level FDomain error.
    FDomain(FDomainError),
    /// FIDL encoding/decoding of an FDomain message failed.
    Protocol(::fidl::Error),
    /// The protocol carried an unrecognized or incompatible object type.
    ProtocolObjectTypeIncompatible,
    /// The protocol carried unrecognized or incompatible handle rights.
    ProtocolRightsIncompatible,
    /// The protocol carried unrecognized or incompatible signals.
    ProtocolSignalsIncompatible,
    /// The protocol carried an unrecognized or incompatible streaming IO event.
    ProtocolStreamEventIncompatible,
    /// The transport failed with an IO error (`Some`) or the connection to the
    /// device was lost outright (`None`).
    Transport(Option<Arc<std::io::Error>>),
    /// A handle was used with a different connection than the one it was
    /// created on.
    ConnectionMismatch,
    /// Streaming on the channel was aborted.
    StreamingAborted,
}
138
impl std::fmt::Display for Error {
    /// Render a human-readable description; protocol-level FDomain errors are
    /// delegated to [`write_fdomain_error`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
                write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
                write_fdomain_error(error, f)
            }
            Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
                write!(f, "While writing channel: ")?;
                write_fdomain_error(error, f)
            }
            Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
                write!(f, "Couldn't write all handles into a channel:")?;
                // Report only the entries that actually failed (`Some`),
                // keeping each handle's original position in the message.
                for (pos, error) in
                    errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
                {
                    write!(f, "\n  Handle in position {pos}: ")?;
                    write_fdomain_error(error, f)?;
                }
                Ok(())
            }
            Self::ProtocolObjectTypeIncompatible => {
                write!(
                    f,
                    "The FDomain protocol received an unrecognized or incompatible object type"
                )
            }
            Self::ProtocolRightsIncompatible => {
                write!(
                    f,
                    "The FDomain protocol received unrecognized or incompatible handle rights"
                )
            }
            Self::ProtocolSignalsIncompatible => {
                write!(f, "The FDomain protocol received unrecognized or incompatible signals")
            }
            Self::ProtocolStreamEventIncompatible => {
                write!(
                    f,
                    "The FDomain protocol received an unrecognized or incompatible streaming IO event"
                )
            }
            Self::FDomain(e) => write_fdomain_error(e, f),
            Self::Protocol(e) => write!(f, "Protocol error: {e}"),
            Self::Transport(Some(e)) => write!(f, "Transport error: {e}"),
            Self::Transport(None) => write!(f, "Connection to the device has been lost"),
            Self::ConnectionMismatch => {
                write!(
                    f,
                    "Tried to use an FDomain handle with a different connection than the one it was created on"
                )
            }
            Self::StreamingAborted => write!(f, "Streaming on this channel has been aborted"),
        }
    }
}
195
196impl std::fmt::Debug for Error {
197    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198        match self {
199            Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
200            Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
201            Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
202            Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
203            Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
204            Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
205            Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
206            Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
207            Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
208            Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
209            Self::StreamingAborted => write!(f, "StreamingAborted"),
210        }
211    }
212}
213
// Marker impl: `Error` already provides `Display` and `Debug`, so this makes
// it usable as a standard error (e.g. behind `dyn std::error::Error`).
impl std::error::Error for Error {}
215
216impl From<FDomainError> for Error {
217    fn from(other: FDomainError) -> Self {
218        Self::FDomain(other)
219    }
220}
221
222impl From<::fidl::Error> for Error {
223    fn from(other: ::fidl::Error) -> Self {
224        Self::Protocol(other)
225    }
226}
227
228impl From<WriteSocketError> for Error {
229    fn from(other: WriteSocketError) -> Self {
230        Self::SocketWrite(other)
231    }
232}
233
234impl From<WriteChannelError> for Error {
235    fn from(other: WriteChannelError) -> Self {
236        Self::ChannelWrite(other)
237    }
238}
239
/// An error emitted internally by the client. Similar to [`Error`] but does not
/// contain several variants which are irrelevant in the contexts where it is
/// used.
#[derive(Clone)]
enum InnerError {
    /// FIDL encoding/decoding of an FDomain message failed.
    Protocol(::fidl::Error),
    /// A streaming IO event carried an unrecognized/incompatible payload.
    ProtocolStreamEventIncompatible,
    /// The transport failed (`Some`) or the connection was lost (`None`).
    Transport(Option<Arc<std::io::Error>>),
}
249
250impl From<InnerError> for Error {
251    fn from(other: InnerError) -> Self {
252        match other {
253            InnerError::Protocol(p) => Error::Protocol(p),
254            InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
255            InnerError::Transport(t) => Error::Transport(t),
256        }
257    }
258}
259
260impl From<::fidl::Error> for InnerError {
261    fn from(other: ::fidl::Error) -> Self {
262        InnerError::Protocol(other)
263    }
264}
265
// TODO(399717689) Figure out if we could just use AsyncRead/Write instead of a special trait.
/// Implemented by objects which provide a transport over which we can speak the
/// FDomain protocol.
///
/// The implementer must provide two things:
/// 1) An incoming stream of messages presented as `Vec<u8>`. This is provided
///    via the `Stream` trait, which this trait requires.
/// 2) A way to send messages. This is provided by implementing the
///    `poll_send_message` method.
pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
    /// Attempt to send a message asynchronously. Messages should be sent so
    /// that they arrive at the target in order.
    ///
    /// An `Err(None)` result is treated by the client as "connection lost"
    /// with no underlying IO error to report.
    fn poll_send_message(
        self: Pin<&mut Self>,
        msg: &[u8],
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Option<std::io::Error>>>;

    /// Optional debug information outlet. Default implementation writes
    /// nothing.
    fn debug_fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }

    /// Whether `debug_fmt` does anything. Implementors overriding `debug_fmt`
    /// should also override this to return `true`.
    fn has_debug_fmt(&self) -> bool {
        false
    }
}
294
/// Wrapper for an `FDomainTransport` implementer that:
/// 1) Provides a queue for outgoing messages so we need not have an await point
///    when we submit a message.
/// 2) Drops the transport on error, then returns the last observed error for
///    all future operations.
enum Transport {
    /// Live transport: the connection itself, the outgoing-message queue, and
    /// the wakers of tasks waiting for the queue to become non-empty.
    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
    /// The transport has failed; every subsequent operation returns this
    /// error.
    Error(InnerError),
}
304
impl Transport {
    /// Get the failure mode of the transport if it has failed.
    fn error(&self) -> Option<InnerError> {
        match self {
            Transport::Transport(_, _, _) => None,
            Transport::Error(inner_error) => Some(inner_error.clone()),
        }
    }

    /// Enqueue a message to be sent on this transport.
    ///
    /// Never blocks: the message only joins the queue, and any task parked in
    /// `poll_send_messages` waiting for work is woken. Fails immediately if
    /// the transport has already failed.
    fn push_msg(&mut self, msg: Box<[u8]>) -> Result<(), InnerError> {
        match self {
            Transport::Transport(_, v, w) => {
                v.push_back(msg);
                w.drain(..).for_each(Waker::wake);
                Ok(())
            }
            Transport::Error(e) => Err(e.clone()),
        }
    }

    /// Push messages in the send queue out through the transport.
    ///
    /// Returns `Ready` only with the error that ended the transport;
    /// otherwise stays `Pending` with the caller's waker registered for the
    /// next `push_msg`.
    fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
        match self {
            Transport::Error(e) => Poll::Ready(e.clone()),
            Transport::Transport(t, v, w) => {
                // Send in FIFO order; only pop once the transport accepted
                // the message.
                while let Some(msg) = v.front() {
                    match t.as_mut().poll_send_message(msg, ctx) {
                        Poll::Ready(Ok(())) => {
                            v.pop_front();
                        }
                        Poll::Ready(Err(e)) => {
                            let e = e.map(Arc::new);
                            return Poll::Ready(InnerError::Transport(e));
                        }
                        Poll::Pending => return Poll::Pending,
                    }
                }

                // The loop above only exits here once the queue is drained,
                // so normally we park our waker for the next `push_msg`. The
                // `else` arm is a defensive self-wake should the queue ever
                // be non-empty at this point.
                if v.is_empty() {
                    w.push(ctx.waker().clone());
                } else {
                    ctx.waker().wake_by_ref();
                }
                Poll::Pending
            }
        }
    }

    /// Get the next incoming message from the transport.
    ///
    /// End-of-stream is surfaced as `InnerError::Transport(None)`
    /// ("connection lost").
    fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
        match self {
            Transport::Error(e) => Poll::Ready(Err(e.clone())),
            Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
                Some(Ok(x)) => Poll::Ready(Ok(x)),
                Some(Err(e)) => Poll::Ready(Err(InnerError::Transport(Some(Arc::new(e))))),
                Option::None => Poll::Ready(Err(InnerError::Transport(None))),
            },
        }
    }
}
366
367impl Drop for Transport {
368    fn drop(&mut self) {
369        if let Transport::Transport(_, _, wakers) = self {
370            wakers.drain(..).for_each(Waker::wake);
371        }
372    }
373}
374
/// State of a socket that is or has been read from.
struct SocketReadState {
    /// Tasks waiting for data to arrive on this socket.
    wakers: Vec<Waker>,
    /// Data (or errors) received but not yet consumed by a reader.
    queued: VecDeque<Result<proto::SocketData, Error>>,
    /// Whether a read request is currently outstanding for this socket.
    read_request_pending: bool,
    /// Whether streaming reads are currently active for this socket.
    is_streaming: bool,
}
382
383impl SocketReadState {
384    /// Handle an incoming message, which is either a channel streaming event or
385    /// response to a `ChannelRead` request.
386    fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) -> Vec<Waker> {
387        self.queued.push_back(msg);
388        std::mem::replace(&mut self.wakers, Vec::new())
389    }
390}
391
/// State of a channel that is or has been read from.
struct ChannelReadState {
    /// Tasks waiting for a message to arrive on this channel.
    wakers: Vec<Waker>,
    /// Messages (or errors) received but not yet consumed by a reader.
    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
    /// Whether a read request is currently outstanding for this channel.
    read_request_pending: bool,
    /// Whether streaming reads are currently active for this channel.
    is_streaming: bool,
}
399
400impl ChannelReadState {
401    /// Handle an incoming message, which is either a channel streaming event or
402    /// response to a `ChannelRead` request.
403    fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) -> Vec<Waker> {
404        self.queued.push_back(msg);
405        std::mem::replace(&mut self.wakers, Vec::new())
406    }
407}
408
/// Lock-protected interior of `Client`
struct ClientInner {
    /// Connection to the remote FDomain, or the error that ended it.
    transport: Transport,
    /// Responders for in-flight requests, keyed by transaction id.
    transactions: HashMap<NonZeroU32, responder::Responder>,
    /// Read queues/wakers for channels that are or have been read from.
    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
    /// Read queues/wakers for sockets that are or have been read from.
    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
    /// Next transaction id to allocate. Starts at 1; a tx id of 0 marks
    /// unsolicited events (see `try_poll_transport`).
    next_tx_id: u32,
    /// Handle ids dropped on the client side that still need a `Close`
    /// request sent to the target.
    waiting_to_close: Vec<proto::HandleId>,
    /// Waker for the driver task, used to get `waiting_to_close` flushed.
    waiting_to_close_waker: Waker,

    /// There is a lock around `ClientInner`, and sometimes the FIDL bindings
    /// give us wakers that want to do handle operations synchronously on wake,
    /// which means we can double-take the lock if we wake a waker while we hold
    /// it. This is a place to store wakers that we'd like to be woken as soon
    /// as we're not holding that lock, to avoid these weird reentrancy issues.
    wakers_to_wake: Vec<Waker>,
}
426
impl ClientInner {
    /// Serialize and enqueue a new transaction, including header and transaction ID.
    ///
    /// If the transport has already failed, the responder is invoked
    /// immediately with the error instead of being registered.
    fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
        // Flush any deferred handle closures before other traffic. Skipped
        // when this request *is* the close request, since
        // `process_waiting_to_close` calls back into `request` with
        // `ordinals::CLOSE` — this check keeps that from recursing.
        if ordinal != ordinals::CLOSE {
            self.process_waiting_to_close();
        }
        let tx_id = self.next_tx_id;

        let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
        let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
        self.next_tx_id += 1;
        if let Err(e) = self.transport.push_msg(msg.into()) {
            let _ = responder.handle(self, Err(e.into()));
        } else {
            // tx ids start at 1 and only increment, so the NonZeroU32
            // conversion holds here.
            assert!(
                self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
                "Allocated same tx id twice!"
            );
        }
    }

    /// Send a single `Close` request for all handle ids whose client-side
    /// objects have been dropped, and discard their read state.
    fn process_waiting_to_close(&mut self) {
        if !self.waiting_to_close.is_empty() {
            let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
            // We've dropped the handle object. Nobody is going to wait to read
            // the buffers anymore. This is a safe time to drop the read state.
            for handle in &handles {
                let _ = self.channel_read_states.remove(handle);
                let _ = self.socket_read_states.remove(handle);
            }
            self.request(
                ordinals::CLOSE,
                proto::FDomainCloseRequest { handles },
                Responder::Ignore,
            );
        }
    }

    /// Polls the underlying transport to ensure any incoming or outgoing
    /// messages are processed as far as possible. Errors if the transport has failed.
    ///
    /// Never returns `Ok` (`Infallible`): it either pends or fails.
    fn try_poll_transport(
        &mut self,
        ctx: &mut Context<'_>,
    ) -> Poll<Result<Infallible, InnerError>> {
        self.process_waiting_to_close();

        // Remember the driver task's waker so dropped handles can nudge this
        // loop to send their close requests.
        self.waiting_to_close_waker = ctx.waker().clone();

        loop {
            // Drain the outgoing queue first; `Ready` from
            // `poll_send_messages` always means failure.
            if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
                return Poll::Ready(Err(e));
            }
            let Poll::Ready(result) = self.transport.poll_next(ctx) else {
                return Poll::Pending;
            };
            let data = result?;
            let (header, data) = fidl_message::decode_transaction_header(&data)?;

            // A tx id of 0 marks an unsolicited event rather than a reply.
            let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
                let wakers = self.process_event(header, data)?;
                // Defer waking until the client lock is released (see the
                // comment on `wakers_to_wake`).
                self.wakers_to_wake.extend(wakers);
                continue;
            };

            // Replies must match a registered transaction.
            let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
            tx.handle(self, Ok((header, data)))?;
        }
    }

    /// Process an incoming message that arose from an event rather than a transaction reply.
    ///
    /// Returns wakers for the caller to wake once the client lock is free.
    fn process_event(
        &mut self,
        header: TransactionHeader,
        data: &[u8],
    ) -> Result<Vec<Waker>, InnerError> {
        match header.ordinal {
            ordinals::ON_SOCKET_STREAMING_DATA => {
                let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
                    header, data,
                )?;
                // Events can arrive before the first read; create the state
                // lazily so the data is queued for a future reader.
                let o =
                    self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
                        wakers: Vec::new(),
                        queued: VecDeque::new(),
                        is_streaming: false,
                        read_request_pending: false,
                    });
                match msg.socket_message {
                    proto::SocketMessage::Data(data) => Ok(o.handle_incoming_message(Ok(data))),
                    proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
                        // A clean stop queues nothing; a stop with an error
                        // surfaces it to the reader.
                        let ret = if let Some(error) = error {
                            o.handle_incoming_message(Err(Error::FDomain(*error)))
                        } else {
                            Vec::new()
                        };
                        o.is_streaming = false;
                        Ok(ret)
                    }
                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
                }
            }
            ordinals::ON_CHANNEL_STREAMING_DATA => {
                let msg = fidl_message::decode_message::<
                    proto::ChannelOnChannelStreamingDataRequest,
                >(header, data)?;
                let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
                    ChannelReadState {
                        wakers: Vec::new(),
                        queued: VecDeque::new(),
                        is_streaming: false,
                        read_request_pending: false,
                    }
                });
                match msg.channel_sent {
                    proto::ChannelSent::Message(data) => Ok(o.handle_incoming_message(Ok(data))),
                    proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
                        let ret = if let Some(error) = error {
                            o.handle_incoming_message(Err(Error::FDomain(*error)))
                        } else {
                            Vec::new()
                        };
                        o.is_streaming = false;
                        Ok(ret)
                    }
                    _ => Err(InnerError::ProtocolStreamEventIncompatible),
                }
            }
            _ => Err(::fidl::Error::UnknownOrdinal {
                ordinal: header.ordinal,
                protocol_name:
                    <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
            }
            .into()),
        }
    }

    /// Polls the underlying transport to ensure any incoming or outgoing
    /// messages are processed as far as possible. If a failure occurs, puts the
    /// transport into an error state and fails all pending transactions.
    fn poll_transport(&mut self, ctx: &mut Context<'_>) -> Poll<()> {
        if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
            // Fail every in-flight transaction with the transport error.
            for (_, v) in std::mem::take(&mut self.transactions) {
                let _ = v.handle(self, Err(e.clone()));
            }
            // Wake all pending readers; waking is deferred via
            // `wakers_to_wake` to avoid re-taking the client lock.
            for mut state in std::mem::take(&mut self.socket_read_states).into_values() {
                state.queued.push_back(Err(Error::from(e.clone())));
                self.wakers_to_wake.extend(state.wakers);
            }
            for (_, mut state) in self.channel_read_states.drain() {
                state.queued.push_back(Err(Error::from(e.clone())));
                self.wakers_to_wake.extend(state.wakers);
            }
            // Latch the error so future operations fail with it (unless a
            // responder above already transitioned the transport).
            if matches!(self.transport, Transport::Transport(_, _, _)) {
                self.transport = Transport::Error(e);
            }

            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }

    /// Handles the response to a `SocketRead` protocol message.
    pub(crate) fn handle_socket_read_response(
        &mut self,
        msg: Result<proto::SocketData, Error>,
        id: proto::HandleId,
    ) {
        let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
            wakers: Vec::new(),
            queued: VecDeque::new(),
            is_streaming: false,
            read_request_pending: false,
        });
        let wakers = state.handle_incoming_message(msg);
        // Deferred wake: see the comment on `wakers_to_wake`.
        self.wakers_to_wake.extend(wakers);
        state.read_request_pending = false;
    }

    /// Handles the response to a `ChannelRead` protocol message.
    pub(crate) fn handle_channel_read_response(
        &mut self,
        msg: Result<proto::ChannelMessage, Error>,
        id: proto::HandleId,
    ) {
        let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
            wakers: Vec::new(),
            queued: VecDeque::new(),
            is_streaming: false,
            read_request_pending: false,
        });
        let wakers = state.handle_incoming_message(msg);
        // Deferred wake: see the comment on `wakers_to_wake`.
        self.wakers_to_wake.extend(wakers);
        state.read_request_pending = false;
    }
}
623
624impl Drop for ClientInner {
625    fn drop(&mut self) {
626        let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
627        for responder in responders {
628            let _ = responder.handle(self, Err(InnerError::Transport(None)));
629        }
630        for state in self.channel_read_states.values_mut() {
631            state.wakers.drain(..).for_each(Waker::wake);
632        }
633        for state in self.socket_read_states.values_mut() {
634            state.wakers.drain(..).for_each(Waker::wake);
635        }
636        self.waiting_to_close_waker.wake_by_ref();
637        self.wakers_to_wake.drain(..).for_each(Waker::wake);
638    }
639}
640
/// Represents a connection to an FDomain.
///
/// The client is constructed by passing it a transport object which represents
/// the raw connection to the remote FDomain. The `Client` wrapper then allows
/// us to construct and use handles which behave similarly to their counterparts
/// on a Fuchsia device.
///
/// All shared state lives behind this single mutex; see the note on
/// `ClientInner::wakers_to_wake` about waking outside the lock.
pub struct Client(pub(crate) Mutex<ClientInner>);
648
impl std::fmt::Debug for Client {
    /// Formats as the transport's own debug output when it provides one, the
    /// latched error when the transport has failed, or a generic placeholder
    /// otherwise.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner = self.0.lock();
        match &inner.transport {
            Transport::Transport(transport, ..) if transport.has_debug_fmt() => {
                write!(f, "Client(")?;
                transport.debug_fmt(f)?;
                write!(f, ")")
            }
            Transport::Error(error) => {
                let error = Error::from(error.clone());
                write!(f, "Client(Failed: {error})")
            }
            _ => f.debug_tuple("Client").field(&"<transport>").finish(),
        }
    }
}
666
/// A client which is always disconnected. Handles that lose their clients
/// connect to this client instead, which always returns a "Client Lost"
/// transport failure.
pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
    Arc::new(Client(Mutex::new(ClientInner {
        // Permanently failed: `Transport(None)` renders as
        // "Connection to the device has been lost".
        transport: Transport::Error(InnerError::Transport(None)),
        transactions: HashMap::new(),
        channel_read_states: HashMap::new(),
        socket_read_states: HashMap::new(),
        next_tx_id: 1,
        waiting_to_close: Vec::new(),
        // No driver task exists for this client, so a no-op waker suffices.
        waiting_to_close_waker: std::task::Waker::noop().clone(),
        wakers_to_wake: Vec::new(),
    })))
});
682
/// A wrapper around the FDomain client background future that ensures
/// all pending transactions and reads are failed if the loop is dropped.
///
/// This prevents hangs when the transport is abruptly closed (e.g. during target reboot)
/// by waking up any futures waiting for responses or data on channels/sockets.
pub struct ClientLoop {
    /// Weak so the background loop does not keep the client alive by itself.
    client: Weak<Client>,
    /// The polling future constructed in [`Client::new`].
    fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
}
692
693impl Future for ClientLoop {
694    type Output = ();
695    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
696        self.fut.as_mut().poll(cx)
697    }
698}
699
impl Drop for ClientLoop {
    /// Fail all outstanding work when the background loop goes away: every
    /// pending transaction and every queued channel/socket read gets a
    /// "connection lost" error, and all waiting tasks are woken.
    fn drop(&mut self) {
        // If the client is already gone there is nothing left to fail.
        let Some(client) = self.client.upgrade() else {
            return;
        };

        // Take everything out of the client while holding the lock, but do
        // the actual waking *after* releasing it — wakers may re-enter the
        // client synchronously (see `ClientInner::wakers_to_wake`).
        let (channel_read_states, socket_read_states, deferred_wakers) = {
            let mut inner = client.0.lock();
            let transactions = std::mem::take(&mut inner.transactions);
            log::debug!("ClientLoop dropped, failing {} transactions", transactions.len());
            for (_, v) in transactions {
                let _ = v.handle(&mut *inner, Err(InnerError::Transport(None)));
            }

            let channel_read_states = std::mem::take(&mut inner.channel_read_states);
            let socket_read_states = std::mem::take(&mut inner.socket_read_states);

            let deferred_wakers = std::mem::replace(&mut inner.wakers_to_wake, Vec::new());

            (channel_read_states, socket_read_states, deferred_wakers)
        };

        // Lock released: queue the terminal error and wake each reader.
        log::debug!("Failing reads on {} channels", channel_read_states.len());
        for (_, mut state) in channel_read_states {
            state.queued.push_back(Err(Error::Transport(None)));
            state.wakers.into_iter().for_each(Waker::wake);
        }

        log::debug!("Failing reads on {} sockets", socket_read_states.len());
        for (_, mut state) in socket_read_states {
            state.queued.push_back(Err(Error::Transport(None)));
            state.wakers.into_iter().for_each(Waker::wake);
        }

        deferred_wakers.into_iter().for_each(Waker::wake);
    }
}
737
738impl Client {
739    pub fn transport_status(&self) -> Result<()> {
740        match &self.0.lock().transport {
741            Transport::Error(e) => Err(e.clone().into()),
742            Transport::Transport(_, _, _) => Ok(()),
743        }
744    }
    /// Create a new FDomain client. The `transport` argument should contain the
    /// established connection to the target, ready to communicate the FDomain
    /// protocol.
    ///
    /// The second return item is a future that must be polled to keep
    /// transactions running. Dropping that future fails all outstanding work
    /// (see [`ClientLoop`]).
    pub fn new(
        transport: impl FDomainTransport + 'static,
    ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
        let ret = Arc::new(Client(Mutex::new(ClientInner {
            transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
            transactions: HashMap::new(),
            socket_read_states: HashMap::new(),
            channel_read_states: HashMap::new(),
            // tx id 0 is reserved for unsolicited events, so start at 1.
            next_tx_id: 1,
            waiting_to_close: Vec::new(),
            // Replaced with the driver task's real waker on first poll.
            waiting_to_close_waker: std::task::Waker::noop().clone(),
            wakers_to_wake: Vec::new(),
        })));

        // The loop holds only a weak reference so it terminates once every
        // strong reference to the client is gone.
        let client_weak = Arc::downgrade(&ret);
        let fut = futures::future::poll_fn(move |ctx| {
            let Some(client) = client_weak.upgrade() else {
                return Poll::Ready(());
            };

            // Poll the transport under the lock, but collect deferred wakers
            // and wake them only after releasing it — they may re-enter the
            // client synchronously (see `ClientInner::wakers_to_wake`).
            let (ret, deferred_wakers) = {
                let mut inner = client.0.lock();
                let ret = inner.poll_transport(ctx);
                let deferred_wakers = std::mem::replace(&mut inner.wakers_to_wake, Vec::new());
                (ret, deferred_wakers)
            };
            deferred_wakers.into_iter().for_each(Waker::wake);
            ret
        });

        let client_loop = ClientLoop { client: Arc::downgrade(&ret), fut: Box::pin(fut) };

        (ret, client_loop)
    }
785
    /// Get the namespace for the connected FDomain. Calling this more than once is an error.
    pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
        // Allocate the client-side handle id up front; the target binds the
        // namespace channel to it when the transaction succeeds.
        let new_handle = self.new_hid();
        self.transaction(
            ordinals::GET_NAMESPACE,
            proto::FDomainGetNamespaceRequest { new_handle },
            Responder::Namespace,
        )
        .await?;
        // The handle holds only a weak client reference; if the client goes
        // away the handle falls back to DEAD_CLIENT semantics.
        Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
    }
797
798    /// Create a new channel in the connected FDomain.
799    pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
800        let id_a = self.new_hid();
801        let id_b = self.new_hid();
802        let fut = self.transaction(
803            ordinals::CREATE_CHANNEL,
804            proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
805            Responder::CreateChannel,
806        );
807
808        fuchsia_async::Task::spawn(async move {
809            if let Err(e) = fut.await {
810                log::debug!("FDomain channel creation failed: {e}");
811            }
812        })
813        .detach();
814
815        (
816            Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
817            Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
818        )
819    }
820
821    /// Creates client and server endpoints connected to by a channel.
822    pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
823        self: &Arc<Self>,
824    ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
825        let (client, server) = self.create_channel();
826        let client_end = crate::fidl::ClientEnd::<F>::new(client);
827        let server_end = crate::fidl::ServerEnd::new(server);
828        (client_end, server_end)
829    }
830
831    /// Creates a client proxy and a server endpoint connected by a channel.
832    pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
833        self: &Arc<Self>,
834    ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
835        let (client_end, server_end) = self.create_endpoints::<F>();
836        (client_end.into_proxy(), server_end)
837    }
838
839    /// Creates a client proxy and a server request stream connected by a channel.
840    pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
841        self: &Arc<Self>,
842    ) -> (F::Proxy, F::RequestStream) {
843        let (client_end, server_end) = self.create_endpoints::<F>();
844        (client_end.into_proxy(), server_end.into_stream())
845    }
846
847    /// Creates a client end and a server request stream connected by a channel.
848    pub fn create_request_stream<F: crate::fidl::ProtocolMarker>(
849        self: &Arc<Self>,
850    ) -> (crate::fidl::ClientEnd<F>, F::RequestStream) {
851        let (client_end, server_end) = self.create_endpoints::<F>();
852        (client_end, server_end.into_stream())
853    }
854
855    /// Create a new socket in the connected FDomain.
856    fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
857        let id_a = self.new_hid();
858        let id_b = self.new_hid();
859        let fut = self.transaction(
860            ordinals::CREATE_SOCKET,
861            proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
862            Responder::CreateSocket,
863        );
864
865        fuchsia_async::Task::spawn(async move {
866            if let Err(e) = fut.await {
867                log::debug!("FDomain socket creation failed: {e}");
868            }
869        })
870        .detach();
871
872        (
873            Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
874            Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
875        )
876    }
877
878    /// Create a new streaming socket in the connected FDomain.
879    pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
880        self.create_socket(proto::SocketType::Stream)
881    }
882
883    /// Create a new datagram socket in the connected FDomain.
884    pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
885        self.create_socket(proto::SocketType::Datagram)
886    }
887
888    /// Create a new event pair in the connected FDomain.
889    pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
890        let id_a = self.new_hid();
891        let id_b = self.new_hid();
892        let fut = self.transaction(
893            ordinals::CREATE_EVENT_PAIR,
894            proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
895            Responder::CreateEventPair,
896        );
897
898        fuchsia_async::Task::spawn(async move {
899            if let Err(e) = fut.await {
900                log::debug!("FDomain event pair creation failed: {e}");
901            }
902        })
903        .detach();
904
905        (
906            EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
907            EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
908        )
909    }
910
911    /// Create a new event handle in the connected FDomain.
912    pub fn create_event(self: &Arc<Self>) -> Event {
913        let id = self.new_hid();
914        let fut = self.transaction(
915            ordinals::CREATE_EVENT,
916            proto::EventCreateEventRequest { handle: id },
917            Responder::CreateEvent,
918        );
919
920        fuchsia_async::Task::spawn(async move {
921            if let Err(e) = fut.await {
922                log::debug!("FDomain event creation failed: {e}");
923            }
924        })
925        .detach();
926
927        Event(Handle { id: id.id, client: Arc::downgrade(self) })
928    }
929
930    /// Allocate a new HID, which should be suitable for use with the connected FDomain.
931    pub(crate) fn new_hid(&self) -> proto::NewHandleId {
932        // TODO: On the target side we have to keep a table of these which means
933        // we can automatically detect collisions in the random value. On the
934        // client side we'd have to add a whole data structure just for that
935        // purpose. Should we?
936        proto::NewHandleId { id: rand::random::<u32>() >> 1 }
937    }
938
939    /// Create a future which sends a FIDL message to the connected FDomain and
940    /// waits for a response.
941    ///
942    /// Calling this method queues the transaction synchronously. Awaiting is
943    /// only necessary to wait for the response.
944    pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
945        self: &Arc<Self>,
946        ordinal: u64,
947        request: S,
948        f: F,
949    ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
950    where
951        F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
952    {
953        let mut inner = self.0.lock();
954
955        let (sender, receiver) = futures::channel::oneshot::channel();
956        inner.request(ordinal, request, f(sender));
957        receiver.map(|x| x.expect("Oneshot went away without reply!"))
958    }
959
960    /// Start getting streaming events for socket reads.
961    pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
962        let mut inner = self.0.lock();
963        if let Some(e) = inner.transport.error() {
964            return Err(e.into());
965        }
966
967        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
968            wakers: Vec::new(),
969            queued: VecDeque::new(),
970            is_streaming: false,
971            read_request_pending: false,
972        });
973
974        assert!(!state.is_streaming, "Initiated streaming twice!");
975        state.is_streaming = true;
976
977        inner.request(
978            ordinals::READ_SOCKET_STREAMING_START,
979            proto::SocketReadSocketStreamingStartRequest { handle: id },
980            Responder::Ignore,
981        );
982        Ok(())
983    }
984
985    /// Stop getting streaming events for socket reads. Doesn't return errors
986    /// because it's exclusively called in destructors where we have nothing to
987    /// do with them.
988    pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
989        let mut inner = self.0.lock();
990        if let Some(state) = inner.socket_read_states.get_mut(&id) {
991            if state.is_streaming {
992                state.is_streaming = false;
993                // TODO: Log?
994                let _ = inner.request(
995                    ordinals::READ_SOCKET_STREAMING_STOP,
996                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
997                    Responder::Ignore,
998                );
999            }
1000        }
1001    }
1002
1003    /// Start getting streaming events for socket reads.
1004    pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
1005        let mut inner = self.0.lock();
1006        if let Some(e) = inner.transport.error() {
1007            return Err(e.into());
1008        }
1009        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
1010            wakers: Vec::new(),
1011            queued: VecDeque::new(),
1012            is_streaming: false,
1013            read_request_pending: false,
1014        });
1015
1016        assert!(!state.is_streaming, "Initiated streaming twice!");
1017        state.is_streaming = true;
1018
1019        inner.request(
1020            ordinals::READ_CHANNEL_STREAMING_START,
1021            proto::ChannelReadChannelStreamingStartRequest { handle: id },
1022            Responder::Ignore,
1023        );
1024
1025        Ok(())
1026    }
1027
1028    /// Stop getting streaming events for socket reads. Doesn't return errors
1029    /// because it's exclusively called in destructors where we have nothing to
1030    /// do with them.
1031    pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
1032        let mut inner = self.0.lock();
1033        if let Some(state) = inner.channel_read_states.get_mut(&id) {
1034            if state.is_streaming {
1035                state.is_streaming = false;
1036                // TODO: Log?
1037                let _ = inner.request(
1038                    ordinals::READ_CHANNEL_STREAMING_STOP,
1039                    proto::ChannelReadChannelStreamingStopRequest { handle: id },
1040                    Responder::Ignore,
1041                );
1042            }
1043        }
1044    }
1045
    /// Execute a read from a socket.
    ///
    /// Returns `Ready(Ok(n))` with the number of bytes copied into `out`,
    /// `Ready(Err(_))` on a transport failure or a queued read error, and
    /// `Pending` after registering `ctx`'s waker.
    pub(crate) fn poll_socket(
        &self,
        id: proto::HandleId,
        ctx: &mut Context<'_>,
        out: &mut [u8],
    ) -> Poll<Result<usize, Error>> {
        let mut inner = self.0.lock();
        // A transport-level error fails all reads immediately.
        if let Some(error) = inner.transport.error() {
            return Poll::Ready(Err(error.into()));
        }

        // Lazily create the per-handle read state on first poll.
        let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
            wakers: Vec::new(),
            queued: VecDeque::new(),
            is_streaming: false,
            read_request_pending: false,
        });

        if let Some(got) = state.queued.front_mut() {
            match got.as_mut() {
                Ok(data) => {
                    // Copy as much of the queued data as fits in the caller's buffer.
                    let read_size = std::cmp::min(data.data.len(), out.len());
                    out[..read_size].copy_from_slice(&data.data[..read_size]);

                    if data.data.len() > read_size && !data.is_datagram {
                        // Non-datagram: keep the unread tail queued for the next poll.
                        let _ = data.data.drain(..read_size);
                    } else {
                        // Fully consumed, or a datagram (any unread remainder is
                        // dropped along with the message).
                        let _ = state.queued.pop_front();
                    }

                    return Poll::Ready(Ok(read_size));
                }
                Err(_) => {
                    // Pop the queued error and surface it to the caller.
                    let err = state.queued.pop_front().unwrap().unwrap_err();
                    return Poll::Ready(Err(err));
                }
            }
        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
            // Nothing queued: register this task's waker (deduplicated via
            // `will_wake`) so it is woken when data or an error arrives.
            state.wakers.push(ctx.waker().clone());
        }

        // If no read is in flight and the handle isn't in streaming mode,
        // issue a one-shot read request.
        //
        // NOTE(review): `read_request_pending` is not set to true here, so
        // repeated polls before the reply arrives could issue multiple read
        // requests — presumably the flag is managed where the response is
        // handled; confirm in the responder logic.
        if !state.read_request_pending && !state.is_streaming {
            inner.request(
                ordinals::READ_SOCKET,
                proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
                Responder::ReadSocket(id),
            );
        }

        Poll::Pending
    }
1098
    /// Execute a read from a channel.
    ///
    /// Returns `Ready(Some(Ok(msg)))` with a queued message,
    /// `Ready(Some(Err(_)))` on a transport failure or queued error, and
    /// `Pending` after registering `ctx`'s waker. When `for_stream` is set and
    /// the handle is not in streaming mode, returns `Ready(None)` to signal
    /// end-of-stream instead of issuing a one-shot read.
    pub(crate) fn poll_channel(
        &self,
        id: proto::HandleId,
        ctx: &mut Context<'_>,
        for_stream: bool,
    ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
        let mut inner = self.0.lock();
        // A transport-level error fails all reads immediately.
        if let Some(error) = inner.transport.error() {
            return Poll::Ready(Some(Err(error.into())));
        }

        // Lazily create the per-handle read state on first poll.
        let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
            wakers: Vec::new(),
            queued: VecDeque::new(),
            is_streaming: false,
            read_request_pending: false,
        });

        if let Some(got) = state.queued.pop_front() {
            // Deliver the oldest queued message or error.
            return Poll::Ready(Some(got));
        } else if for_stream && !state.is_streaming {
            // Streaming has stopped and the queue is drained: end of stream.
            return Poll::Ready(None);
        } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
            // Nothing queued: register this task's waker (deduplicated via
            // `will_wake`) so it is woken when a message or error arrives.
            state.wakers.push(ctx.waker().clone());
        }

        // If no read is in flight and the handle isn't in streaming mode,
        // issue a one-shot read request.
        //
        // NOTE(review): `read_request_pending` is not set to true here, so
        // repeated polls before the reply arrives could issue multiple read
        // requests — presumably the flag is managed where the response is
        // handled; confirm in the responder logic.
        if !state.read_request_pending && !state.is_streaming {
            inner.request(
                ordinals::READ_CHANNEL,
                proto::ChannelReadChannelRequest { handle: id },
                Responder::ReadChannel(id),
            );
        }

        Poll::Pending
    }
1136
1137    /// Check whether this channel is streaming
1138    pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
1139        let inner = self.0.lock();
1140        let Some(state) = inner.channel_read_states.get(&id) else {
1141            return false;
1142        };
1143        state.is_streaming
1144    }
1145
1146    /// Check that all the given handles are safe to transfer through a channel
1147    /// e.g. that there's no chance of in-flight reads getting dropped.
1148    pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
1149        let inner = self.0.lock();
1150        match handles {
1151            proto::Handles::Handles(handles) => {
1152                for handle in handles {
1153                    assert!(
1154                        !(inner.channel_read_states.contains_key(handle)
1155                            || inner.socket_read_states.contains_key(handle)),
1156                        "Tried to transfer handle after reading"
1157                    );
1158                }
1159            }
1160            proto::Handles::Dispositions(dispositions) => {
1161                for disposition in dispositions {
1162                    match &disposition.handle {
1163                        proto::HandleOp::Move_(handle) => assert!(
1164                            !(inner.channel_read_states.contains_key(handle)
1165                                || inner.socket_read_states.contains_key(handle)),
1166                            "Tried to transfer handle after reading"
1167                        ),
1168                        // Pretty sure this should be fine regardless of read state.
1169                        proto::HandleOp::Duplicate(_) => (),
1170                    }
1171                }
1172            }
1173        }
1174    }
1175}