1use fidl_fuchsia_fdomain as proto;
6use fidl_message::TransactionHeader;
7use fuchsia_async as _;
8use fuchsia_sync::Mutex;
9use futures::FutureExt;
10use futures::channel::oneshot::Sender as OneshotSender;
11use futures::stream::Stream as StreamTrait;
12use std::collections::{HashMap, VecDeque};
13use std::convert::Infallible;
14use std::future::Future;
15use std::num::NonZeroU32;
16use std::pin::Pin;
17use std::sync::{Arc, LazyLock, Weak};
18use std::task::{Context, Poll, Waker, ready};
19
20mod channel;
21mod event;
22mod event_pair;
23mod handle;
24mod responder;
25mod socket;
26
27#[cfg(test)]
28mod test;
29
30pub mod fidl;
31pub mod fidl_next;
32
33use responder::Responder;
34
35pub use channel::{
36 AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
37};
38pub use event::Event;
39pub use event_pair::Eventpair as EventPair;
40pub use handle::unowned::Unowned;
41pub use handle::{
42 AsHandleRef, Handle, HandleBased, HandleRef, NullableHandle, OnFDomainSignals, Peered,
43};
44pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
45pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
46
47#[rustfmt::skip]
49pub use Handle as Clock;
50#[rustfmt::skip]
51pub use Handle as Exception;
52#[rustfmt::skip]
53pub use Handle as Fifo;
54#[rustfmt::skip]
55pub use Handle as Iob;
56#[rustfmt::skip]
57pub use Handle as Job;
58#[rustfmt::skip]
59pub use Handle as Process;
60#[rustfmt::skip]
61pub use Handle as Resource;
62#[rustfmt::skip]
63pub use Handle as Stream;
64#[rustfmt::skip]
65pub use Handle as Thread;
66#[rustfmt::skip]
67pub use Handle as Vmar;
68#[rustfmt::skip]
69pub use Handle as Vmo;
70#[rustfmt::skip]
71pub use Handle as Counter;
72
73use proto::f_domain_ordinals as ordinals;
74
75fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 match error {
77 FDomainError::TargetError(e) => {
78 let e = zx_status::Status::from_raw(*e);
79 write!(f, "Target-side error {e}")
80 }
81 FDomainError::BadHandleId(proto::BadHandleId { id }) => {
82 write!(f, "Tried to use invalid handle id {id}")
83 }
84 FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
85 f,
86 "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
87 ),
88 FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
89 write!(f, "Handle is occupied delivering streaming reads")
90 }
91 FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
92 write!(f, "No streaming read was in progress")
93 }
94 FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
95 write!(
96 f,
97 "Tried to create a handle with id {id}, which is outside the valid range for client handles"
98 )
99 }
100 FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
101 if *same_call {
102 write!(f, "Tried to create two or more new handles with the same id {id}")
103 } else {
104 write!(
105 f,
106 "Tried to create a new handle with id {id}, which is already the id of an existing handle"
107 )
108 }
109 }
110 FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
111 write!(f, "Tried to write a channel into itself")
112 }
113 FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
114 write!(f, "Handle closed while being read")
115 }
116 _ => todo!(),
117 }
118}
119
/// Result type used throughout this crate; the error type defaults to this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
122
/// Error type emitted by FDomain operations.
#[derive(Clone)]
pub enum Error {
    /// A socket write failed; the payload records the error and how many bytes were written.
    SocketWrite(WriteSocketError),
    /// A channel write failed, either as a whole or for individual handles in the message.
    ChannelWrite(WriteChannelError),
    /// An error reported through the FDomain protocol itself.
    FDomain(FDomainError),
    /// A FIDL-level protocol error.
    Protocol(::fidl::Error),
    /// The FDomain protocol received an unrecognized or incompatible object type.
    ProtocolObjectTypeIncompatible,
    /// The FDomain protocol received unrecognized or incompatible handle rights.
    ProtocolRightsIncompatible,
    /// The FDomain protocol received unrecognized or incompatible signals.
    ProtocolSignalsIncompatible,
    /// The FDomain protocol received an unrecognized or incompatible streaming IO event.
    ProtocolStreamEventIncompatible,
    /// The transport failed. `None` is reported as the connection to the device being lost.
    Transport(Option<Arc<std::io::Error>>),
    /// A handle was used with a different connection than the one it was created on.
    ConnectionMismatch,
    /// Streaming on a channel has been aborted.
    StreamingAborted,
}
138
139impl std::fmt::Display for Error {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 match self {
142 Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
143 write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
144 write_fdomain_error(error, f)
145 }
146 Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
147 write!(f, "While writing channel: ")?;
148 write_fdomain_error(error, f)
149 }
150 Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
151 write!(f, "Couldn't write all handles into a channel:")?;
152 for (pos, error) in
153 errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
154 {
155 write!(f, "\n Handle in position {pos}: ")?;
156 write_fdomain_error(error, f)?;
157 }
158 Ok(())
159 }
160 Self::ProtocolObjectTypeIncompatible => {
161 write!(
162 f,
163 "The FDomain protocol received an unrecognized or incompatible object type"
164 )
165 }
166 Self::ProtocolRightsIncompatible => {
167 write!(
168 f,
169 "The FDomain protocol received unrecognized or incompatible handle rights"
170 )
171 }
172 Self::ProtocolSignalsIncompatible => {
173 write!(f, "The FDomain protocol received unrecognized or incompatible signals")
174 }
175 Self::ProtocolStreamEventIncompatible => {
176 write!(
177 f,
178 "The FDomain protocol received an unrecognized or incompatible streaming IO event"
179 )
180 }
181 Self::FDomain(e) => write_fdomain_error(e, f),
182 Self::Protocol(e) => write!(f, "Protocol error: {e}"),
183 Self::Transport(Some(e)) => write!(f, "Transport error: {e}"),
184 Self::Transport(None) => write!(f, "Connection to the device has been lost"),
185 Self::ConnectionMismatch => {
186 write!(
187 f,
188 "Tried to use an FDomain handle with a different connection than the one it was created on"
189 )
190 }
191 Self::StreamingAborted => write!(f, "Streaming on this channel has been aborted"),
192 }
193 }
194}
195
196impl std::fmt::Debug for Error {
197 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 match self {
199 Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
200 Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
201 Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
202 Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
203 Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
204 Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
205 Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
206 Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
207 Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
208 Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
209 Self::StreamingAborted => write!(f, "StreamingAborted"),
210 }
211 }
212}
213
// Marks `Error` as a standard error type; no `source()` is provided.
impl std::error::Error for Error {}
215
216impl From<FDomainError> for Error {
217 fn from(other: FDomainError) -> Self {
218 Self::FDomain(other)
219 }
220}
221
222impl From<::fidl::Error> for Error {
223 fn from(other: ::fidl::Error) -> Self {
224 Self::Protocol(other)
225 }
226}
227
228impl From<WriteSocketError> for Error {
229 fn from(other: WriteSocketError) -> Self {
230 Self::SocketWrite(other)
231 }
232}
233
234impl From<WriteChannelError> for Error {
235 fn from(other: WriteChannelError) -> Self {
236 Self::ChannelWrite(other)
237 }
238}
239
/// Internal error type for failures that take down the whole connection. It holds the subset
/// of [`Error`] variants that the transport-handling code in this file can produce, and
/// converts losslessly into [`Error`].
#[derive(Clone)]
enum InnerError {
    /// A FIDL-level protocol error.
    Protocol(::fidl::Error),
    /// A streaming event carried a variant this client does not understand.
    ProtocolStreamEventIncompatible,
    /// The transport failed; `None` means the stream of incoming messages ended.
    Transport(Option<Arc<std::io::Error>>),
}
249
250impl From<InnerError> for Error {
251 fn from(other: InnerError) -> Self {
252 match other {
253 InnerError::Protocol(p) => Error::Protocol(p),
254 InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
255 InnerError::Transport(t) => Error::Transport(t),
256 }
257 }
258}
259
260impl From<::fidl::Error> for InnerError {
261 fn from(other: ::fidl::Error) -> Self {
262 InnerError::Protocol(other)
263 }
264}
265
/// A bidirectional message transport to an FDomain.
///
/// Incoming messages are produced through the `Stream` supertrait, one boxed byte buffer per
/// message; outgoing messages are sent with [`FDomainTransport::poll_send_message`].
pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
    /// Attempts to send one message.
    ///
    /// The client keeps the message queued and calls this again with the same message until it
    /// returns `Poll::Ready`. An `Err(None)` result is surfaced by the client as a lost
    /// connection; `Err(Some(e))` is surfaced as a transport error wrapping `e`.
    fn poll_send_message(
        self: Pin<&mut Self>,
        msg: &[u8],
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Option<std::io::Error>>>;

    /// Optional hook for describing this transport inside `Client`'s `Debug` output. Only
    /// consulted when [`FDomainTransport::has_debug_fmt`] returns `true`. The default writes
    /// nothing.
    fn debug_fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }

    /// Whether [`FDomainTransport::debug_fmt`] produces meaningful output. Defaults to `false`.
    fn has_debug_fmt(&self) -> bool {
        false
    }
}
294
/// State of the connection's transport.
enum Transport {
    /// A live transport. The fields are, in order: the transport itself, the queue of encoded
    /// messages waiting to be sent, and wakers to wake when a new message is queued (they are
    /// registered by `poll_send_messages` when the queue has been fully flushed).
    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
    /// The transport has failed; every subsequent operation reports a clone of this error.
    Error(InnerError),
}
304
305impl Transport {
306 fn error(&self) -> Option<InnerError> {
308 match self {
309 Transport::Transport(_, _, _) => None,
310 Transport::Error(inner_error) => Some(inner_error.clone()),
311 }
312 }
313
314 fn push_msg(&mut self, msg: Box<[u8]>) -> Result<(), InnerError> {
316 match self {
317 Transport::Transport(_, v, w) => {
318 v.push_back(msg);
319 w.drain(..).for_each(Waker::wake);
320 Ok(())
321 }
322 Transport::Error(e) => Err(e.clone()),
323 }
324 }
325
326 fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
328 match self {
329 Transport::Error(e) => Poll::Ready(e.clone()),
330 Transport::Transport(t, v, w) => {
331 while let Some(msg) = v.front() {
332 match t.as_mut().poll_send_message(msg, ctx) {
333 Poll::Ready(Ok(())) => {
334 v.pop_front();
335 }
336 Poll::Ready(Err(e)) => {
337 let e = e.map(Arc::new);
338 return Poll::Ready(InnerError::Transport(e));
339 }
340 Poll::Pending => return Poll::Pending,
341 }
342 }
343
344 if v.is_empty() {
345 w.push(ctx.waker().clone());
346 } else {
347 ctx.waker().wake_by_ref();
348 }
349 Poll::Pending
350 }
351 }
352 }
353
354 fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
356 match self {
357 Transport::Error(e) => Poll::Ready(Err(e.clone())),
358 Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
359 Some(Ok(x)) => Poll::Ready(Ok(x)),
360 Some(Err(e)) => Poll::Ready(Err(InnerError::Transport(Some(Arc::new(e))))),
361 Option::None => Poll::Ready(Err(InnerError::Transport(None))),
362 },
363 }
364 }
365}
366
367impl Drop for Transport {
368 fn drop(&mut self) {
369 if let Transport::Transport(_, _, wakers) = self {
370 wakers.drain(..).for_each(Waker::wake);
371 }
372 }
373}
374
/// Client-side bookkeeping for reads on one socket handle.
struct SocketReadState {
    /// Tasks waiting for data to show up in `queued`.
    wakers: Vec<Waker>,
    /// Data (or errors) received for this socket and not yet consumed by a reader.
    queued: VecDeque<Result<proto::SocketData, Error>>,
    /// Consulted before issuing a one-shot read request; cleared when a read response arrives.
    read_request_pending: bool,
    /// Whether a streaming read is currently active on this socket.
    is_streaming: bool,
}
382
383impl SocketReadState {
384 fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) -> Vec<Waker> {
387 self.queued.push_back(msg);
388 std::mem::replace(&mut self.wakers, Vec::new())
389 }
390}
391
/// Client-side bookkeeping for reads on one channel handle.
struct ChannelReadState {
    /// Tasks waiting for a message to show up in `queued`.
    wakers: Vec<Waker>,
    /// Messages (or errors) received for this channel and not yet consumed by a reader.
    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
    /// Consulted before issuing a one-shot read request; cleared when a read response arrives.
    read_request_pending: bool,
    /// Whether a streaming read is currently active on this channel.
    is_streaming: bool,
}
399
400impl ChannelReadState {
401 fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) -> Vec<Waker> {
404 self.queued.push_back(msg);
405 std::mem::replace(&mut self.wakers, Vec::new())
406 }
407}
408
/// Mutable state of a [`Client`], kept behind the client's mutex.
struct ClientInner {
    /// The transport, or the error that killed it.
    transport: Transport,
    /// In-flight transactions keyed by transaction id, each with the responder that will
    /// process its reply.
    transactions: HashMap<NonZeroU32, responder::Responder>,
    /// Read bookkeeping for every channel handle that has been read from.
    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
    /// Read bookkeeping for every socket handle that has been read from.
    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
    /// Transaction id to assign to the next request.
    next_tx_id: u32,
    /// Handles queued for closing; flushed as a single close request by
    /// `process_waiting_to_close`.
    waiting_to_close: Vec<proto::HandleId>,
    /// Waker of the task most recently seen driving the transport.
    // NOTE(review): presumably woken when a handle is queued in `waiting_to_close` so the
    // loop flushes it; the code that does so is not in this file — confirm.
    waiting_to_close_waker: Waker,

