1use fidl_message::TransactionHeader;
6use fuchsia_sync::Mutex;
7use futures::FutureExt;
8use futures::channel::oneshot::Sender as OneshotSender;
9use futures::stream::Stream as StreamTrait;
10use std::collections::{HashMap, VecDeque};
11use std::convert::Infallible;
12use std::future::Future;
13use std::num::NonZeroU32;
14use std::pin::Pin;
15use std::sync::{Arc, LazyLock};
16use std::task::{Context, Poll, Waker, ready};
17use {fidl_fuchsia_fdomain as proto, fuchsia_async as _};
18
19mod channel;
20mod event;
21mod event_pair;
22mod handle;
23mod responder;
24mod socket;
25
26#[cfg(test)]
27mod test;
28
29pub mod fidl;
30pub mod fidl_next;
31
32use responder::Responder;
33
34pub use channel::{
35 AnyHandle, Channel, ChannelMessageStream, ChannelWriter, HandleInfo, HandleOp, MessageBuf,
36};
37pub use event::Event;
38pub use event_pair::Eventpair as EventPair;
39pub use handle::unowned::Unowned;
40pub use handle::{
41 AsHandleRef, Handle, HandleBased, HandleRef, NullableHandle, OnFDomainSignals, Peered,
42};
43pub use proto::{Error as FDomainError, WriteChannelError, WriteSocketError};
44pub use socket::{Socket, SocketDisposition, SocketReadStream, SocketWriter};
45
46#[rustfmt::skip]
48pub use Handle as Clock;
49#[rustfmt::skip]
50pub use Handle as Fifo;
51#[rustfmt::skip]
52pub use Handle as Job;
53#[rustfmt::skip]
54pub use Handle as Process;
55#[rustfmt::skip]
56pub use Handle as Resource;
57#[rustfmt::skip]
58pub use Handle as Stream;
59#[rustfmt::skip]
60pub use Handle as Thread;
61#[rustfmt::skip]
62pub use Handle as Vmar;
63#[rustfmt::skip]
64pub use Handle as Vmo;
65
66use proto::f_domain_ordinals as ordinals;
67
68fn write_fdomain_error(error: &FDomainError, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 match error {
70 FDomainError::TargetError(e) => {
71 let e = zx_status::Status::from_raw(*e);
72 write!(f, "Target-side error {e}")
73 }
74 FDomainError::BadHandleId(proto::BadHandleId { id }) => {
75 write!(f, "Tried to use invalid handle id {id}")
76 }
77 FDomainError::WrongHandleType(proto::WrongHandleType { expected, got }) => write!(
78 f,
79 "Tried to use handle as {expected:?} but target reported handle was of type {got:?}"
80 ),
81 FDomainError::StreamingReadInProgress(proto::StreamingReadInProgress {}) => {
82 write!(f, "Handle is occupied delivering streaming reads")
83 }
84 FDomainError::NoReadInProgress(proto::NoReadInProgress {}) => {
85 write!(f, "No streaming read was in progress")
86 }
87 FDomainError::NewHandleIdOutOfRange(proto::NewHandleIdOutOfRange { id }) => {
88 write!(
89 f,
90 "Tried to create a handle with id {id}, which is outside the valid range for client handles"
91 )
92 }
93 FDomainError::NewHandleIdReused(proto::NewHandleIdReused { id, same_call }) => {
94 if *same_call {
95 write!(f, "Tried to create two or more new handles with the same id {id}")
96 } else {
97 write!(
98 f,
99 "Tried to create a new handle with id {id}, which is already the id of an existing handle"
100 )
101 }
102 }
103 FDomainError::WroteToSelf(proto::WroteToSelf {}) => {
104 write!(f, "Tried to write a channel into itself")
105 }
106 FDomainError::ClosedDuringRead(proto::ClosedDuringRead {}) => {
107 write!(f, "Handle closed while being read")
108 }
109 _ => todo!(),
110 }
111}
112
/// Result type used by this crate; the error type defaults to [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
115
/// Errors surfaced by the FDomain client.
#[derive(Clone)]
pub enum Error {
    /// Writing to a socket failed; the payload records the underlying error
    /// and how many bytes were written successfully first.
    SocketWrite(WriteSocketError),
    /// Writing to a channel failed, either as a whole or for individual
    /// handles in the message.
    ChannelWrite(WriteChannelError),
    /// An error reported through the FDomain protocol.
    FDomain(FDomainError),
    /// A FIDL-level protocol error.
    Protocol(::fidl::Error),
    /// The FDomain protocol does not recognize an object type.
    ProtocolObjectTypeIncompatible,
    /// The FDomain protocol does not recognize some rights.
    ProtocolRightsIncompatible,
    /// The FDomain protocol does not recognize some signals.
    ProtocolSignalsIncompatible,
    /// The FDomain protocol does not recognize a received streaming IO event.
    ProtocolStreamEventIncompatible,
    /// The transport failed. `None` is displayed as "Transport closed";
    /// `Some` carries the I/O error that was reported.
    Transport(Option<Arc<std::io::Error>>),
    /// An FDomain handle was used with a different connection than its own.
    ConnectionMismatch,
    /// The channel is no longer streaming.
    StreamingAborted,
}
131
132impl std::fmt::Display for Error {
133 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134 match self {
135 Self::SocketWrite(proto::WriteSocketError { error, wrote }) => {
136 write!(f, "While writing socket (after {wrote} bytes written successfully): ")?;
137 write_fdomain_error(error, f)
138 }
139 Self::ChannelWrite(proto::WriteChannelError::Error(error)) => {
140 write!(f, "While writing channel: ")?;
141 write_fdomain_error(error, f)
142 }
143 Self::ChannelWrite(proto::WriteChannelError::OpErrors(errors)) => {
144 write!(f, "Couldn't write all handles into a channel:")?;
145 for (pos, error) in
146 errors.iter().enumerate().filter_map(|(num, x)| x.as_ref().map(|y| (num, &**y)))
147 {
148 write!(f, "\n Handle in position {pos}: ")?;
149 write_fdomain_error(error, f)?;
150 }
151 Ok(())
152 }
153 Self::ProtocolObjectTypeIncompatible => {
154 write!(f, "The FDomain protocol does not recognize an object type")
155 }
156 Self::ProtocolRightsIncompatible => {
157 write!(f, "The FDomain protocol does not recognize some rights")
158 }
159 Self::ProtocolSignalsIncompatible => {
160 write!(f, "The FDomain protocol does not recognize some signals")
161 }
162 Self::ProtocolStreamEventIncompatible => {
163 write!(f, "The FDomain protocol does not recognize a received streaming IO event")
164 }
165 Self::FDomain(e) => write_fdomain_error(e, f),
166 Self::Protocol(e) => write!(f, "Protocol error: {e}"),
167 Self::Transport(Some(e)) => write!(f, "Transport error: {e:?}"),
168 Self::Transport(None) => write!(f, "Transport closed"),
169 Self::ConnectionMismatch => {
170 write!(f, "Tried to use an FDomain handle from a different connection")
171 }
172 Self::StreamingAborted => write!(f, "This channel is no longer streaming"),
173 }
174 }
175}
176
177impl std::fmt::Debug for Error {
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 match self {
180 Self::SocketWrite(e) => f.debug_tuple("SocketWrite").field(e).finish(),
181 Self::ChannelWrite(e) => f.debug_tuple("ChannelWrite").field(e).finish(),
182 Self::FDomain(e) => f.debug_tuple("FDomain").field(e).finish(),
183 Self::Protocol(e) => f.debug_tuple("Protocol").field(e).finish(),
184 Self::Transport(e) => f.debug_tuple("Transport").field(e).finish(),
185 Self::ProtocolObjectTypeIncompatible => write!(f, "ProtocolObjectTypeIncompatible "),
186 Self::ProtocolRightsIncompatible => write!(f, "ProtocolRightsIncompatible "),
187 Self::ProtocolSignalsIncompatible => write!(f, "ProtocolSignalsIncompatible "),
188 Self::ProtocolStreamEventIncompatible => write!(f, "ProtocolStreamEventIncompatible"),
189 Self::ConnectionMismatch => write!(f, "ConnectionMismatch"),
190 Self::StreamingAborted => write!(f, "StreamingAborted"),
191 }
192 }
193}
194
// Marker implementation: all default methods are used, so no `source()` is reported.
impl std::error::Error for Error {}
196
197impl From<FDomainError> for Error {
198 fn from(other: FDomainError) -> Self {
199 Self::FDomain(other)
200 }
201}
202
203impl From<::fidl::Error> for Error {
204 fn from(other: ::fidl::Error) -> Self {
205 Self::Protocol(other)
206 }
207}
208
209impl From<WriteSocketError> for Error {
210 fn from(other: WriteSocketError) -> Self {
211 Self::SocketWrite(other)
212 }
213}
214
215impl From<WriteChannelError> for Error {
216 fn from(other: WriteChannelError) -> Self {
217 Self::ChannelWrite(other)
218 }
219}
220
/// Connection-level failures tracked internally by the client.
///
/// Every variant has a direct counterpart in [`Error`] (see the `From`
/// conversion); this is the error type stored by the transport once it has
/// failed and handed to outstanding transactions.
#[derive(Clone)]
enum InnerError {
    /// A FIDL-level protocol error.
    Protocol(::fidl::Error),
    /// A streaming event arrived that this client does not recognize.
    ProtocolStreamEventIncompatible,
    /// The transport failed; `None` maps to "Transport closed" when displayed.
    Transport(Option<Arc<std::io::Error>>),
}
230
231impl From<InnerError> for Error {
232 fn from(other: InnerError) -> Self {
233 match other {
234 InnerError::Protocol(p) => Error::Protocol(p),
235 InnerError::ProtocolStreamEventIncompatible => Error::ProtocolStreamEventIncompatible,
236 InnerError::Transport(t) => Error::Transport(t),
237 }
238 }
239}
240
241impl From<::fidl::Error> for InnerError {
242 fn from(other: ::fidl::Error) -> Self {
243 InnerError::Protocol(other)
244 }
245}
246
/// A message-oriented connection that carries FDomain traffic.
///
/// The `Stream` side yields incoming messages, each as one boxed byte buffer
/// (or an I/O error). Outgoing messages are pushed with
/// [`FDomainTransport::poll_send_message`].
pub trait FDomainTransport: StreamTrait<Item = Result<Box<[u8]>, std::io::Error>> + Send {
    /// Attempts to send one complete message.
    ///
    /// Returns `Poll::Pending` when the message cannot be sent yet. The client
    /// retries the same message on a later poll, so implementations are
    /// presumably expected to arrange a wake-up through `ctx` (confirm
    /// against implementors).
    ///
    /// An error of `None` is reported to users as "Transport closed"; `Some`
    /// carries the underlying I/O error.
    fn poll_send_message(
        self: Pin<&mut Self>,
        msg: &[u8],
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Option<std::io::Error>>>;
}
265
/// State of the client's connection.
enum Transport {
    /// A live connection: the transport itself, the queue of encoded messages
    /// waiting to be sent, and the wakers to notify when a new message is
    /// queued.
    Transport(Pin<Box<dyn FDomainTransport>>, VecDeque<Box<[u8]>>, Vec<Waker>),
    /// The connection has failed; the stored error is returned from every
    /// subsequent operation.
    Error(InnerError),
}
275
276impl Transport {
277 fn error(&self) -> Option<InnerError> {
279 match self {
280 Transport::Transport(_, _, _) => None,
281 Transport::Error(inner_error) => Some(inner_error.clone()),
282 }
283 }
284
285 fn push_msg(&mut self, msg: Box<[u8]>) {
287 if let Transport::Transport(_, v, w) = self {
288 v.push_back(msg);
289 w.drain(..).for_each(Waker::wake);
290 }
291 }
292
293 fn poll_send_messages(&mut self, ctx: &mut Context<'_>) -> Poll<InnerError> {
295 match self {
296 Transport::Error(e) => Poll::Ready(e.clone()),
297 Transport::Transport(t, v, w) => {
298 while let Some(msg) = v.front() {
299 match t.as_mut().poll_send_message(msg, ctx) {
300 Poll::Ready(Ok(())) => {
301 v.pop_front();
302 }
303 Poll::Ready(Err(e)) => {
304 let e = e.map(Arc::new);
305 *self = Transport::Error(InnerError::Transport(e.clone()));
306 return Poll::Ready(InnerError::Transport(e));
307 }
308 Poll::Pending => return Poll::Pending,
309 }
310 }
311
312 if v.is_empty() {
313 w.push(ctx.waker().clone());
314 } else {
315 ctx.waker().wake_by_ref();
316 }
317 Poll::Pending
318 }
319 }
320 }
321
322 fn poll_next(&mut self, ctx: &mut Context<'_>) -> Poll<Result<Box<[u8]>, InnerError>> {
324 match self {
325 Transport::Error(e) => Poll::Ready(Err(e.clone())),
326 Transport::Transport(t, _, _) => match ready!(t.as_mut().poll_next(ctx)) {
327 Some(Ok(x)) => Poll::Ready(Ok(x)),
328 Some(Err(e)) => {
329 let e = Arc::new(e);
330 *self = Transport::Error(InnerError::Transport(Some(Arc::clone(&e))));
331 Poll::Ready(Err(InnerError::Transport(Some(e))))
332 }
333 Option::None => Poll::Ready(Err(InnerError::Transport(None))),
334 },
335 }
336 }
337}
338
/// Per-handle bookkeeping for reads from a socket.
struct SocketReadState {
    /// Tasks to wake when something is added to `queued`.
    wakers: Vec<Waker>,
    /// Data (or errors) received for this socket and not yet consumed.
    queued: VecDeque<Result<proto::SocketData, Error>>,
    /// Cleared when a one-shot read response is handled; consulted before a
    /// new one-shot read request is issued.
    read_request_pending: bool,
    /// Whether a streaming read is currently active on this handle.
    is_streaming: bool,
}
346
347impl SocketReadState {
348 fn handle_incoming_message(&mut self, msg: Result<proto::SocketData, Error>) {
351 self.queued.push_back(msg);
352 self.wakers.drain(..).for_each(Waker::wake);
353 }
354}
355
/// Per-handle bookkeeping for reads from a channel.
struct ChannelReadState {
    /// Tasks to wake when something is added to `queued`.
    wakers: Vec<Waker>,
    /// Messages (or errors) received for this channel and not yet consumed.
    queued: VecDeque<Result<proto::ChannelMessage, Error>>,
    /// Cleared when a one-shot read response is handled; consulted before a
    /// new one-shot read request is issued.
    read_request_pending: bool,
    /// Whether a streaming read is currently active on this handle.
    is_streaming: bool,
}
363
364impl ChannelReadState {
365 fn handle_incoming_message(&mut self, msg: Result<proto::ChannelMessage, Error>) {
368 self.queued.push_back(msg);
369 self.wakers.drain(..).for_each(Waker::wake);
370 }
371}
372
/// Mutable state of a client connection; lives behind the mutex in [`Client`].
struct ClientInner {
    /// The connection, or the error that ended it.
    transport: Transport,
    /// Responders for requests that have been issued and not yet answered,
    /// keyed by transaction id.
    transactions: HashMap<NonZeroU32, responder::Responder>,
    /// Read bookkeeping for channels, keyed by handle id.
    channel_read_states: HashMap<proto::HandleId, ChannelReadState>,
    /// Read bookkeeping for sockets, keyed by handle id.
    socket_read_states: HashMap<proto::HandleId, SocketReadState>,
    /// Transaction id to assign to the next request.
    next_tx_id: u32,
    /// Handles queued to be closed the next time the transport is polled.
    waiting_to_close: Vec<proto::HandleId>,
    /// Waker of the task that last polled the transport. Presumably woken by
    /// whoever appends to `waiting_to_close` (confirm against the handle code).
    waiting_to_close_waker: Waker,
}
383
384impl ClientInner {
385 fn request<S: fidl_message::Body>(&mut self, ordinal: u64, request: S, responder: Responder) {
387 let tx_id = self.next_tx_id;
388
389 let header = TransactionHeader::new(tx_id, ordinal, fidl_message::DynamicFlags::FLEXIBLE);
390 let msg = fidl_message::encode_message(header, request).expect("Could not encode request!");
391 self.next_tx_id += 1;
392 assert!(
393 self.transactions.insert(tx_id.try_into().unwrap(), responder).is_none(),
394 "Allocated same tx id twice!"
395 );
396 self.transport.push_msg(msg.into());
397 }
398
399 fn try_poll_transport(
402 &mut self,
403 ctx: &mut Context<'_>,
404 ) -> Poll<Result<Infallible, InnerError>> {
405 if !self.waiting_to_close.is_empty() {
406 let handles = std::mem::replace(&mut self.waiting_to_close, Vec::new());
407 for handle in &handles {
410 let _ = self.channel_read_states.remove(handle);
411 let _ = self.socket_read_states.remove(handle);
412 }
413 self.request(
414 ordinals::CLOSE,
415 proto::FDomainCloseRequest { handles },
416 Responder::Ignore,
417 );
418 }
419
420 self.waiting_to_close_waker = ctx.waker().clone();
421
422 loop {
423 if let Poll::Ready(e) = self.transport.poll_send_messages(ctx) {
424 for state in std::mem::take(&mut self.socket_read_states).into_values() {
425 state.wakers.into_iter().for_each(Waker::wake);
426 }
427 for (_, state) in self.channel_read_states.drain() {
428 state.wakers.into_iter().for_each(Waker::wake);
429 }
430 return Poll::Ready(Err(e));
431 }
432 let Poll::Ready(result) = self.transport.poll_next(ctx) else {
433 return Poll::Pending;
434 };
435 let data = result?;
436 let (header, data) = match fidl_message::decode_transaction_header(&data) {
437 Ok(x) => x,
438 Err(e) => {
439 self.transport = Transport::Error(InnerError::Protocol(e));
440 continue;
441 }
442 };
443
444 let Some(tx_id) = NonZeroU32::new(header.tx_id) else {
445 if let Err(e) = self.process_event(header, data) {
446 self.transport = Transport::Error(e);
447 }
448 continue;
449 };
450
451 let tx = self.transactions.remove(&tx_id).ok_or(::fidl::Error::InvalidResponseTxid)?;
452 match tx.handle(self, Ok((header, data))) {
453 Ok(x) => x,
454 Err(e) => {
455 self.transport = Transport::Error(InnerError::Protocol(e));
456 continue;
457 }
458 }
459 }
460 }
461
462 fn process_event(&mut self, header: TransactionHeader, data: &[u8]) -> Result<(), InnerError> {
464 match header.ordinal {
465 ordinals::ON_SOCKET_STREAMING_DATA => {
466 let msg = fidl_message::decode_message::<proto::SocketOnSocketStreamingDataRequest>(
467 header, data,
468 )?;
469 let o =
470 self.socket_read_states.entry(msg.handle).or_insert_with(|| SocketReadState {
471 wakers: Vec::new(),
472 queued: VecDeque::new(),
473 is_streaming: false,
474 read_request_pending: false,
475 });
476 match msg.socket_message {
477 proto::SocketMessage::Data(data) => {
478 o.handle_incoming_message(Ok(data));
479 Ok(())
480 }
481 proto::SocketMessage::Stopped(proto::AioStopped { error }) => {
482 if let Some(error) = error {
483 o.handle_incoming_message(Err(Error::FDomain(*error)));
484 }
485 o.is_streaming = false;
486 Ok(())
487 }
488 _ => Err(InnerError::ProtocolStreamEventIncompatible),
489 }
490 }
491 ordinals::ON_CHANNEL_STREAMING_DATA => {
492 let msg = fidl_message::decode_message::<
493 proto::ChannelOnChannelStreamingDataRequest,
494 >(header, data)?;
495 let o = self.channel_read_states.entry(msg.handle).or_insert_with(|| {
496 ChannelReadState {
497 wakers: Vec::new(),
498 queued: VecDeque::new(),
499 is_streaming: false,
500 read_request_pending: false,
501 }
502 });
503 match msg.channel_sent {
504 proto::ChannelSent::Message(data) => {
505 o.handle_incoming_message(Ok(data));
506 Ok(())
507 }
508 proto::ChannelSent::Stopped(proto::AioStopped { error }) => {
509 if let Some(error) = error {
510 o.handle_incoming_message(Err(Error::FDomain(*error)));
511 }
512 o.is_streaming = false;
513 Ok(())
514 }
515 _ => Err(InnerError::ProtocolStreamEventIncompatible),
516 }
517 }
518 _ => Err(::fidl::Error::UnknownOrdinal {
519 ordinal: header.ordinal,
520 protocol_name:
521 <proto::FDomainMarker as ::fidl::endpoints::ProtocolMarker>::DEBUG_NAME,
522 }
523 .into()),
524 }
525 }
526
527 fn poll_transport(&mut self, ctx: &mut Context<'_>) -> Poll<()> {
531 if let Poll::Ready(Err(e)) = self.try_poll_transport(ctx) {
532 for (_, v) in std::mem::take(&mut self.transactions) {
533 let _ = v.handle(self, Err(e.clone()));
534 }
535
536 Poll::Ready(())
537 } else {
538 Poll::Pending
539 }
540 }
541
542 pub(crate) fn handle_socket_read_response(
544 &mut self,
545 msg: Result<proto::SocketData, Error>,
546 id: proto::HandleId,
547 ) {
548 let state = self.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
549 wakers: Vec::new(),
550 queued: VecDeque::new(),
551 is_streaming: false,
552 read_request_pending: false,
553 });
554 state.handle_incoming_message(msg);
555 state.read_request_pending = false;
556 }
557
558 pub(crate) fn handle_channel_read_response(
560 &mut self,
561 msg: Result<proto::ChannelMessage, Error>,
562 id: proto::HandleId,
563 ) {
564 let state = self.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
565 wakers: Vec::new(),
566 queued: VecDeque::new(),
567 is_streaming: false,
568 read_request_pending: false,
569 });
570 state.handle_incoming_message(msg);
571 state.read_request_pending = false;
572 }
573}
574
575impl Drop for ClientInner {
576 fn drop(&mut self) {
577 let responders = self.transactions.drain().map(|x| x.1).collect::<Vec<_>>();
578 for responder in responders {
579 let _ = responder.handle(self, Err(InnerError::Transport(None)));
580 }
581 for state in self.channel_read_states.values_mut() {
582 state.wakers.drain(..).for_each(Waker::wake);
583 }
584 for state in self.socket_read_states.values_mut() {
585 state.wakers.drain(..).for_each(Waker::wake);
586 }
587 }
588}
589
/// A client connection to an FDomain.
///
/// All state lives in a mutex-protected [`ClientInner`]. Instances are created
/// with [`Client::new`], which also returns the future that must be polled
/// to drive the connection.
pub struct Client(pub(crate) Mutex<ClientInner>);
597
598impl std::fmt::Debug for Client {
599 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
600 f.debug_tuple("Client").field(&"...").finish()
601 }
602}
603
/// A lazily created, shared client whose transport is permanently in the
/// "transport closed" error state, so every operation on it fails.
///
/// NOTE(review): presumably a stand-in for handles whose real client is no
/// longer available — confirm against the handle code.
pub(crate) static DEAD_CLIENT: LazyLock<Arc<Client>> = LazyLock::new(|| {
    Arc::new(Client(Mutex::new(ClientInner {
        transport: Transport::Error(InnerError::Transport(None)),
        transactions: HashMap::new(),
        channel_read_states: HashMap::new(),
        socket_read_states: HashMap::new(),
        next_tx_id: 1,
        waiting_to_close: Vec::new(),
        waiting_to_close_waker: std::task::Waker::noop().clone(),
    })))
});
618
619impl Client {
620 pub fn new(
627 transport: impl FDomainTransport + 'static,
628 ) -> (Arc<Self>, impl Future<Output = ()> + Send + 'static) {
629 let ret = Arc::new(Client(Mutex::new(ClientInner {
630 transport: Transport::Transport(Box::pin(transport), VecDeque::new(), Vec::new()),
631 transactions: HashMap::new(),
632 socket_read_states: HashMap::new(),
633 channel_read_states: HashMap::new(),
634 next_tx_id: 1,
635 waiting_to_close: Vec::new(),
636 waiting_to_close_waker: std::task::Waker::noop().clone(),
637 })));
638
639 let client_weak = Arc::downgrade(&ret);
640 let fut = futures::future::poll_fn(move |ctx| {
641 let Some(client) = client_weak.upgrade() else {
642 return Poll::Ready(());
643 };
644
645 client.0.lock().poll_transport(ctx)
646 });
647
648 (ret, fut)
649 }
650
651 pub async fn namespace(self: &Arc<Self>) -> Result<Channel, Error> {
653 let new_handle = self.new_hid();
654 self.transaction(
655 ordinals::GET_NAMESPACE,
656 proto::FDomainGetNamespaceRequest { new_handle },
657 Responder::Namespace,
658 )
659 .await?;
660 Ok(Channel(Handle { id: new_handle.id, client: Arc::downgrade(self) }))
661 }
662
663 pub fn create_channel(self: &Arc<Self>) -> (Channel, Channel) {
665 let id_a = self.new_hid();
666 let id_b = self.new_hid();
667 let fut = self.transaction(
668 ordinals::CREATE_CHANNEL,
669 proto::ChannelCreateChannelRequest { handles: [id_a, id_b] },
670 Responder::CreateChannel,
671 );
672
673 fuchsia_async::Task::spawn(async move {
674 if let Err(e) = fut.await {
675 log::debug!("FDomain channel creation failed: {e}");
676 }
677 })
678 .detach();
679
680 (
681 Channel(Handle { id: id_a.id, client: Arc::downgrade(self) }),
682 Channel(Handle { id: id_b.id, client: Arc::downgrade(self) }),
683 )
684 }
685
686 pub fn create_endpoints<F: crate::fidl::ProtocolMarker>(
688 self: &Arc<Self>,
689 ) -> (crate::fidl::ClientEnd<F>, crate::fidl::ServerEnd<F>) {
690 let (client, server) = self.create_channel();
691 let client_end = crate::fidl::ClientEnd::<F>::new(client);
692 let server_end = crate::fidl::ServerEnd::new(server);
693 (client_end, server_end)
694 }
695
696 pub fn create_proxy<F: crate::fidl::ProtocolMarker>(
698 self: &Arc<Self>,
699 ) -> (F::Proxy, crate::fidl::ServerEnd<F>) {
700 let (client_end, server_end) = self.create_endpoints::<F>();
701 (client_end.into_proxy(), server_end)
702 }
703
704 pub fn create_proxy_and_stream<F: crate::fidl::ProtocolMarker>(
706 self: &Arc<Self>,
707 ) -> (F::Proxy, F::RequestStream) {
708 let (client_end, server_end) = self.create_endpoints::<F>();
709 (client_end.into_proxy(), server_end.into_stream())
710 }
711
712 pub fn create_request_stream<F: crate::fidl::ProtocolMarker>(
714 self: &Arc<Self>,
715 ) -> (crate::fidl::ClientEnd<F>, F::RequestStream) {
716 let (client_end, server_end) = self.create_endpoints::<F>();
717 (client_end, server_end.into_stream())
718 }
719
720 fn create_socket(self: &Arc<Self>, options: proto::SocketType) -> (Socket, Socket) {
722 let id_a = self.new_hid();
723 let id_b = self.new_hid();
724 let fut = self.transaction(
725 ordinals::CREATE_SOCKET,
726 proto::SocketCreateSocketRequest { handles: [id_a, id_b], options },
727 Responder::CreateSocket,
728 );
729
730 fuchsia_async::Task::spawn(async move {
731 if let Err(e) = fut.await {
732 log::debug!("FDomain socket creation failed: {e}");
733 }
734 })
735 .detach();
736
737 (
738 Socket(Handle { id: id_a.id, client: Arc::downgrade(self) }),
739 Socket(Handle { id: id_b.id, client: Arc::downgrade(self) }),
740 )
741 }
742
743 pub fn create_stream_socket(self: &Arc<Self>) -> (Socket, Socket) {
745 self.create_socket(proto::SocketType::Stream)
746 }
747
748 pub fn create_datagram_socket(self: &Arc<Self>) -> (Socket, Socket) {
750 self.create_socket(proto::SocketType::Datagram)
751 }
752
753 pub fn create_event_pair(self: &Arc<Self>) -> (EventPair, EventPair) {
755 let id_a = self.new_hid();
756 let id_b = self.new_hid();
757 let fut = self.transaction(
758 ordinals::CREATE_EVENT_PAIR,
759 proto::EventPairCreateEventPairRequest { handles: [id_a, id_b] },
760 Responder::CreateEventPair,
761 );
762
763 fuchsia_async::Task::spawn(async move {
764 if let Err(e) = fut.await {
765 log::debug!("FDomain event pair creation failed: {e}");
766 }
767 })
768 .detach();
769
770 (
771 EventPair(Handle { id: id_a.id, client: Arc::downgrade(self) }),
772 EventPair(Handle { id: id_b.id, client: Arc::downgrade(self) }),
773 )
774 }
775
776 pub fn create_event(self: &Arc<Self>) -> Event {
778 let id = self.new_hid();
779 let fut = self.transaction(
780 ordinals::CREATE_EVENT,
781 proto::EventCreateEventRequest { handle: id },
782 Responder::CreateEvent,
783 );
784
785 fuchsia_async::Task::spawn(async move {
786 if let Err(e) = fut.await {
787 log::debug!("FDomain event creation failed: {e}");
788 }
789 })
790 .detach();
791
792 Event(Handle { id: id.id, client: Arc::downgrade(self) })
793 }
794
795 pub(crate) fn new_hid(&self) -> proto::NewHandleId {
797 proto::NewHandleId { id: rand::random::<u32>() >> 1 }
802 }
803
804 pub(crate) fn transaction<S: fidl_message::Body, R: 'static, F>(
810 self: &Arc<Self>,
811 ordinal: u64,
812 request: S,
813 f: F,
814 ) -> impl Future<Output = Result<R, Error>> + 'static + use<S, R, F>
815 where
816 F: Fn(OneshotSender<Result<R, Error>>) -> Responder,
817 {
818 let mut inner = self.0.lock();
819
820 let (sender, receiver) = futures::channel::oneshot::channel();
821 inner.request(ordinal, request, f(sender));
822 receiver.map(|x| x.expect("Oneshot went away without reply!"))
823 }
824
825 pub(crate) fn start_socket_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
827 let mut inner = self.0.lock();
828 if let Some(e) = inner.transport.error() {
829 return Err(e.into());
830 }
831
832 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
833 wakers: Vec::new(),
834 queued: VecDeque::new(),
835 is_streaming: false,
836 read_request_pending: false,
837 });
838
839 assert!(!state.is_streaming, "Initiated streaming twice!");
840 state.is_streaming = true;
841
842 inner.request(
843 ordinals::READ_SOCKET_STREAMING_START,
844 proto::SocketReadSocketStreamingStartRequest { handle: id },
845 Responder::Ignore,
846 );
847 Ok(())
848 }
849
850 pub(crate) fn stop_socket_streaming(&self, id: proto::HandleId) {
854 let mut inner = self.0.lock();
855 if let Some(state) = inner.socket_read_states.get_mut(&id) {
856 if state.is_streaming {
857 state.is_streaming = false;
858 let _ = inner.request(
860 ordinals::READ_SOCKET_STREAMING_STOP,
861 proto::ChannelReadChannelStreamingStopRequest { handle: id },
862 Responder::Ignore,
863 );
864 }
865 }
866 }
867
868 pub(crate) fn start_channel_streaming(&self, id: proto::HandleId) -> Result<(), Error> {
870 let mut inner = self.0.lock();
871 if let Some(e) = inner.transport.error() {
872 return Err(e.into());
873 }
874 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
875 wakers: Vec::new(),
876 queued: VecDeque::new(),
877 is_streaming: false,
878 read_request_pending: false,
879 });
880
881 assert!(!state.is_streaming, "Initiated streaming twice!");
882 state.is_streaming = true;
883
884 inner.request(
885 ordinals::READ_CHANNEL_STREAMING_START,
886 proto::ChannelReadChannelStreamingStartRequest { handle: id },
887 Responder::Ignore,
888 );
889
890 Ok(())
891 }
892
893 pub(crate) fn stop_channel_streaming(&self, id: proto::HandleId) {
897 let mut inner = self.0.lock();
898 if let Some(state) = inner.channel_read_states.get_mut(&id) {
899 if state.is_streaming {
900 state.is_streaming = false;
901 let _ = inner.request(
903 ordinals::READ_CHANNEL_STREAMING_STOP,
904 proto::ChannelReadChannelStreamingStopRequest { handle: id },
905 Responder::Ignore,
906 );
907 }
908 }
909 }
910
911 pub(crate) fn poll_socket(
913 &self,
914 id: proto::HandleId,
915 ctx: &mut Context<'_>,
916 out: &mut [u8],
917 ) -> Poll<Result<usize, Error>> {
918 let mut inner = self.0.lock();
919 if let Some(error) = inner.transport.error() {
920 return Poll::Ready(Err(error.into()));
921 }
922
923 let state = inner.socket_read_states.entry(id).or_insert_with(|| SocketReadState {
924 wakers: Vec::new(),
925 queued: VecDeque::new(),
926 is_streaming: false,
927 read_request_pending: false,
928 });
929
930 if let Some(got) = state.queued.front_mut() {
931 match got.as_mut() {
932 Ok(data) => {
933 let read_size = std::cmp::min(data.data.len(), out.len());
934 out[..read_size].copy_from_slice(&data.data[..read_size]);
935
936 if data.data.len() > read_size && !data.is_datagram {
937 let _ = data.data.drain(..read_size);
938 } else {
939 let _ = state.queued.pop_front();
940 }
941
942 return Poll::Ready(Ok(read_size));
943 }
944 Err(_) => {
945 let err = state.queued.pop_front().unwrap().unwrap_err();
946 return Poll::Ready(Err(err));
947 }
948 }
949 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
950 state.wakers.push(ctx.waker().clone());
951 }
952
953 if !state.read_request_pending && !state.is_streaming {
954 inner.request(
955 ordinals::READ_SOCKET,
956 proto::SocketReadSocketRequest { handle: id, max_bytes: out.len() as u64 },
957 Responder::ReadSocket(id),
958 );
959 }
960
961 Poll::Pending
962 }
963
964 pub(crate) fn poll_channel(
966 &self,
967 id: proto::HandleId,
968 ctx: &mut Context<'_>,
969 for_stream: bool,
970 ) -> Poll<Option<Result<proto::ChannelMessage, Error>>> {
971 let mut inner = self.0.lock();
972 if let Some(error) = inner.transport.error() {
973 return Poll::Ready(Some(Err(error.into())));
974 }
975
976 let state = inner.channel_read_states.entry(id).or_insert_with(|| ChannelReadState {
977 wakers: Vec::new(),
978 queued: VecDeque::new(),
979 is_streaming: false,
980 read_request_pending: false,
981 });
982
983 if let Some(got) = state.queued.pop_front() {
984 return Poll::Ready(Some(got));
985 } else if for_stream && !state.is_streaming {
986 return Poll::Ready(None);
987 } else if !state.wakers.iter().any(|x| ctx.waker().will_wake(x)) {
988 state.wakers.push(ctx.waker().clone());
989 }
990
991 if !state.read_request_pending && !state.is_streaming {
992 inner.request(
993 ordinals::READ_CHANNEL,
994 proto::ChannelReadChannelRequest { handle: id },
995 Responder::ReadChannel(id),
996 );
997 }
998
999 Poll::Pending
1000 }
1001
1002 pub(crate) fn channel_is_streaming(&self, id: proto::HandleId) -> bool {
1004 let inner = self.0.lock();
1005 let Some(state) = inner.channel_read_states.get(&id) else {
1006 return false;
1007 };
1008 state.is_streaming
1009 }
1010
1011 pub(crate) fn clear_handles_for_transfer(&self, handles: &proto::Handles) {
1014 let inner = self.0.lock();
1015 match handles {
1016 proto::Handles::Handles(handles) => {
1017 for handle in handles {
1018 assert!(
1019 !(inner.channel_read_states.contains_key(handle)
1020 || inner.socket_read_states.contains_key(handle)),
1021 "Tried to transfer handle after reading"
1022 );
1023 }
1024 }
1025 proto::Handles::Dispositions(dispositions) => {
1026 for disposition in dispositions {
1027 match &disposition.handle {
1028 proto::HandleOp::Move_(handle) => assert!(
1029 !(inner.channel_read_states.contains_key(handle)
1030 || inner.socket_read_states.contains_key(handle)),
1031 "Tried to transfer handle after reading"
1032 ),
1033 proto::HandleOp::Duplicate(_) => (),
1035 }
1036 }
1037 }
1038 }
1039 }
1040}