1use crate::Error;
8use crate::encoding::{
9 Decode, Decoder, DefaultFuchsiaResourceDialect, DynamicFlags, Encode, Encoder, EpitaphBody,
10 MessageBufFor, ProxyChannelBox, ProxyChannelFor, ResourceDialect, TransactionHeader,
11 TransactionMessage, TransactionMessageType, TypeMarker, decode_transaction_header,
12};
13use fuchsia_sync::Mutex;
14use futures::future::{self, FusedFuture, Future, FutureExt, Map, MaybeDone};
15use futures::ready;
16use futures::stream::{FusedStream, Stream};
17use futures::task::{Context, Poll, Waker};
18use slab::Slab;
19use std::collections::VecDeque;
20use std::mem;
21use std::ops::ControlFlow;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::task::{RawWaker, RawWakerVTable};
25use zx_status;
26
27#[doc(hidden)] pub fn decode_transaction_body<T: TypeMarker, D: ResourceDialect, const EXPECTED_ORDINAL: u64>(
30 mut buf: D::MessageBufEtc,
31) -> Result<T::Owned, Error>
32where
33 T::Owned: Decode<T, D>,
34{
35 let (bytes, handles) = buf.split_mut();
36 let (header, body_bytes) = decode_transaction_header(bytes)?;
37 if header.ordinal != EXPECTED_ORDINAL {
38 return Err(Error::InvalidResponseOrdinal);
39 }
40 let mut output = Decode::<T, D>::new_empty();
41 Decoder::<D>::decode_into::<T>(&header, body_bytes, handles, &mut output)?;
42 Ok(output)
43}
44
45#[derive(Debug, Clone)]
47pub struct Client<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
48 inner: Arc<ClientInner<D>>,
49}
50
51pub type DecodedQueryResponseFut<T, D = DefaultFuchsiaResourceDialect> = Map<
53 MessageResponse<D>,
54 fn(Result<<D as ResourceDialect>::MessageBufEtc, Error>) -> Result<T, Error>,
55>;
56
57#[derive(Debug)]
60#[must_use = "futures do nothing unless you `.await` or poll them"]
61pub struct QueryResponseFut<T, D: ResourceDialect = DefaultFuchsiaResourceDialect>(
62 pub MaybeDone<DecodedQueryResponseFut<T, D>>,
63);
64
65impl<T: Unpin, D: ResourceDialect> FusedFuture for QueryResponseFut<T, D> {
66 fn is_terminated(&self) -> bool {
67 matches!(self.0, MaybeDone::Gone)
68 }
69}
70
71impl<T: Unpin, D: ResourceDialect> Future for QueryResponseFut<T, D> {
72 type Output = Result<T, Error>;
73
74 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
75 ready!(self.0.poll_unpin(cx));
76 let maybe_done = Pin::new(&mut self.0);
77 Poll::Ready(maybe_done.take_output().unwrap_or(Err(Error::PollAfterCompletion)))
78 }
79}
80
81impl<T> QueryResponseFut<T> {
82 pub fn check(self) -> Result<Self, Error> {
89 match self.0 {
90 MaybeDone::Done(Err(e)) => Err(e),
91 x => Ok(QueryResponseFut(x)),
92 }
93 }
94}
95
96const TXID_INTEREST_MASK: u32 = 0xFFFFFF;
97const TXID_GENERATION_SHIFT: usize = 24;
98const TXID_GENERATION_MASK: u8 = 0x7F;
99
100#[derive(Debug, Copy, Clone, PartialEq, Eq)]
102pub struct Txid(u32);
103#[derive(Debug, Copy, Clone, PartialEq, Eq)]
105struct InterestId(usize);
106
107impl InterestId {
108 fn from_txid(txid: Txid) -> Self {
109 InterestId((txid.0 & TXID_INTEREST_MASK) as usize - 1)
110 }
111}
112
113impl Txid {
114 fn from_interest_id(int_id: InterestId, generation: u8) -> Self {
115 let id = (int_id.0 as u32 + 1) & TXID_INTEREST_MASK;
118 let generation = (generation & TXID_GENERATION_MASK) as u32;
120
121 let txid = (generation << TXID_GENERATION_SHIFT) | id;
126
127 Txid(txid)
128 }
129
130 pub fn as_raw_id(&self) -> u32 {
132 self.0
133 }
134}
135
136impl From<u32> for Txid {
137 fn from(txid: u32) -> Self {
138 Self(txid)
139 }
140}
141
142impl<D: ResourceDialect> Client<D> {
143 pub fn new(channel: D::ProxyChannel, protocol_name: &'static str) -> Client<D> {
148 Client {
149 inner: Arc::new(ClientInner {
150 channel: channel.boxed(),
151 interests: Mutex::default(),
152 terminal_error: Mutex::default(),
153 protocol_name,
154 }),
155 }
156 }
157
158 pub fn as_channel(&self) -> &D::ProxyChannel {
160 self.inner.channel.as_channel()
161 }
162
163 pub fn into_channel(self) -> Result<D::ProxyChannel, Self> {
170 match Arc::try_unwrap(self.inner) {
180 Ok(inner) => {
181 if inner.interests.lock().messages.is_empty() || inner.channel.is_closed() {
182 Ok(inner.channel.unbox())
183 } else {
184 Err(Self { inner: Arc::new(inner) })
189 }
190 }
191 Err(inner) => Err(Self { inner }),
192 }
193 }
194
195 pub fn take_event_receiver(&self) -> EventReceiver<D> {
198 {
199 let mut lock = self.inner.interests.lock();
200
201 if let EventListener::None = lock.event_listener {
202 lock.event_listener = EventListener::WillPoll;
203 } else {
204 panic!("Event stream was already taken");
205 }
206 }
207
208 EventReceiver { inner: self.inner.clone(), state: EventReceiverState::Active }
209 }
210
211 pub fn send<T: TypeMarker>(
213 &self,
214 body: impl Encode<T, D>,
215 ordinal: u64,
216 dynamic_flags: DynamicFlags,
217 ) -> Result<(), Error> {
218 let msg =
219 TransactionMessage { header: TransactionHeader::new(0, ordinal, dynamic_flags), body };
220 crate::encoding::with_tls_encoded::<TransactionMessageType<T>, D, ()>(
221 msg,
222 |bytes, handles| self.send_raw(bytes, handles),
223 )
224 }
225
226 pub fn send_query<Request: TypeMarker, Response: TypeMarker, const ORDINAL: u64>(
228 &self,
229 body: impl Encode<Request, D>,
230 dynamic_flags: DynamicFlags,
231 ) -> QueryResponseFut<Response::Owned, D>
232 where
233 Response::Owned: Decode<Response, D>,
234 {
235 self.send_query_and_decode::<Request, Response::Owned>(
236 body,
237 ORDINAL,
238 dynamic_flags,
239 |buf| buf.and_then(decode_transaction_body::<Response, D, ORDINAL>),
240 )
241 }
242
243 pub fn send_query_and_decode<Request: TypeMarker, Output>(
246 &self,
247 body: impl Encode<Request, D>,
248 ordinal: u64,
249 dynamic_flags: DynamicFlags,
250 decode: fn(Result<D::MessageBufEtc, Error>) -> Result<Output, Error>,
251 ) -> QueryResponseFut<Output, D> {
252 let send_result = self.send_raw_query(|tx_id, bytes, handles| {
253 let msg = TransactionMessage {
254 header: TransactionHeader::new(tx_id.as_raw_id(), ordinal, dynamic_flags),
255 body,
256 };
257 Encoder::encode::<TransactionMessageType<Request>>(bytes, handles, msg)?;
258 Ok(())
259 });
260
261 QueryResponseFut(match send_result {
262 Ok(res_fut) => future::maybe_done(res_fut.map(decode)),
263 Err(e) => MaybeDone::Done(Err(e)),
264 })
265 }
266
267 pub fn send_raw(
269 &self,
270 bytes: &[u8],
271 handles: &mut [<D::ProxyChannel as ProxyChannelFor<D>>::HandleDisposition],
272 ) -> Result<(), Error> {
273 match self.inner.channel.write_etc(bytes, handles) {
274 Ok(()) | Err(None) => Ok(()),
275 Err(Some(e)) => Err(Error::ClientWrite(e.into())),
276 }
277 }
278
279 pub fn send_raw_query<F>(&self, encode_msg: F) -> Result<MessageResponse<D>, Error>
281 where
282 F: for<'a, 'b> FnOnce(
283 Txid,
284 &'a mut Vec<u8>,
285 &'b mut Vec<<D::ProxyChannel as ProxyChannelFor<D>>::HandleDisposition>,
286 ) -> Result<(), Error>,
287 {
288 let id = self.inner.interests.lock().register_msg_interest();
289 crate::encoding::with_tls_encode_buf::<_, D>(|bytes, handles| {
290 encode_msg(id, bytes, handles)?;
291 self.send_raw(bytes, handles)
292 })?;
293
294 Ok(MessageResponse { id, client: Some(self.inner.clone()) })
295 }
296}
297
298#[must_use]
299#[derive(Debug)]
301pub struct MessageResponse<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
302 id: Txid,
303 client: Option<Arc<ClientInner<D>>>,
305}
306
307impl<D: ResourceDialect> Unpin for MessageResponse<D> {}
308
309impl<D: ResourceDialect> Future for MessageResponse<D> {
310 type Output = Result<D::MessageBufEtc, Error>;
311 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
312 let this = &mut *self;
313 let res;
314 {
315 let client = this.client.as_ref().ok_or(Error::PollAfterCompletion)?;
316 res = client.poll_recv_msg_response(this.id, cx);
317 }
318
319 if let Poll::Ready(Ok(_)) = res {
321 this.client.take().expect("MessageResponse polled after completion");
322 }
323
324 res
325 }
326}
327
328impl<D: ResourceDialect> Drop for MessageResponse<D> {
329 fn drop(&mut self) {
330 if let Some(client) = &self.client {
331 client.interests.lock().deregister(self.id);
332 }
333 }
334}
335
336#[derive(Debug)]
339enum MessageInterest<D: ResourceDialect> {
340 WillPoll,
342 Waiting(Waker),
344 Received(D::MessageBufEtc),
346 Discard,
349}
350
351impl<D: ResourceDialect> MessageInterest<D> {
352 fn is_received(&self) -> bool {
354 matches!(*self, MessageInterest::Received(_))
355 }
356
357 fn unwrap_received(self) -> D::MessageBufEtc {
358 if let MessageInterest::Received(buf) = self {
359 buf
360 } else {
361 panic!("EXPECTED received message")
362 }
363 }
364}
365
366#[derive(Debug)]
367enum EventReceiverState {
368 Active,
369 Terminal,
370 Terminated,
371}
372
373#[derive(Debug)]
375pub struct EventReceiver<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
376 inner: Arc<ClientInner<D>>,
377 state: EventReceiverState,
378}
379
380impl<D: ResourceDialect> Unpin for EventReceiver<D> {}
381
382impl<D: ResourceDialect> FusedStream for EventReceiver<D> {
383 fn is_terminated(&self) -> bool {
384 matches!(self.state, EventReceiverState::Terminated)
385 }
386}
387
388impl<D: ResourceDialect> Stream for EventReceiver<D> {
393 type Item = Result<D::MessageBufEtc, Error>;
394
395 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
396 match self.state {
397 EventReceiverState::Active => {}
398 EventReceiverState::Terminated => {
399 panic!("polled EventReceiver after `None`");
400 }
401 EventReceiverState::Terminal => {
402 self.state = EventReceiverState::Terminated;
403 return Poll::Ready(None);
404 }
405 }
406
407 Poll::Ready(match ready!(self.inner.poll_recv_event(cx)) {
408 Ok(x) => Some(Ok(x)),
409 Err(Error::ClientChannelClosed { status: zx_status::Status::PEER_CLOSED, .. }) => {
410 self.state = EventReceiverState::Terminated;
413 None
414 }
415 err @ Err(_) => {
416 self.state = EventReceiverState::Terminal;
419 Some(err)
420 }
421 })
422 }
423}
424
425impl<D: ResourceDialect> Drop for EventReceiver<D> {
426 fn drop(&mut self) {
427 self.inner.interests.lock().dropped_event_listener();
428 }
429}
430
431#[derive(Debug, Default)]
432enum EventListener {
433 #[default]
435 None,
436 WillPoll,
438 Some(Waker),
440}
441
442impl EventListener {
443 fn is_some(&self) -> bool {
444 matches!(self, EventListener::Some(_))
445 }
446}
447
448#[derive(Debug)]
450struct ClientInner<D: ResourceDialect> {
451 channel: <D::ProxyChannel as ProxyChannelFor<D>>::Boxed,
453
454 interests: Mutex<Interests<D>>,
456
457 terminal_error: Mutex<Option<Error>>,
460
461 protocol_name: &'static str,
463}
464
465#[derive(Debug)]
466struct Interests<D: ResourceDialect> {
467 messages: Slab<MessageInterest<D>>,
468 events: VecDeque<D::MessageBufEtc>,
469 event_listener: EventListener,
470 waker_count: usize,
472 generation: u8,
478}
479
480impl<D: ResourceDialect> Default for Interests<D> {
481 fn default() -> Self {
482 Interests {
483 messages: Slab::new(),
484 events: Default::default(),
485 event_listener: Default::default(),
486 waker_count: 0,
487 generation: 0,
488 }
489 }
490}
491
492impl<D: ResourceDialect> Interests<D> {
493 fn push_event(&mut self, buf: D::MessageBufEtc) -> Option<Waker> {
495 self.events.push_back(buf);
496 self.take_event_waker()
497 }
498
499 fn take_event_waker(&mut self) -> Option<Waker> {
501 if self.event_listener.is_some() {
502 let EventListener::Some(waker) =
503 mem::replace(&mut self.event_listener, EventListener::WillPoll)
504 else {
505 unreachable!()
506 };
507
508 self.waker_count -= 1;
510 Some(waker)
511 } else {
512 None
513 }
514 }
515
516 fn event_waker(&self) -> Option<&Waker> {
518 match &self.event_listener {
519 EventListener::Some(waker) => Some(waker),
520 _ => None,
521 }
522 }
523
524 fn push_message(&mut self, txid: Txid, buf: D::MessageBufEtc) -> Result<Option<Waker>, Error> {
527 let InterestId(raw_id) = InterestId::from_txid(txid);
528 let Some(interest) = self.messages.get_mut(raw_id) else {
531 return Err(Error::InvalidResponseTxid);
533 };
534
535 let mut waker = None;
536 if let MessageInterest::Discard = interest {
537 self.messages.remove(raw_id);
538 } else if let MessageInterest::Waiting(w) =
539 mem::replace(interest, MessageInterest::Received(buf))
540 {
541 waker = Some(w);
542
543 self.waker_count -= 1;
545 }
546
547 Ok(waker)
548 }
549
550 fn register(&mut self, txid: Txid, cx: &Context<'_>) -> Option<D::MessageBufEtc> {
553 let InterestId(raw_id) = InterestId::from_txid(txid);
554 let interest = self.messages.get_mut(raw_id).expect("Polled unregistered interest");
555 match interest {
556 MessageInterest::Received(_) => {
557 return Some(self.messages.remove(raw_id).unwrap_received());
558 }
559 MessageInterest::Discard => panic!("Polled a discarded MessageReceiver?!"),
560 MessageInterest::WillPoll => self.waker_count += 1,
561 MessageInterest::Waiting(_) => {}
562 }
563 *interest = MessageInterest::Waiting(cx.waker().clone());
564 None
565 }
566
567 fn deregister(&mut self, txid: Txid) {
569 let InterestId(raw_id) = InterestId::from_txid(txid);
570 match self.messages[raw_id] {
571 MessageInterest::Received(_) => {
572 self.messages.remove(raw_id);
573 return;
574 }
575 MessageInterest::WillPoll => {}
576 MessageInterest::Waiting(_) => self.waker_count -= 1,
577 MessageInterest::Discard => unreachable!(),
578 }
579 self.messages[raw_id] = MessageInterest::Discard;
580 }
581
582 fn register_event_listener(&mut self, cx: &Context<'_>) -> Option<D::MessageBufEtc> {
584 self.events.pop_front().or_else(|| {
585 if !mem::replace(&mut self.event_listener, EventListener::Some(cx.waker().clone()))
586 .is_some()
587 {
588 self.waker_count += 1;
589 }
590 None
591 })
592 }
593
594 fn dropped_event_listener(&mut self) {
596 if self.event_listener.is_some() {
597 self.waker_count -= 1;
599 }
600 self.event_listener = EventListener::None;
601 }
602
603 fn register_msg_interest(&mut self) -> Txid {
608 self.generation = self.generation.wrapping_add(1);
609 Txid::from_interest_id(
612 InterestId(self.messages.insert(MessageInterest::WillPoll)),
613 self.generation,
614 )
615 }
616}
617
618impl<D: ResourceDialect> ClientInner<D> {
619 fn poll_recv_event(
620 self: &Arc<Self>,
621 cx: &Context<'_>,
622 ) -> Poll<Result<D::MessageBufEtc, Error>> {
623 if let Some(msg_buf) = self.interests.lock().register_event_listener(cx) {
625 return Poll::Ready(Ok(msg_buf));
626 }
627
628 let maybe_terminal_error = self.recv_all(Some(Txid(0)));
631
632 let mut lock = self.interests.lock();
633
634 if let Some(msg_buf) = lock.events.pop_front() {
635 Poll::Ready(Ok(msg_buf))
636 } else {
637 maybe_terminal_error?;
638 Poll::Pending
639 }
640 }
641
642 fn poll_recv_msg_response(
645 self: &Arc<Self>,
646 txid: Txid,
647 cx: &Context<'_>,
648 ) -> Poll<Result<D::MessageBufEtc, Error>> {
649 if let Some(buf) = self.interests.lock().register(txid, cx) {
651 return Poll::Ready(Ok(buf));
652 }
653
654 let maybe_terminal_error = self.recv_all(Some(txid));
657
658 let InterestId(raw_id) = InterestId::from_txid(txid);
659 let mut interests = self.interests.lock();
660 if interests.messages.get(raw_id).expect("Polled unregistered interest").is_received() {
661 let buf = interests.messages.remove(raw_id).unwrap_received();
664 Poll::Ready(Ok(buf))
665 } else {
666 maybe_terminal_error?;
667 Poll::Pending
668 }
669 }
670
671 fn recv_all(self: &Arc<Self>, want_txid: Option<Txid>) -> Result<(), Error> {
681 let mut terminal_error = self.terminal_error.lock();
685 if let Some(error) = terminal_error.as_ref() {
686 return Err(error.clone());
687 }
688
689 let recv_once = |waker| {
690 let cx = &mut Context::from_waker(&waker);
691
692 let mut buf = D::MessageBufEtc::new();
693 let result = self.channel.recv_etc_from(cx, &mut buf);
694 match result {
695 Poll::Ready(Ok(())) => {}
696 Poll::Ready(Err(None)) => {
697 return Err(Error::ClientChannelClosed {
700 status: zx_status::Status::PEER_CLOSED,
701 protocol_name: self.protocol_name,
702 epitaph: None,
703 #[cfg(not(target_os = "fuchsia"))]
704 reason: self.channel.closed_reason(),
705 });
706 }
707 Poll::Ready(Err(Some(e))) => return Err(Error::ClientRead(e.into())),
708 Poll::Pending => return Ok(ControlFlow::Break(())),
709 };
710
711 let (bytes, _) = buf.split_mut();
712 let (header, body_bytes) = decode_transaction_header(bytes)?;
713 if header.is_epitaph() {
714 let handles = &mut [];
716 let mut epitaph_body = Decode::<EpitaphBody, D>::new_empty();
717 Decoder::<D>::decode_into::<EpitaphBody>(
718 &header,
719 body_bytes,
720 handles,
721 &mut epitaph_body,
722 )?;
723 return Err(Error::ClientChannelClosed {
724 status: epitaph_body.error,
725 protocol_name: self.protocol_name,
726 epitaph: Some(epitaph_body.error.into_raw() as u32),
727 #[cfg(not(target_os = "fuchsia"))]
728 reason: self.channel.closed_reason(),
729 });
730 }
731
732 let txid = Txid(header.tx_id);
733
734 let waker = {
735 buf.shrink_bytes_to_fit();
736 let mut interests = self.interests.lock();
737 if txid == Txid(0) {
738 interests.push_event(buf)
739 } else {
740 interests.push_message(txid, buf)?
741 }
742 };
743
744 if want_txid != Some(txid)
746 && let Some(waker) = waker
747 {
748 waker.wake();
749 }
750
751 Ok(ControlFlow::Continue(()))
752 };
753
754 loop {
755 let waker = {
756 let interests = self.interests.lock();
757 if interests.waker_count == 0 {
758 return Ok(());
759 } else if interests.waker_count == 1 {
760 if let Some(waker) = interests.event_waker() {
766 waker.clone()
767 } else {
768 interests
769 .messages
770 .iter()
771 .find_map(|(_, interest)| {
772 if let MessageInterest::Waiting(waker) = interest {
773 Some(waker.clone())
774 } else {
775 None
776 }
777 })
778 .unwrap()
779 }
780 } else {
781 let weak = Arc::downgrade(self);
782 let waker = ClientWaker(Arc::new(move || {
783 if let Some(strong) = weak.upgrade() {
784 #[cfg(target_os = "fuchsia")]
787 if strong.recv_all(None).is_ok() {
788 return;
789 }
790
791 strong.wake_all();
792 }
793 }));
794 unsafe {
799 Waker::from_raw(RawWaker::new(
800 Arc::into_raw(Arc::new(waker)) as *const (),
801 &WAKER_VTABLE,
802 ))
803 }
804 }
805 };
806
807 match recv_once(waker) {
808 Ok(ControlFlow::Continue(())) => {}
809 Ok(ControlFlow::Break(())) => return Ok(()),
810 Err(error) => {
811 self.wake_all();
813 return Err(terminal_error.insert(error).clone());
814 }
815 }
816 }
817 }
818
819 fn wake_all(&self) {
821 let mut lock = self.interests.lock();
822 for (_, interest) in &mut lock.messages {
823 if let MessageInterest::Waiting(_) = interest {
824 let MessageInterest::Waiting(waker) =
825 mem::replace(interest, MessageInterest::WillPoll)
826 else {
827 unreachable!()
828 };
829 waker.wake();
830 }
831 }
832 if let Some(waker) = lock.take_event_waker() {
833 waker.wake();
834 }
835 lock.waker_count = 0;
836 }
837}
838
839#[derive(Clone)]
840struct ClientWaker(Arc<dyn Fn() + Send + Sync + 'static>);
841
842static WAKER_VTABLE: RawWakerVTable =
843 RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);
844
845unsafe fn clone_waker(data: *const ()) -> RawWaker {
846 unsafe { Arc::increment_strong_count(data as *const ClientWaker) };
847 RawWaker::new(data, &WAKER_VTABLE)
848}
849
850unsafe fn wake(data: *const ()) {
851 unsafe { Arc::from_raw(data as *const ClientWaker) }.0();
852}
853
854unsafe fn wake_by_ref(data: *const ()) {
855 mem::ManuallyDrop::new(unsafe { Arc::from_raw(data as *const ClientWaker) }).0();
856}
857
858unsafe fn drop_waker(data: *const ()) {
859 unsafe { Arc::from_raw(data as *const ClientWaker) };
860}
861
862#[cfg(target_os = "fuchsia")]
863pub mod sync {
864 use super::*;
867 use std::mem::MaybeUninit;
868 use zx::MessageBufEtc;
869
870 #[derive(Debug)]
872 pub struct Client {
873 channel: zx::Channel,
875
876 protocol_name: &'static str,
878 }
879
880 impl Client {
881 pub fn new(channel: zx::Channel, protocol_name: &'static str) -> Self {
883 Client { channel, protocol_name }
884 }
885
886 pub fn as_channel(&self) -> &zx::Channel {
888 &self.channel
889 }
890
891 pub fn into_channel(self) -> zx::Channel {
893 self.channel
894 }
895
896 pub fn send<T: TypeMarker>(
898 &self,
899 body: impl Encode<T, DefaultFuchsiaResourceDialect>,
900 ordinal: u64,
901 dynamic_flags: DynamicFlags,
902 ) -> Result<(), Error> {
903 let mut write_bytes = Vec::new();
904 let mut write_handles = Vec::new();
905 let msg = TransactionMessage {
906 header: TransactionHeader::new(0, ordinal, dynamic_flags),
907 body,
908 };
909 Encoder::encode::<TransactionMessageType<T>>(
910 &mut write_bytes,
911 &mut write_handles,
912 msg,
913 )?;
914 match self.channel.write_etc(&write_bytes, &mut write_handles) {
915 Ok(()) | Err(zx_status::Status::PEER_CLOSED) => Ok(()),
916 Err(e) => Err(Error::ClientWrite(e.into())),
917 }
918 }
919
920 pub fn send_query<Request: TypeMarker, Response: TypeMarker>(
922 &self,
923 body: impl Encode<Request, DefaultFuchsiaResourceDialect>,
924 ordinal: u64,
925 dynamic_flags: DynamicFlags,
926 deadline: zx::MonotonicInstant,
927 ) -> Result<Response::Owned, Error>
928 where
929 Response::Owned: Decode<Response, DefaultFuchsiaResourceDialect>,
930 {
931 let mut write_bytes = Vec::new();
932 let mut write_handles = Vec::new();
933
934 let msg = TransactionMessage {
935 header: TransactionHeader::new(0, ordinal, dynamic_flags),
936 body,
937 };
938 Encoder::encode::<TransactionMessageType<Request>>(
939 &mut write_bytes,
940 &mut write_handles,
941 msg,
942 )?;
943
944 let mut bytes_out =
947 Vec::<MaybeUninit<u8>>::with_capacity(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize);
948 unsafe { bytes_out.set_len(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize) };
951
952 let handles_out = &mut [const { MaybeUninit::<zx::HandleInfo>::uninit() };
956 zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize];
957
958 let (bytes_out, handles_out) = self
961 .channel
962 .call_etc_uninit(
963 deadline,
964 &write_bytes,
965 &mut write_handles,
966 bytes_out.as_mut_slice(),
967 handles_out,
968 )
969 .map_err(|e| self.wrap_error(Error::ClientCall, e))?;
970
971 let (header, body_bytes) = decode_transaction_header(bytes_out)?;
972 if header.ordinal != ordinal {
973 return Err(Error::InvalidResponseOrdinal);
974 }
975 let mut output = Decode::<Response, DefaultFuchsiaResourceDialect>::new_empty();
976 Decoder::<DefaultFuchsiaResourceDialect>::decode_into::<Response>(
977 &header,
978 body_bytes,
979 handles_out,
980 &mut output,
981 )?;
982 Ok(output)
983 }
984
985 pub fn wait_for_event(
987 &self,
988 deadline: zx::MonotonicInstant,
989 ) -> Result<MessageBufEtc, Error> {
990 let mut buf = zx::MessageBufEtc::new();
991 buf.ensure_capacity_bytes(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize);
992 buf.ensure_capacity_handle_infos(zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize);
993
994 loop {
995 self.channel
996 .wait_one(
997 zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
998 deadline,
999 )
1000 .map_err(|e| self.wrap_error(Error::ClientEvent, e))?;
1001 match self.channel.read_etc(&mut buf) {
1002 Ok(()) => {
1003 let (header, body_bytes) = decode_transaction_header(buf.bytes())
1006 .map_err(|_| Error::InvalidHeader)?;
1007 if header.is_epitaph() {
1008 let handles = &mut [];
1011 let mut epitaph_body =
1012 Decode::<EpitaphBody, DefaultFuchsiaResourceDialect>::new_empty();
1013 Decoder::<DefaultFuchsiaResourceDialect>::decode_into::<EpitaphBody>(
1014 &header,
1015 body_bytes,
1016 handles,
1017 &mut epitaph_body,
1018 )?;
1019 return Err(Error::ClientChannelClosed {
1020 status: epitaph_body.error,
1021 protocol_name: self.protocol_name,
1022 epitaph: Some(epitaph_body.error.into_raw() as u32),
1023 });
1024 }
1025 if header.tx_id != 0 {
1026 return Err(Error::UnexpectedSyncResponse);
1027 }
1028 return Ok(buf);
1029 }
1030 Err(zx::Status::SHOULD_WAIT) => {
1031 continue;
1033 }
1034 Err(e) => {
1035 return Err(self.wrap_error(|x| Error::ClientRead(x.into()), e));
1036 }
1037 }
1038 }
1039 }
1040
1041 fn wrap_error<T: Fn(zx_status::Status) -> Error>(
1045 &self,
1046 variant: T,
1047 err: zx_status::Status,
1048 ) -> Error {
1049 if err == zx_status::Status::PEER_CLOSED {
1050 Error::ClientChannelClosed {
1051 status: zx_status::Status::PEER_CLOSED,
1052 protocol_name: self.protocol_name,
1053 epitaph: None,
1054 }
1055 } else {
1056 variant(err)
1057 }
1058 }
1059 }
1060}
1061
1062#[cfg(all(test, target_os = "fuchsia"))]
1063mod tests {
1064 use super::*;
1065 use crate::encoding::MAGIC_NUMBER_INITIAL;
1066 use crate::epitaph::{self, ChannelEpitaphExt};
1067 use anyhow::{Context as _, Error};
1068 use assert_matches::assert_matches;
1069 use fuchsia_async as fasync;
1070 use fuchsia_async::{Channel as AsyncChannel, DurationExt, TimeoutExt};
1071 use futures::channel::oneshot;
1072 use futures::stream::FuturesUnordered;
1073 use futures::task::{ArcWake, noop_waker, waker};
1074 use futures::{StreamExt, TryFutureExt, join};
1075 use futures_test::task::new_count_waker;
1076 use std::future::pending;
1077 use std::thread;
1078 use zx::{AsHandleRef, MessageBufEtc};
1079
1080 const SEND_ORDINAL_HIGH_BYTE: u8 = 42;
1081 const SEND_ORDINAL: u64 = 42 << 32;
1082 const SEND_DATA: u8 = 55;
1083
1084 const EVENT_ORDINAL: u64 = 854 << 23;
1085
1086 #[rustfmt::skip]
1087 fn expected_sent_bytes(txid_index: u8, txid_generation: u8) -> [u8; 24] {
1088 [
1089 txid_index, 0, 0, txid_generation, 2, 0, 0, MAGIC_NUMBER_INITIAL,
1092 0, 0, 0, 0, SEND_ORDINAL_HIGH_BYTE, 0, 0, 0, SEND_DATA, 0, 0, 0, 0, 0, 0, 0, ]
1097 }
1098
1099 fn expected_sent_bytes_oneway() -> [u8; 24] {
1100 expected_sent_bytes(0, 0)
1101 }
1102
1103 fn send_transaction(header: TransactionHeader, channel: &zx::Channel) {
1104 let (bytes, handles) = (&mut vec![], &mut vec![]);
1105 encode_transaction(header, bytes, handles);
1106 channel.write_etc(bytes, handles).expect("Server channel write failed");
1107 }
1108
1109 fn encode_transaction(
1110 header: TransactionHeader,
1111 bytes: &mut Vec<u8>,
1112 handles: &mut Vec<zx::HandleDisposition<'static>>,
1113 ) {
1114 let event = TransactionMessage { header, body: SEND_DATA };
1115 Encoder::<DefaultFuchsiaResourceDialect>::encode::<TransactionMessageType<u8>>(
1116 bytes, handles, event,
1117 )
1118 .expect("Encoding failure");
1119 }
1120
1121 #[test]
1122 fn sync_client() -> Result<(), Error> {
1123 let (client_end, server_end) = zx::Channel::create();
1124 let client = sync::Client::new(client_end, "test_protocol");
1125 client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()).context("sending")?;
1126 let mut received = MessageBufEtc::new();
1127 server_end.read_etc(&mut received).context("reading")?;
1128 assert_eq!(received.bytes(), expected_sent_bytes_oneway());
1129 Ok(())
1130 }
1131
1132 #[test]
1133 fn sync_client_with_response() -> Result<(), Error> {
1134 let (client_end, server_end) = zx::Channel::create();
1135 let client = sync::Client::new(client_end, "test_protocol");
1136 thread::spawn(move || {
1137 let mut received = MessageBufEtc::new();
1139 server_end
1140 .wait_one(
1141 zx::Signals::CHANNEL_READABLE,
1142 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1143 )
1144 .expect("failed to wait for channel readable");
1145 server_end.read_etc(&mut received).expect("failed to read on server end");
1146 let (buf, _handles) = received.split_mut();
1147 let (header, _body_bytes) = decode_transaction_header(buf).expect("server decode");
1148 assert_eq!(header.ordinal, SEND_ORDINAL);
1149 send_transaction(
1150 TransactionHeader::new(header.tx_id, header.ordinal, DynamicFlags::empty()),
1151 &server_end,
1152 );
1153 });
1154 let response_data = client
1155 .send_query::<u8, u8>(
1156 SEND_DATA,
1157 SEND_ORDINAL,
1158 DynamicFlags::empty(),
1159 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1160 )
1161 .context("sending query")?;
1162 assert_eq!(SEND_DATA, response_data);
1163 Ok(())
1164 }
1165
1166 #[test]
1167 fn sync_client_with_event_and_response() -> Result<(), Error> {
1168 let (client_end, server_end) = zx::Channel::create();
1169 let client = sync::Client::new(client_end, "test_protocol");
1170 thread::spawn(move || {
1171 let mut received = MessageBufEtc::new();
1173 server_end
1174 .as_handle_ref()
1175 .wait_one(
1176 zx::Signals::CHANNEL_READABLE,
1177 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1178 )
1179 .expect("failed to wait for channel readable");
1180 server_end.read_etc(&mut received).expect("failed to read on server end");
1181 let (buf, _handles) = received.split_mut();
1182 let (header, _body_bytes) = decode_transaction_header(buf).expect("server decode");
1183 assert_ne!(header.tx_id, 0);
1184 assert_eq!(header.ordinal, SEND_ORDINAL);
1185 send_transaction(
1187 TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty()),
1188 &server_end,
1189 );
1190 send_transaction(
1193 TransactionHeader::new(header.tx_id, header.ordinal, DynamicFlags::empty()),
1194 &server_end,
1195 );
1196 });
1197 let response_data = client
1198 .send_query::<u8, u8>(
1199 SEND_DATA,
1200 SEND_ORDINAL,
1201 DynamicFlags::empty(),
1202 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)),
1203 )
1204 .context("sending query")?;
1205 assert_eq!(SEND_DATA, response_data);
1206
1207 let event_buf = client
1208 .wait_for_event(zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5)))
1209 .context("waiting for event")?;
1210 let (bytes, _handles) = event_buf.split();
1211 let (header, _body) = decode_transaction_header(&bytes).expect("event decode");
1212 assert_eq!(header.ordinal, EVENT_ORDINAL);
1213
1214 Ok(())
1215 }
1216
1217 #[test]
1218 fn sync_client_with_racing_events() -> Result<(), Error> {
1219 let (client_end, server_end) = zx::Channel::create();
1220 let client1 = Arc::new(sync::Client::new(client_end, "test_protocol"));
1221 let client2 = client1.clone();
1222
1223 let thread1 = thread::spawn(move || {
1224 let result = client1.wait_for_event(zx::MonotonicInstant::after(
1225 zx::MonotonicDuration::from_seconds(5),
1226 ));
1227 assert!(result.is_ok());
1228 });
1229
1230 let thread2 = thread::spawn(move || {
1231 let result = client2.wait_for_event(zx::MonotonicInstant::after(
1232 zx::MonotonicDuration::from_seconds(5),
1233 ));
1234 assert!(result.is_ok());
1235 });
1236
1237 send_transaction(
1238 TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty()),
1239 &server_end,
1240 );
1241 send_transaction(
1242 TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty()),
1243 &server_end,
1244 );
1245
1246 assert!(thread1.join().is_ok());
1247 assert!(thread2.join().is_ok());
1248
1249 Ok(())
1250 }
1251
1252 #[test]
1253 fn sync_client_wait_for_event_gets_method_response() -> Result<(), Error> {
1254 let (client_end, server_end) = zx::Channel::create();
1255 let client = sync::Client::new(client_end, "test_protocol");
1256 send_transaction(
1257 TransactionHeader::new(3902304923, SEND_ORDINAL, DynamicFlags::empty()),
1258 &server_end,
1259 );
1260 assert_matches!(
1261 client.wait_for_event(zx::MonotonicInstant::after(
1262 zx::MonotonicDuration::from_seconds(5)
1263 )),
1264 Err(crate::Error::UnexpectedSyncResponse)
1265 );
1266 Ok(())
1267 }
1268
1269 #[test]
1270 fn sync_client_one_way_call_suceeds_after_peer_closed() -> Result<(), Error> {
1271 let (client_end, server_end) = zx::Channel::create();
1272 let client = sync::Client::new(client_end, "test_protocol");
1273 drop(server_end);
1274 assert_matches!(client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()), Ok(()));
1275 Ok(())
1276 }
1277
1278 #[test]
1279 fn sync_client_two_way_call_fails_after_peer_closed() -> Result<(), Error> {
1280 let (client_end, server_end) = zx::Channel::create();
1281 let client = sync::Client::new(client_end, "test_protocol");
1282 drop(server_end);
1283 assert_matches!(
1284 client.send_query::<u8, u8>(
1285 SEND_DATA,
1286 SEND_ORDINAL,
1287 DynamicFlags::empty(),
1288 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))
1289 ),
1290 Err(crate::Error::ClientChannelClosed {
1291 status: zx_status::Status::PEER_CLOSED,
1292 protocol_name: "test_protocol",
1293 epitaph: None,
1294 })
1295 );
1296 Ok(())
1297 }
1298
1299 #[test]
1302 fn sync_client_send_does_not_receive_epitaphs() -> Result<(), Error> {
1303 let (client_end, server_end) = zx::Channel::create();
1304 let client = sync::Client::new(client_end, "test_protocol");
1305 server_end
1307 .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1308 .expect("failed to write epitaph");
1309 assert_matches!(
1310 client.send_query::<u8, u8>(
1311 SEND_DATA,
1312 SEND_ORDINAL,
1313 DynamicFlags::empty(),
1314 zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))
1315 ),
1316 Err(crate::Error::ClientChannelClosed {
1317 status: zx_status::Status::PEER_CLOSED,
1318 protocol_name: "test_protocol",
1319 epitaph: None,
1320 })
1321 );
1322 Ok(())
1323 }
1324
1325 #[test]
1326 fn sync_client_wait_for_events_does_receive_epitaphs() -> Result<(), Error> {
1327 let (client_end, server_end) = zx::Channel::create();
1328 let client = sync::Client::new(client_end, "test_protocol");
1329 server_end
1331 .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1332 .expect("failed to write epitaph");
1333 assert_matches!(
1334 client.wait_for_event(zx::MonotonicInstant::after(
1335 zx::MonotonicDuration::from_seconds(5)
1336 )),
1337 Err(crate::Error::ClientChannelClosed {
1338 status: zx_status::Status::UNAVAILABLE,
1339 protocol_name: "test_protocol",
1340 epitaph: Some(epitaph),
1341 }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1342 );
1343 Ok(())
1344 }
1345
1346 #[test]
1347 fn sync_client_into_channel() -> Result<(), Error> {
1348 let (client_end, _server_end) = zx::Channel::create();
1349 let client_end_raw = client_end.raw_handle();
1350 let client = sync::Client::new(client_end, "test_protocol");
1351 assert_eq!(client.into_channel().raw_handle(), client_end_raw);
1352 Ok(())
1353 }
1354
1355 #[fasync::run_singlethreaded(test)]
1356 async fn client() {
1357 let (client_end, server_end) = zx::Channel::create();
1358 let client_end = AsyncChannel::from_channel(client_end);
1359 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1360
1361 let server = AsyncChannel::from_channel(server_end);
1362 let receiver = async move {
1363 let mut buffer = MessageBufEtc::new();
1364 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1365 assert_eq!(buffer.bytes(), expected_sent_bytes_oneway());
1366 };
1367
1368 let receiver = receiver
1370 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1371 panic!("did not receive message in time!")
1372 });
1373
1374 client
1375 .send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty())
1376 .expect("failed to send msg");
1377
1378 receiver.await;
1379 }
1380
1381 #[fasync::run_singlethreaded(test)]
1382 async fn client_with_response() {
1383 let (client_end, server_end) = zx::Channel::create();
1384 let client_end = AsyncChannel::from_channel(client_end);
1385 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1386
1387 let server = AsyncChannel::from_channel(server_end);
1388 let mut buffer = MessageBufEtc::new();
1389 let receiver = async move {
1390 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1391 let two_way_tx_id = 1u8;
1392 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
1393
1394 let (bytes, handles) = (&mut vec![], &mut vec![]);
1395 let header =
1396 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
1397 encode_transaction(header, bytes, handles);
1398 server.write_etc(bytes, handles).expect("Server channel write failed");
1399 };
1400
1401 let receiver = receiver
1403 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1404 panic!("did not receiver message in time!")
1405 });
1406
1407 let sender = client
1408 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
1409 .map_ok(|x| assert_eq!(x, SEND_DATA))
1410 .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
1411
1412 let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1414 panic!("did not receive response in time!")
1415 });
1416
1417 let ((), ()) = join!(receiver, sender);
1418 }
1419
1420 #[fasync::run_singlethreaded(test)]
1421 async fn client_with_response_receives_epitaph() {
1422 let (client_end, server_end) = zx::Channel::create();
1423 let client_end = AsyncChannel::from_channel(client_end);
1424 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1425
1426 let server = AsyncChannel::from_channel(server_end);
1427 let mut buffer = zx::MessageBufEtc::new();
1428 let receiver = async move {
1429 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
1430 server
1431 .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1432 .expect("failed to write epitaph");
1433 };
1434 let receiver = receiver
1436 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1437 panic!("did not receive message in time!")
1438 });
1439
1440 let sender = async move {
1441 const ORDINAL: u64 = 42 << 32;
1442 let result = client.send_query::<u8, u8, ORDINAL>(55, DynamicFlags::empty()).await;
1443 assert_matches!(
1444 result,
1445 Err(crate::Error::ClientChannelClosed {
1446 status: zx_status::Status::UNAVAILABLE,
1447 protocol_name: "test_protocol",
1448 epitaph: Some(epitaph),
1449 }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1450 );
1451 };
1452 let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1454 panic!("did not receive response in time!")
1455 });
1456
1457 let ((), ()) = join!(receiver, sender);
1458 }
1459
1460 #[fasync::run_singlethreaded(test)]
1461 #[should_panic]
1462 async fn event_cant_be_taken_twice() {
1463 let (client_end, _) = zx::Channel::create();
1464 let client_end = AsyncChannel::from_channel(client_end);
1465 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1466 let _foo = client.take_event_receiver();
1467 client.take_event_receiver();
1468 }
1469
1470 #[fasync::run_singlethreaded(test)]
1471 async fn event_can_be_taken_after_drop() {
1472 let (client_end, _) = zx::Channel::create();
1473 let client_end = AsyncChannel::from_channel(client_end);
1474 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1475 let foo = client.take_event_receiver();
1476 drop(foo);
1477 client.take_event_receiver();
1478 }
1479
1480 #[fasync::run_singlethreaded(test)]
1481 async fn receiver_termination_test() {
1482 let (client_end, _) = zx::Channel::create();
1483 let client_end = AsyncChannel::from_channel(client_end);
1484 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1485 let mut foo = client.take_event_receiver();
1486 assert!(!foo.is_terminated(), "receiver should not report terminated before being polled");
1487 let _ = foo.next().await;
1488 assert!(
1489 foo.is_terminated(),
1490 "receiver should report terminated after seeing channel is closed"
1491 );
1492 }
1493
1494 #[fasync::run_singlethreaded(test)]
1495 #[should_panic(expected = "polled EventReceiver after `None`")]
1496 async fn receiver_cant_be_polled_more_than_once_on_closed_stream() {
1497 let (client_end, _) = zx::Channel::create();
1498 let client_end = AsyncChannel::from_channel(client_end);
1499 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1500 let foo = client.take_event_receiver();
1501 drop(foo);
1502 let mut bar = client.take_event_receiver();
1503 assert!(bar.next().await.is_none(), "read on closed channel should return none");
1504 let _ = bar.next().await;
1506 }
1507
1508 #[fasync::run_singlethreaded(test)]
1509 #[should_panic(expected = "polled EventReceiver after `None`")]
1510 async fn receiver_panics_when_polled_after_receiving_epitaph_then_none() {
1511 let (client_end, server_end) = zx::Channel::create();
1512 let client_end = AsyncChannel::from_channel(client_end);
1513 let server_end = AsyncChannel::from_channel(server_end);
1514 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1515 let mut stream = client.take_event_receiver();
1516
1517 epitaph::write_epitaph_impl(&server_end, zx_status::Status::UNAVAILABLE)
1518 .expect("wrote epitaph");
1519 drop(server_end);
1520
1521 assert_matches!(
1522 stream.next().await,
1523 Some(Err(crate::Error::ClientChannelClosed {
1524 status: zx_status::Status::UNAVAILABLE,
1525 protocol_name: "test_protocol",
1526 epitaph: Some(epitaph),
1527 })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1528 );
1529 assert_matches!(stream.next().await, None);
1530 let _ = stream.next().await;
1532 }
1533
1534 #[fasync::run_singlethreaded(test)]
1535 async fn event_can_be_taken() {
1536 let (client_end, _) = zx::Channel::create();
1537 let client_end = AsyncChannel::from_channel(client_end);
1538 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1539 client.take_event_receiver();
1540 }
1541
1542 #[fasync::run_singlethreaded(test)]
1543 async fn event_received() {
1544 let (client_end, server_end) = zx::Channel::create();
1545 let client_end = AsyncChannel::from_channel(client_end);
1546 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1547
1548 let server = AsyncChannel::from_channel(server_end);
1550 let (bytes, handles) = (&mut vec![], &mut vec![]);
1551 const ORDINAL: u64 = 5;
1552 let header = TransactionHeader::new(0, ORDINAL, DynamicFlags::empty());
1553 encode_transaction(header, bytes, handles);
1554 server.write_etc(bytes, handles).expect("Server channel write failed");
1555 drop(server);
1556
1557 let recv = client
1558 .take_event_receiver()
1559 .into_future()
1560 .then(|(x, stream)| {
1561 let x = x.expect("should contain one element");
1562 let x = x.expect("fidl error");
1563 let x: i32 =
1564 decode_transaction_body::<i32, DefaultFuchsiaResourceDialect, ORDINAL>(x)
1565 .expect("failed to decode event");
1566 assert_eq!(x, 55);
1567 stream.into_future()
1568 })
1569 .map(|(x, _stream)| assert!(x.is_none(), "should have emptied"));
1570
1571 let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1573 panic!("did not receive event in time!")
1574 });
1575
1576 recv.await;
1577 }
1578
1579 #[fasync::run_singlethreaded(test)]
1583 async fn receiver_can_be_taken_after_end_of_stream() {
1584 let (client_end, server_end) = zx::Channel::create();
1585 let client_end = AsyncChannel::from_channel(client_end);
1586 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1587
1588 let server = AsyncChannel::from_channel(server_end);
1590 let (bytes, handles) = (&mut vec![], &mut vec![]);
1591 const ORDINAL: u64 = 5;
1592 let header = TransactionHeader::new(0, ORDINAL, DynamicFlags::empty());
1593 encode_transaction(header, bytes, handles);
1594 server.write_etc(bytes, handles).expect("Server channel write failed");
1595 drop(server);
1596
1597 {
1601 let recv = client
1602 .take_event_receiver()
1603 .into_future()
1604 .then(|(x, stream)| {
1605 let x = x.expect("should contain one element");
1606 let x = x.expect("fidl error");
1607 let x: i32 =
1608 decode_transaction_body::<i32, DefaultFuchsiaResourceDialect, ORDINAL>(x)
1609 .expect("failed to decode event");
1610 assert_eq!(x, 55);
1611 stream.into_future()
1612 })
1613 .map(|(x, _stream)| assert!(x.is_none(), "should have emptied"));
1614
1615 let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1617 panic!("did not receive event in time!")
1618 });
1619
1620 recv.await;
1621 }
1622
1623 let mut c = client.take_event_receiver();
1626 assert!(
1627 c.next().await.is_none(),
1628 "receiver on closed channel should return none on first call"
1629 );
1630 }
1631
1632 #[fasync::run_singlethreaded(test)]
1633 async fn event_incompatible_format() {
1634 let (client_end, server_end) = zx::Channel::create();
1635 let client_end = AsyncChannel::from_channel(client_end);
1636 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1637
1638 let server = AsyncChannel::from_channel(server_end);
1640 let (bytes, handles) = (&mut vec![], &mut vec![]);
1641 let header = TransactionHeader::new_full(
1642 0,
1643 5,
1644 crate::encoding::Context {
1645 wire_format_version: crate::encoding::WireFormatVersion::V2,
1646 },
1647 DynamicFlags::empty(),
1648 0,
1649 );
1650 encode_transaction(header, bytes, handles);
1651 server.write_etc(bytes, handles).expect("Server channel write failed");
1652 drop(server);
1653
1654 let mut event_receiver = client.take_event_receiver();
1655 let recv = event_receiver.next().map(|event| {
1656 assert_matches!(event, Some(Err(crate::Error::IncompatibleMagicNumber(0))))
1657 });
1658
1659 let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
1661 panic!("did not receive event in time!")
1662 });
1663
1664 recv.await;
1665 }
1666
1667 #[test]
1668 fn client_always_wakes_pending_futures() {
1669 let mut executor = fasync::TestExecutor::new();
1670
1671 let (client_end, server_end) = zx::Channel::create();
1672 let client_end = AsyncChannel::from_channel(client_end);
1673 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1674
1675 let mut event_receiver = client.take_event_receiver();
1676
1677 let (response_waker, response_waker_count) = new_count_waker();
1679 let response_cx = &mut Context::from_waker(&response_waker);
1680 let mut response_txid = Txid(0);
1681 let mut response_future = client
1682 .send_raw_query(|tx_id, bytes, handles| {
1683 response_txid = tx_id;
1684 let header = TransactionHeader::new(
1685 response_txid.as_raw_id(),
1686 SEND_ORDINAL,
1687 DynamicFlags::empty(),
1688 );
1689 encode_transaction(header, bytes, handles);
1690 Ok(())
1691 })
1692 .expect("Couldn't send query");
1693 assert!(response_future.poll_unpin(response_cx).is_pending());
1694
1695 let (event_waker, event_waker_count) = new_count_waker();
1697 let event_cx = &mut Context::from_waker(&event_waker);
1698 assert!(event_receiver.poll_next_unpin(event_cx).is_pending());
1699
1700 assert_eq!(response_waker_count.get(), 0);
1702 assert_eq!(event_waker_count.get(), 0);
1703
1704 send_transaction(TransactionHeader::new(0, 5, DynamicFlags::empty()), &server_end);
1706
1707 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1709
1710 assert_eq!(response_waker_count.get(), 0);
1712 assert_eq!(event_waker_count.get(), 1);
1713
1714 assert!(event_receiver.poll_next_unpin(event_cx).is_ready());
1716
1717 send_transaction(
1719 TransactionHeader::new(response_txid.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty()),
1720 &server_end,
1721 );
1722
1723 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1725
1726 assert_eq!(response_waker_count.get(), 1);
1728 }
1729
1730 #[test]
1731 fn client_always_wakes_pending_futures_on_epitaph() {
1732 let mut executor = fasync::TestExecutor::new();
1733
1734 let (client_end, server_end) = zx::Channel::create();
1735 let client_end = AsyncChannel::from_channel(client_end);
1736 let server_end = AsyncChannel::from_channel(server_end);
1737 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1738
1739 let mut event_receiver = client.take_event_receiver();
1740
1741 let (response1_waker, response1_waker_count) = new_count_waker();
1743 let response1_cx = &mut Context::from_waker(&response1_waker);
1744 let mut response1_future = client
1745 .send_raw_query(|tx_id, bytes, handles| {
1746 let header =
1747 TransactionHeader::new(tx_id.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty());
1748 encode_transaction(header, bytes, handles);
1749 Ok(())
1750 })
1751 .expect("Couldn't send query");
1752 assert!(response1_future.poll_unpin(response1_cx).is_pending());
1753
1754 let (event_waker, event_waker_count) = new_count_waker();
1756 let event_cx = &mut Context::from_waker(&event_waker);
1757 assert!(event_receiver.poll_next_unpin(event_cx).is_pending());
1758
1759 let (response2_waker, response2_waker_count) = new_count_waker();
1761 let response2_cx = &mut Context::from_waker(&response2_waker);
1762 let mut response2_future = client
1763 .send_raw_query(|tx_id, bytes, handles| {
1764 let header =
1765 TransactionHeader::new(tx_id.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty());
1766 encode_transaction(header, bytes, handles);
1767 Ok(())
1768 })
1769 .expect("Couldn't send query");
1770 assert!(response2_future.poll_unpin(response2_cx).is_pending());
1771
1772 let wakers = vec![response1_waker_count, response2_waker_count, event_waker_count];
1773
1774 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1776
1777 assert_eq!(0, wakers.iter().fold(0, |acc, x| acc + x.get()));
1779
1780 epitaph::write_epitaph_impl(&server_end, zx_status::Status::UNAVAILABLE)
1782 .expect("wrote epitaph");
1783
1784 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1786
1787 for wake_count in &wakers {
1790 assert_eq!(wake_count.get(), 1);
1791 }
1792
1793 assert_matches!(
1795 response1_future.poll_unpin(response1_cx),
1796 Poll::Ready(Err(crate::Error::ClientChannelClosed {
1797 status: zx_status::Status::UNAVAILABLE,
1798 protocol_name: "test_protocol",
1799 epitaph: Some(epitaph),
1800 })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1801 );
1802
1803 let _ = executor.run_until_stalled(&mut future::pending::<()>());
1805
1806 assert_matches!(
1808 response2_future.poll_unpin(response2_cx),
1809 Poll::Ready(Err(crate::Error::ClientChannelClosed {
1810 status: zx_status::Status::UNAVAILABLE,
1811 protocol_name: "test_protocol",
1812 epitaph: Some(epitaph),
1813 })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
1814 );
1815
1816 assert!(event_receiver.poll_next_unpin(event_cx).is_ready());
1818 }
1819
1820 #[fasync::run_singlethreaded(test)]
1821 async fn client_allows_take_event_stream_even_if_event_delivered() {
1822 let (client_end, server_end) = zx::Channel::create();
1823 let client_end = AsyncChannel::from_channel(client_end);
1824 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1825
1826 send_transaction(TransactionHeader::new(0, 5, DynamicFlags::empty()), &server_end);
1828
1829 let (response_waker, _response_waker_count) = new_count_waker();
1831 let response_cx = &mut Context::from_waker(&response_waker);
1832 let mut response_future =
1833 client.send_query::<u8, u8, SEND_ORDINAL>(55, DynamicFlags::empty());
1834 assert!(response_future.poll_unpin(response_cx).is_pending());
1835
1836 let mut _event_receiver = client.take_event_receiver();
1838 }
1839
1840 #[fasync::run_singlethreaded(test)]
1841 async fn client_reports_epitaph_from_all_read_actions() {
1842 #[derive(Debug, PartialEq)]
1843 enum Action {
1844 SendMsg, SendQuery, WaitQuery, RecvEvent, }
1849 impl Action {
1850 fn should_report_epitaph(&self) -> bool {
1851 match self {
1852 Action::SendMsg | Action::SendQuery => false,
1853 Action::WaitQuery | Action::RecvEvent => true,
1854 }
1855 }
1856 }
1857 use Action::*;
1858 for two_actions in &[
1861 [SendMsg, SendMsg],
1862 [SendMsg, SendQuery],
1863 [SendMsg, WaitQuery],
1864 [SendMsg, RecvEvent],
1865 [SendQuery, SendMsg],
1866 [SendQuery, SendQuery],
1867 [SendQuery, WaitQuery],
1868 [SendQuery, RecvEvent],
1869 [WaitQuery, SendMsg],
1870 [WaitQuery, SendQuery],
1871 [WaitQuery, WaitQuery],
1872 [WaitQuery, RecvEvent],
1873 [RecvEvent, SendMsg],
1874 [RecvEvent, SendQuery],
1875 [RecvEvent, WaitQuery],
1876 ] {
1879 let (client_end, server_end) = zx::Channel::create();
1880 let client_end = AsyncChannel::from_channel(client_end);
1881 let client = Client::new(client_end, "test_protocol");
1882
1883 let server_end = AsyncChannel::from_channel(server_end);
1885 server_end
1886 .close_with_epitaph(zx_status::Status::UNAVAILABLE)
1887 .expect("failed to write epitaph");
1888
1889 let mut event_receiver = client.take_event_receiver();
1890
1891 for (index, action) in two_actions.iter().enumerate() {
1893 let err = match action {
1894 SendMsg => {
1895 client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()).err()
1896 }
1897 WaitQuery => client
1898 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
1899 .await
1900 .err(),
1901 SendQuery => client
1902 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
1903 .check()
1904 .err(),
1905 RecvEvent => event_receiver.next().await.unwrap().err(),
1906 };
1907 let details = format!("index: {index:?}, two_actions: {two_actions:?}");
1908 match err {
1909 None => assert!(
1910 !action.should_report_epitaph(),
1911 "expected epitaph, but succeeded.\n{details}"
1912 ),
1913 Some(crate::Error::ClientChannelClosed {
1914 status: zx_status::Status::UNAVAILABLE,
1915 protocol_name: "test_protocol",
1916 epitaph: Some(epitaph),
1917 }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32 => assert!(
1918 action.should_report_epitaph(),
1919 "got epitaph unexpectedly.\n{details}",
1920 ),
1921 Some(err) => panic!("unexpected error: {err:#?}.\n{details}"),
1922 }
1923 }
1924
1925 if two_actions.contains(&RecvEvent) {
1927 assert_matches!(event_receiver.next().await, None);
1928 }
1929 }
1930 }
1931
1932 #[test]
1933 fn client_query_result_check() {
1934 let mut executor = fasync::TestExecutor::new();
1935 let (client_end, server_end) = zx::Channel::create();
1936 let client_end = AsyncChannel::from_channel(client_end);
1937 let client = Client::new(client_end, "test_protocol");
1938
1939 let server = AsyncChannel::from_channel(server_end);
1940
1941 let active_fut =
1943 client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
1944
1945 let mut checked_fut = active_fut.check().expect("failed to check future");
1946
1947 let mut buffer = MessageBufEtc::new();
1949 executor.run_singlethreaded(server.recv_etc_msg(&mut buffer)).expect("failed to recv msg");
1950 let two_way_tx_id = 1u8;
1951 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
1952
1953 let (bytes, handles) = (&mut vec![], &mut vec![]);
1954 let header =
1955 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
1956 encode_transaction(header, bytes, handles);
1957 server.write_etc(bytes, handles).expect("Server channel write failed");
1958
1959 executor
1960 .run_singlethreaded(&mut checked_fut)
1961 .map(|x| assert_eq!(x, SEND_DATA))
1962 .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
1963
1964 drop(server);
1966
1967 let query_fut = client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
1968
1969 let mut checked_fut = query_fut.check().expect("failed to check future");
1971 assert_matches!(
1973 executor.run_singlethreaded(&mut checked_fut),
1974 Err(crate::Error::ClientChannelClosed {
1975 status: zx_status::Status::PEER_CLOSED,
1976 protocol_name: "test_protocol",
1977 epitaph: None,
1978 })
1979 );
1980 }
1981
1982 #[fasync::run_singlethreaded(test)]
1983 async fn client_into_channel() {
1984 let (client_end, _server_end) = zx::Channel::create();
1987 let client_end = AsyncChannel::from_channel(client_end);
1988 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
1989
1990 assert!(client.into_channel().is_ok());
1991 }
1992
1993 #[fasync::run_singlethreaded(test)]
1994 async fn client_into_channel_outstanding_messages() {
1995 let (client_end, _server_end) = zx::Channel::create();
1998 let client_end = AsyncChannel::from_channel(client_end);
1999 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2000
2001 {
2002 let _sender =
2005 client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
2006 }
2007
2008 assert!(client.into_channel().is_err());
2009 }
2010
2011 #[fasync::run_singlethreaded(test)]
2012 async fn client_into_channel_active_clone() {
2013 let (client_end, _server_end) = zx::Channel::create();
2016 let client_end = AsyncChannel::from_channel(client_end);
2017 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2018
2019 let _cloned_client = client.clone();
2020
2021 assert!(client.into_channel().is_err());
2022 }
2023
2024 #[fasync::run_singlethreaded(test)]
2025 async fn client_into_channel_outstanding_messages_get_received() {
2026 let (client_end, server_end) = zx::Channel::create();
2027 let client_end = AsyncChannel::from_channel(client_end);
2028 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2029
2030 let server = AsyncChannel::from_channel(server_end);
2031 let mut buffer = MessageBufEtc::new();
2032 let receiver = async move {
2033 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2034 let two_way_tx_id = 1u8;
2035 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2036
2037 let (bytes, handles) = (&mut vec![], &mut vec![]);
2038 let header =
2039 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2040 encode_transaction(header, bytes, handles);
2041 server.write_etc(bytes, handles).expect("Server channel write failed");
2042 };
2043
2044 let receiver = receiver
2046 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
2047 panic!("did not receiver message in time!")
2048 });
2049
2050 let sender = client
2051 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2052 .map_ok(|x| assert_eq!(x, SEND_DATA))
2053 .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
2054
2055 let sender = sender.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
2057 panic!("did not receive response in time!")
2058 });
2059
2060 let ((), ()) = join!(receiver, sender);
2061
2062 assert!(client.into_channel().is_ok());
2063 }
2064
2065 #[fasync::run_singlethreaded(test)]
2066 async fn client_decode_errors_are_broadcast() {
2067 let (client_end, server_end) = zx::Channel::create();
2068 let client_end = AsyncChannel::from_channel(client_end);
2069 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2070
2071 let server = AsyncChannel::from_channel(server_end);
2072
2073 let _server = fasync::Task::spawn(async move {
2074 let mut buffer = MessageBufEtc::new();
2075 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2076 let two_way_tx_id = 1u8;
2077 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2078
2079 let (bytes, handles) = (&mut vec![], &mut vec![]);
2080 let header =
2081 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2082 encode_transaction(header, bytes, handles);
2083 bytes[4] = 0;
2085 server.write_etc(bytes, handles).expect("Server channel write failed");
2086
2087 pending::<()>().await;
2089 });
2090
2091 let futures = FuturesUnordered::new();
2092
2093 for _ in 0..4 {
2094 futures.push(async {
2095 assert_matches!(
2096 client
2097 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2098 .map_ok(|x| assert_eq!(x, SEND_DATA))
2099 .await,
2100 Err(crate::Error::UnsupportedWireFormatVersion)
2101 );
2102 });
2103 }
2104
2105 futures
2106 .collect::<Vec<_>>()
2107 .on_timeout(zx::MonotonicDuration::from_seconds(1).after_now(), || panic!("timed out!"))
2108 .await;
2109 }
2110
2111 #[fasync::run_singlethreaded(test)]
2112 async fn into_channel_from_waker_succeeds() {
2113 let (client_end, server_end) = zx::Channel::create();
2114 let client_end = AsyncChannel::from_channel(client_end);
2115 let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
2116
2117 let server = AsyncChannel::from_channel(server_end);
2118 let mut buffer = MessageBufEtc::new();
2119 let receiver = async move {
2120 server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
2121 let two_way_tx_id = 1u8;
2122 assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
2123
2124 let (bytes, handles) = (&mut vec![], &mut vec![]);
2125 let header =
2126 TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
2127 encode_transaction(header, bytes, handles);
2128 server.write_etc(bytes, handles).expect("Server channel write failed");
2129 };
2130
2131 struct Sender {
2132 future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>,
2133 }
2134
2135 let (done_tx, done_rx) = oneshot::channel();
2136
2137 let sender = Arc::new(Sender {
2138 future: Mutex::new(Box::pin(async move {
2139 client
2140 .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
2141 .map_ok(|x| assert_eq!(x, SEND_DATA))
2142 .unwrap_or_else(|e| panic!("fidl error: {e:?}"))
2143 .await;
2144
2145 assert!(client.into_channel().is_ok());
2146
2147 let _ = done_tx.send(());
2148 })),
2149 });
2150
2151 impl ArcWake for Sender {
2156 fn wake_by_ref(arc_self: &Arc<Self>) {
2157 assert!(
2158 arc_self
2159 .future
2160 .lock()
2161 .poll_unpin(&mut Context::from_waker(&noop_waker()))
2162 .is_ready()
2163 );
2164 }
2165 }
2166
2167 let waker = waker(sender.clone());
2168
2169 assert!(sender.future.lock().poll_unpin(&mut Context::from_waker(&waker)).is_pending());
2170
2171 receiver.await;
2172
2173 done_rx.await.unwrap();
2174 }
2175}