    /// Wakers collected while the lock is held; callers take this list and wake its entries
    /// after releasing the lock.
    wakers_to_wake: Vec<Waker>,
}
426
427impl ClientInner {
428 fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
430 if ordinal != ordinals::CLOSE {
431 self.process_waiting_to_close();
432 }
433 let tx_id = self.next_tx_id;
434
435 let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
436 let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
437 self.next_tx_id += 1;
438 if let Err(e) = self.transport.push_msg(msg.into()) {
439 let _ = responder.handle(self, Err(e.into()));
440 } else {
441 assert!(
442 self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
443 "Allocated same tx id twice!"
444 );
445 }
446 }
447
448 fn process_waiting_to_close(&mut self) {
449 if !self.waiting_to_close.is_empty() {
450 let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
451 for handle in &handles {
454 let _ = self.channel_read_states.remove(handle);
455 let _ = self.socket_read_states.remove(handle);
456 }
457 self.request(
458 ordinals::CLOSE,
459 proto::FDomainCloseRequest { handles },
460 Responder::Ignore,
461 );
462 }
463 }
464
    /// Drives the connection: flushes queued closes and outgoing messages, then reads and
    /// dispatches incoming messages for as long as the transport has any.
    ///
    /// This never completes successfully (the `Ok` type is `Infallible`). It returns
    /// `Poll::Pending` when no more incoming messages are ready, and `Poll::Ready(Err(..))` when
    /// the transport fails, a message cannot be decoded, a reply names an unknown transaction,
    /// or a responder reports an error.
    fn try_poll_transport(
        &mut self,
        ctx: &mut Context<'_>,
    ) -> Poll<Result<Infallible, InnerError>> {
        self.process_waiting_to_close();

        // Remember the waker of whoever is driving the transport.
        self.waiting_to_close_waker = ctx.waker().clone();

        loop {
            // Sending only ever becomes ready with an error.
            if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
                return Poll::Ready(Err(e));
            }
            let Poll::Ready(result) = self.transport.poll_next(ctx) else {
                return Poll::Pending;
            };
            let data = result?;
            let (header, data) = fidl_message::decode_transaction_header(&data)?;

            // A zero transaction id marks an event rather than a reply to a request.
            let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
                let wakers = self.process_event(header, data)?;
                self.wakers_to_wake.extend(wakers);
                continue;
            };

            // Replies must match a transaction we actually have in flight.
            let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
            tx.handle(self, Ok((header, data)))?;
        }
    }
495
496 fn process_event(
498 &mut self,
499 header: TransactionHeader,
500 data: &[u8],
501 ) -> Result<Vec<Waker>, InnerError> {
502 match header.ordinal {
503 ordinals::ON_SOCKET_STREAMING_DATA => {
504 let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
505 header, data,
506 )?;
507 let o =
508 self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
509 wakers: Vec::new(),
510 queued: VecDeque::new(),
511 is_streaming: false,
512 read_request_pending: false,
513 });
514 match msg.socket_message {
515 proto::SocketMessage::Data(data) => Ok(o.handle_incoming_message(Ok(data))),
516 proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
517 let ret = if let Some(error) = error {
518 o.handle_incoming_message(Err(Error::FDomain(*error)))
519 } else {
520 Vec::new()
521 };
522 o.is_streaming = false;
523 Ok(ret)
524 }
525 _ => Err(InnerError::ProtocolStreamEventIncompatible),
526 }
527 }
528 ordinals::ON_CHANNEL_STREAMING_DATA => {
529 let msg = fidl_message::decode_message::<
530 proto::ChannelOnChannelStreamingDataRequest,
531 >(header, data)?;
532 let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
533 ChannelReadState {
534 wakers: Vec::new(),
535 queued: VecDeque::new(),
536 is_streaming: false,
537 read_request_pending: false,
538 }
539 });
540 match msg.channel_sent {
541 proto::ChannelSent::Message(data) => Ok(o.handle_incoming_message(Ok(data))),
542 proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
543 let ret = if let Some(error) = error {
544 o.handle_incoming_message(Err(Error::FDomain(*error)))
545 } else {
546 Vec::new()
547 };
548 o.is_streaming = false;
549 Ok(ret)
550 }
551 _ => Err(InnerError::ProtocolStreamEventIncompatible),
552 }
553 }
554 _ => Err(::fidl::Error::UnknownOrdinal {
555 ordinal: header.ordinal,
556 protocol_name:
557 <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
558 }
559 .into()),
560 }
561 }
562
563 fn poll_transport(&mut self, ctx: &mut Context<'_>) -> Poll<()> {
567 if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
568 for (_, v) in std::mem::take(&mut self.transactions) {
569 let _ = v.handle(self, Err(e.clone()));
570 }
571 for mut state in std::mem::take(&mut self.socket_read_states).into_values() {
572 state.queued.push_back(Err(Error::from(e.clone())));
573 self.wakers_to_wake.extend(state.wakers);
574 }
575 for (_, mut state) in self.channel_read_states.drain() {
576 state.queued.push_back(Err(Error::from(e.clone())));
577 self.wakers_to_wake.extend(state.wakers);
578 }
579 if matches!(self.transport, Transport::Transport(_, _, _)) {
580 self.transport = Transport::Error(e);
581 }
582
583 Poll::Ready(())
584 } else {
585 Poll::Pending
586 }
587 }
588
589 pub(crate) fn handle_socket_read_response(
591 &mut self,
592 msg: Result<proto::SocketData, Error>,
593 id: proto::HandleId,
594 ) {
595 let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
596 wakers: Vec::new(),
597 queued: VecDeque::new(),
598 is_streaming: false,
599 read_request_pending: false,
600 });
601 let wakers = state.handle_incoming_message(msg);
602 self.wakers_to_wake.extend(wakers);
603 state.read_request_pending = false;
604 }
605
606 pub(crate) fn handle_channel_read_response(
608 &mut self,
609 msg: Result<proto::ChannelMessage, Error>,
610 id: proto::HandleId,
611 ) {
612 let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
613 wakers: Vec::new(),
614 queued: VecDeque::new(),
615 is_streaming: false,
616 read_request_pending: false,
617 });
618 let wakers = state.handle_incoming_message(msg);
619 self.wakers_to_wake.extend(wakers);
620 state.read_request_pending = false;
621 }
622}
623
624impl Drop for ClientInner {
625 fn drop(&mut self) {
626 let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
627 for responder in responders {
628 let _ = responder.handle(self, Err(InnerError::Transport(None)));
629 }
630 for state in self.channel_read_states.values_mut() {
631 state.wakers.drain(..).for_each(Waker::wake);
632 }
633 for state in self.socket_read_states.values_mut() {
634 state.wakers.drain(..).for_each(Waker::wake);
635 }
636 self.waiting_to_close_waker.wake_by_ref();
637 self.wakers_to_wake.drain(..).for_each(Waker::wake);
638 }
639}
640
/// A connection to an FDomain. All state lives in a [`ClientInner`] behind a mutex; the
/// connection only makes progress while the future returned alongside it by [`Client::new`]
/// is being polled.
pub struct Client(pub(crate) Mutex<ClientInner>);
648
649impl std::fmt::Debug for Client {
650 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
651 let inner = self.0.lock();
652 match &inner.transport {
653 Transport::Transport(transport, ..) if transport.has_debug_fmt() => {
654 write!(f, "Client(")?;
655 transport.debug_fmt(f)?;
656 write!(f, ")")
657 }
658 Transport::Error(error) => {
659 let error = Error::from(error.clone());
660 write!(f, "Client(Failed: {error})")
661 }
662 _ => f.debug_tuple("Client").field(&"<transport>").finish(),
663 }
664 }
665}
666
/// A lazily-created client whose transport is permanently in the lost-connection error state,
/// so every operation on it fails immediately.
// NOTE(review): presumably used as a stand-in when a handle's real client has gone away; the
// call sites are not in this file — confirm.
pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
    Arc::new(Client(Mutex::new(ClientInner {
        transport: Transport::Error(InnerError::Transport(None)),
        transactions: HashMap::new(),
        channel_read_states: HashMap::new(),
        socket_read_states: HashMap::new(),
        next_tx_id: 1,
        waiting_to_close: Vec::new(),
        waiting_to_close_waker: std::task::Waker::noop().clone(),
        wakers_to_wake: Vec::new(),
    })))
});
682
/// The future that drives a [`Client`]'s transport. Dropping it fails all outstanding
/// transactions and pending reads on the client (see its `Drop` implementation).
pub struct ClientLoop {
    /// The client being driven; held weakly so the loop does not keep the client alive.
    client: Weak<Client>,
    /// The actual polling future.
    fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
}
692
693impl Future for ClientLoop {
694 type Output = ();
695 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
696 self.fut.as_mut().poll(cx)
697 }
698}
699
700impl Drop for ClientLoop {
701 fn drop(&mut self) {
702 let Some(client) = self.client.upgrade() else {
703 return;
704 };
705
706 let (channel_read_states, socket_read_states, deferred_wakers) = {
707 let mut inner = client.0.lock();
708 let transactions = std::mem::take(&mut inner.transactions);
709 log::debug!("ClientLoop dropped, failing {} transactions", transactions.len());
710 for (_, v) in transactions {
711 let _ = v.handle(&mut *inner, Err(InnerError::Transport(None)));
712 }
713
714 let channel_read_states = std::mem::take(&mut inner.channel_read_states);
715 let socket_read_states = std::mem::take(&mut inner.socket_read_states);
716
717 let deferred_wakers = std::mem::replace(&mut inner.wakers_to_wake, Vec::new());
718
719 (channel_read_states, socket_read_states, deferred_wakers)
720 };
721
722 log::debug!("Failing reads on {} channels", channel_read_states.len());
723 for (_, mut state) in channel_read_states {
724 state.queued.push_back(Err(Error::Transport(None)));
725 state.wakers.into_iter().for_each(Waker::wake);
726 }
727
728 log::debug!("Failing reads on {} sockets", socket_read_states.len());
729 for (_, mut state) in socket_read_states {
730 state.queued.push_back(Err(Error::Transport(None)));
731 state.wakers.into_iter().for_each(Waker::wake);
732 }
733
734 deferred_wakers.into_iter().for_each(Waker::wake);
735 }
736}
737
738impl Client {
739 pub fn transport_status(&self) -> Result<()> {
740 match &self.0.lock().transport {
741 Transport::Error(e) => Err(e.clone().into()),
742 Transport::Transport(_, _, _) => Ok(()),
743 }
744 }
745 pub fn new(
752 transport: impl FDomainTransport + 'static,
753 ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
754 let ret = Arc::new(Client(Mutex::new(ClientInner {
755 transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
756 transactions: HashMap::new(),
757 socket_read_states: HashMap::new(),
758 channel_read_states: HashMap::new(),
759 next_tx_id: 1,
760 waiting_to_close: Vec::new(),
761 waiting_to_close_waker: std::task::Waker::noop().clone(),
762 wakers_to_wake: Vec::new(),
763 })));
764
765 let client_weak = Arc::downgrade(&ret);
766 let fut = futures::future::poll_fn(move |ctx| {
767 let Some(client) = client_weak.upgrade() else {
768 return Poll::Ready(());
769 };
770
771 let (ret, deferred_wakers) = {
772 let mut inner = client.0.lock();
773 let ret = inner.poll_transport(ctx);
774 let deferred_wakers = std::mem::replace(&mut inner.wakers_to_wake, Vec::new());
775 (ret, deferred_wakers)
776 };
777 deferred_wakers.into_iter().for_each(Waker::wake);
778 ret
779 });
780
781 let client_loop = ClientLoop { client: Arc::downgrade(&ret), fut: Box::pin(fut) };
782
783 (ret, client_loop)
784 }
785
786 pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
788 let new_handle = self.new_hid();
789 self.transaction(
790 ordinals::GET_NAMESPACE,
791 proto::FDomainGetNamespaceRequest { new_handle },
792 Responder::Namespace,
793 )
794 .await?;
795 Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
796 }
797
798 pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
800 let id_a = self.new_hid();
801 let id_b = self.new_hid();
802 let fut = self.transaction(
803 ordinals::CREATE_CHANNEL,
804 proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
805 Responder::CreateChannel,
806 );
807
808 fuchsia_async::Task::spawn(async move {
809 if let Err(e) = fut.await {
810 log::debug!("FDomain channel creation failed: {e}");
811 }
812 })
813 .detach();
814
815 (
816 Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
817 Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
818 )
819 }
820
821 pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
823 self: &Arc<Self>,
824 ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
825 let (client, server) = self.create_channel();
826 let client_end = crate::fidl::ClientEnd::<F>::new(client);
827 let server_end = crate::fidl::ServerEnd::new(server);
828 (client_end, server_end)
829 }
830
831 pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
833 self: &Arc<Self>,
834 ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
835 let (client_end, server_end) = self.create_endpoints::<F>();
836 (client_end.into_proxy(), server_end)
837 }
838
839 pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
841 self: &Arc<Self>,
842 ) -> (F::Proxy, F::RequestStream) {
843 let (client_end, server_end) = self.create_endpoints::<F>();
844 (client_end.into_proxy(), server_end.into_stream())
845 }
846
847 pub fn create_request_stream<F: crate::fidl::ProtocolMarker>(
849 self: &Arc<Self>,
850 ) -> (crate::fidl::ClientEnd<F>, F::RequestStream) {
851 let (client_end, server_end) = self.create_endpoints::<F>();
852 (client_end, server_end.into_stream())
853 }
854
855 fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
857 let id_a = self.new_hid();
858 let id_b = self.new_hid();
859 let fut = self.transaction(
860 ordinals::CREATE_SOCKET,
861 proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
862 Responder::CreateSocket,
863 );
864
865 fuchsia_async::Task::spawn(async move {
866 if let Err(e) = fut.await {
867 log::debug!("FDomain socket creation failed: {e}");
868 }
869 })
870 .detach();
871
872 (
873 Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
874 Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
875 )
876 }
877
    /// Creates a pair of connected stream sockets in the FDomain.
    pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
        self.create_socket(proto::SocketType::Stream)
    }
882
    /// Creates a pair of connected datagram sockets in the FDomain.
    pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
        self.create_socket(proto::SocketType::Datagram)
    }
887
888 pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
890 let id_a = self.new_hid();
891 let id_b = self.new_hid();
892 let fut = self.transaction(
893 ordinals::CREATE_EVENT_PAIR,
894 proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
895 Responder::CreateEventPair,
896 );
897
898 fuchsia_async::Task::spawn(async move {
899 if let Err(e) = fut.await {
900 log::debug!("FDomain event pair creation failed: {e}");
901 }
902 })
903 .detach();
904
905 (
906 EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
907 EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
908 )
909 }
910
911 pub fn create_event(self: &Arc<Self>) -> Event {
913 let id = self.new_hid();
914 let fut = self.transaction(
915 ordinals::CREATE_EVENT,
916 proto::EventCreateEventRequest { handle: id },
917 Responder::CreateEvent,
918 );
919
920 fuchsia_async::Task::spawn(async move {
921 if let Err(e) = fut.await {
922 log::debug!("FDomain event creation failed: {e}");
923 }
924 })
925 .detach();
926
927 Event(Handle { id: id.id, client: Arc::downgrade(self) })
928 }
929
    /// Picks an id for a new handle: a random `u32` shifted right by one bit, so the top bit
    /// of the id is always clear.
    // NOTE(review): presumably the cleared top bit keeps the id inside the range reserved for
    // client-allocated handles, and collisions are left for the target to reject — confirm
    // against the protocol definition.
    pub(crate) fn new_hid(&self) -> proto::NewHandleId {
        proto::NewHandleId { id: rand::random::<u32>() >> 1 }
    }
938
    /// Queues a request and returns a future that resolves to its reply.
    ///
    /// `f` builds the responder for this transaction from the sending half of a oneshot
    /// channel; the returned future waits on the receiving half. The request is queued before
    /// this function returns, so it goes out even if the returned future is never polled. The
    /// client lock is held only for the duration of this call, not by the returned future.
    ///
    /// # Panics
    ///
    /// The returned future panics when polled if the responder dropped its sender without
    /// sending a reply.
    pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
        self: &Arc<Self>,
        ordinal: u64,
        request: S,
        f: F,
    ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
    where
        F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
    {
        let mut inner = self.0.lock();

        let (sender, receiver) = futures::channel::oneshot::channel();
        inner.request(ordinal, request, f(sender));
        receiver.map(|x| x.expect("Oneshot went away without reply!"))
    }
959
960 pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
962 let mut inner = self.0.lock();
963 if let Some(e) = inner.transport.error() {
964 return Err(e.into());
965 }
966
967 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
968 wakers: Vec::new(),
969 queued: VecDeque::new(),
970 is_streaming: false,
971 read_request_pending: false,
972 });
973
974 assert!(!state.is_streaming, "Initiated streaming twice!");
975 state.is_streaming = true;
976
977 inner.request(
978 ordinals::READ_SOCKET_STREAMING_START,
979 proto::SocketReadSocketStreamingStartRequest { handle: id },
980 Responder::Ignore,
981 );
982 Ok(())
983 }
984
985 pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
989 let mut inner = self.0.lock();
990 if let Some(state) = inner.socket_read_states.get_mut(&id) {
991 if state.is_streaming {
992 state.is_streaming = false;
993 let _ = inner.request(
995 ordinals::READ_SOCKET_STREAMING_STOP,
996 proto::ChannelReadChannelStreamingStopRequest { handle: id },
997 Responder::Ignore,
998 );
999 }
1000 }
1001 }
1002
1003 pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
1005 let mut inner = self.0.lock();
1006 if let Some(e) = inner.transport.error() {
1007 return Err(e.into());
1008 }
1009 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
1010 wakers: Vec::new(),
1011 queued: VecDeque::new(),
1012 is_streaming: false,
1013 read_request_pending: false,
1014 });
1015
1016 assert!(!state.is_streaming, "Initiated streaming twice!");
1017 state.is_streaming = true;
1018
1019 inner.request(
1020 ordinals::READ_CHANNEL_STREAMING_START,
1021 proto::ChannelReadChannelStreamingStartRequest { handle: id },
1022 Responder::Ignore,
1023 );
1024
1025 Ok(())
1026 }
1027
1028 pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
1032 let mut inner = self.0.lock();
1033 if let Some(state) = inner.channel_read_states.get_mut(&id) {
1034 if state.is_streaming {
1035 state.is_streaming = false;
1036 let _ = inner.request(
1038 ordinals::READ_CHANNEL_STREAMING_STOP,
1039 proto::ChannelReadChannelStreamingStopRequest { handle: id },
1040 Responder::Ignore,
1041 );
1042 }
1043 }
1044 }
1045
1046 pub(crate) fn poll_socket(
1048 &self,
1049 id: proto::HandleId,
1050 ctx: &mut Context<'_>,
1051 out: &mut [u8],
1052 ) -> Poll<Result<usize, Error>> {
1053 let mut inner = self.0.lock();
1054 if let Some(error) = inner.transport.error() {
1055 return Poll::Ready(Err(error.into()));
1056 }
1057
1058 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
1059 wakers: Vec::new(),
1060 queued: VecDeque::new(),
1061 is_streaming: false,
1062 read_request_pending: false,
1063 });
1064
1065 if let Some(got) = state.queued.front_mut() {
1066 match got.as_mut() {
1067 Ok(data) => {
1068 let read_size = std::cmp::min(data.data.len(), out.len());
1069 out[..read_size].copy_from_slice(&data.data[..read_size]);
1070
1071 if data.data.len() > read_size && !data.is_datagram {
1072 let _ = data.data.drain(..read_size);
1073 } else {
1074 let _ = state.queued.pop_front();
1075 }
1076
1077 return Poll::Ready(Ok(read_size));
1078 }
1079 Err(_) => {
1080 let err = state.queued.pop_front().unwrap().unwrap_err();
1081 return Poll::Ready(Err(err));
1082 }
1083 }
1084 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
1085 state.wakers.push(ctx.waker().clone());
1086 }
1087
1088 if !state.read_request_pending && !state.is_streaming {
1089 inner.request(
1090 ordinals::READ_SOCKET,
1091 proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
1092 Responder::ReadSocket(id),
1093 );
1094 }
1095
1096 Poll::Pending
1097 }
1098
1099 pub(crate) fn poll_channel(
1101 &self,
1102 id: proto::HandleId,
1103 ctx: &mut Context<'_>,
1104 for_stream: bool,
1105 ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
1106 let mut inner = self.0.lock();
1107 if let Some(error) = inner.transport.error() {
1108 return Poll::Ready(Some(Err(error.into())));
1109 }
1110
1111 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
1112 wakers: Vec::new(),
1113 queued: VecDeque::new(),
1114 is_streaming: false,
1115 read_request_pending: false,
1116 });
1117
1118 if let Some(got) = state.queued.pop_front() {
1119 return Poll::Ready(Some(got));
1120 } else if for_stream && !state.is_streaming {
1121 return Poll::Ready(None);
1122 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
1123 state.wakers.push(ctx.waker().clone());
1124 }
1125
1126 if !state.read_request_pending && !state.is_streaming {
1127 inner.request(
1128 ordinals::READ_CHANNEL,
1129 proto::ChannelReadChannelRequest { handle: id },
1130 Responder::ReadChannel(id),
1131 );
1132 }
1133
1134 Poll::Pending
1135 }
1136
1137 pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
1139 let inner = self.0.lock();
1140 let Some(state) = inner.channel_read_states.get(&id) else {
1141 return false;
1142 };
1143 state.is_streaming
1144 }
1145
1146 pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
1149 let inner = self.0.lock();
1150 match handles {
1151 proto::Handles::Handles(handles) => {
1152 for handle in handles {
1153 assert!(
1154 !(inner.channel_read_states.contains_key(handle)
1155 || inner.socket_read_states.contains_key(handle)),
1156 "Tried to transfer handle after reading"
1157 );
1158 }
1159 }
1160 proto::Handles::Dispositions(dispositions) => {
1161 for disposition in dispositions {
1162 match &disposition.handle {
1163 proto::HandleOp::Move_(handle) => assert!(
1164 !(inner.channel_read_states.contains_key(handle)
1165 || inner.socket_read_states.contains_key(handle)),
1166 "Tried to transfer handle after reading"
1167 ),
1168 proto::HandleOp::Duplicate(_) => (),
1170 }
1171 }
1172 }
1173 }
1174 }
1175}