1mod service_map;
22
23use self::service_map::ServiceMap;
24use crate::future_help::{log_errors, Observer};
25use crate::handle_info::{handle_info, HandleKey, HandleType};
26use crate::labels::{NodeId, TransferKey};
27use crate::peer::{FramedStreamReader, FramedStreamWriter, Peer, PeerConnRef};
28use crate::proxy::{
29 IntoProxied, ProxyTransferInitiationReceiver, RemoveFromProxyTable, StreamRefSender,
30};
31use anyhow::{bail, format_err, Context as _, Error};
32use async_utils::mutex_ticket::MutexTicket;
33use fidl::{AsHandleRef, Channel, EventPair, Handle, HandleBased, Socket};
34use fidl_fuchsia_overnet_protocol::{
35 ChannelHandle, EventPairHandle, EventPairRights, SocketHandle, SocketType, StreamId, StreamRef,
36 ZirconHandle,
37};
38use fuchsia_async::Task;
39use futures::channel::oneshot;
40use futures::future::poll_fn;
41use futures::lock::Mutex;
42use futures::prelude::*;
43use futures::ready;
44use rand::Rng;
45use std::collections::{BTreeMap, HashMap};
46use std::pin::pin;
47use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
48use std::sync::{Arc, Weak};
49use std::task::{Context, Poll, Waker};
50use std::time::Duration;
51
52pub use self::service_map::ListablePeer;
53
54#[derive(Debug)]
55enum PendingTransfer {
56 Complete(FoundTransfer),
57 Waiting(Waker),
58}
59
60type PendingTransferMap = BTreeMap<TransferKey, PendingTransfer>;
61
62#[derive(Debug)]
63pub(crate) enum FoundTransfer {
64 Fused(Handle),
65 Remote(FramedStreamWriter, FramedStreamReader),
66}
67
68#[derive(Debug)]
69pub(crate) enum OpenedTransfer {
70 Fused,
71 Remote(FramedStreamWriter, FramedStreamReader, Handle),
72}
73
74#[derive(Debug)]
75#[allow(dead_code)]
76enum CircuitState {
77 Waiters(Vec<oneshot::Sender<()>>),
78 Peer(Arc<Peer>),
79}
80
81impl CircuitState {
82 fn peer(&self) -> Option<Arc<Peer>> {
83 if let CircuitState::Peer(peer) = self {
84 Some(Arc::clone(peer))
85 } else {
86 None
87 }
88 }
89}
90
91#[derive(Debug)]
92struct PeerMaps {
93 circuit_clients: BTreeMap<NodeId, CircuitState>,
94}
95
96#[derive(Debug)]
98pub struct ListPeersContext(Mutex<Option<Observer<Vec<ListablePeer>>>>);
99
100static LIST_PEERS_CALL: AtomicU64 = AtomicU64::new(0);
101
102impl ListPeersContext {
103 pub async fn list_peers(&self) -> Result<Vec<ListablePeer>, Error> {
105 let call_id = LIST_PEERS_CALL.fetch_add(1, Ordering::SeqCst);
106 log::trace!(list_peers_call = call_id; "get observer");
107 let mut obs = self
108 .0
109 .lock()
110 .await
111 .take()
112 .ok_or_else(|| anyhow::format_err!("Already listing peers"))?;
113 log::trace!(list_peers_call = call_id; "wait for value");
114 let r = obs.next().await;
115 log::trace!(list_peers_call = call_id; "replace observer");
116 *self.0.lock().await = Some(obs);
117 log::trace!(list_peers_call = call_id; "return");
118 Ok(r.unwrap_or_else(Vec::new))
119 }
120}
121
122pub enum AscenddClientRouting {
124 Enabled,
126 Disabled,
128}
129
130pub struct Router {
135 node_id: NodeId,
137 peers: Mutex<PeerMaps>,
139 service_map: ServiceMap,
140 proxied_streams: Mutex<HashMap<HandleKey, ProxiedHandle>>,
141 pending_transfers: Mutex<PendingTransferMap>,
142 task: Mutex<Option<Task<()>>>,
143 ascendd_client_routing: AtomicBool,
145 circuit_node: circuit::ConnectionNode,
146}
147
148struct ProxiedHandle {
149 remove_sender: futures::channel::oneshot::Sender<RemoveFromProxyTable>,
150 original_paired: HandleKey,
151 proxy_task: Task<()>,
152}
153
154fn generate_node_id() -> NodeId {
156 rand::thread_rng().gen::<u64>().into()
157}
158
159fn sorted<T: std::cmp::Ord>(mut v: Vec<T>) -> Vec<T> {
160 v.sort();
161 v
162}
163
164impl std::fmt::Debug for Router {
165 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166 write!(f, "Router({:?})", self.node_id)
167 }
168}
169
170const OVERNET_CIRCUIT_PROTOCOL: &'static str = "Overnet:0";
173
174impl Router {
175 pub fn new(router_interval: Option<Duration>) -> Result<Arc<Self>, Error> {
178 Router::with_node_id(generate_node_id(), router_interval)
179 }
180
181 pub fn with_node_id(
183 node_id: NodeId,
184 router_interval: Option<Duration>,
185 ) -> Result<Arc<Self>, Error> {
186 let service_map = ServiceMap::new(node_id);
187 let (new_peer_sender, new_peer_receiver) = futures::channel::mpsc::channel(1);
188 let (circuit_node, circuit_connections) = if let Some(interval) = router_interval {
189 let (a, b) = circuit::ConnectionNode::new_with_router(
190 &node_id.circuit_string(),
191 OVERNET_CIRCUIT_PROTOCOL,
192 interval,
193 new_peer_sender,
194 )?;
195 (a, b.boxed())
196 } else {
197 let (a, b) = circuit::ConnectionNode::new(
198 &node_id.circuit_string(),
199 OVERNET_CIRCUIT_PROTOCOL,
200 new_peer_sender,
201 )?;
202 (a, b.boxed())
203 };
204 let router = Arc::new(Router {
205 node_id,
206 service_map,
207 peers: Mutex::new(PeerMaps { circuit_clients: BTreeMap::new() }),
208 proxied_streams: Mutex::new(HashMap::new()),
209 pending_transfers: Mutex::new(PendingTransferMap::new()),
210 task: Mutex::new(None),
211 ascendd_client_routing: AtomicBool::new(true),
213 circuit_node,
214 });
215
216 let weak_router = Arc::downgrade(&router);
217 *router.task.lock().now_or_never().unwrap() = Some(Task::spawn(log_errors(
218 run_circuits(weak_router, circuit_connections, new_peer_receiver),
219 format!("router {:?} support loop failed", node_id),
220 )));
221
222 Ok(router)
223 }
224
225 pub fn circuit_node(&self) -> &circuit::Node {
228 self.circuit_node.node()
229 }
230
231 pub fn node_id(&self) -> NodeId {
233 self.node_id
234 }
235
236 pub fn client_routing(&self) -> AscenddClientRouting {
238 if self.ascendd_client_routing.load(std::sync::atomic::Ordering::SeqCst) {
239 AscenddClientRouting::Enabled
240 } else {
241 AscenddClientRouting::Disabled
242 }
243 }
244
245 pub fn set_client_routing(&self, client_routing: AscenddClientRouting) {
247 let client_routing = match client_routing {
248 AscenddClientRouting::Enabled => true,
249 AscenddClientRouting::Disabled => false,
250 };
251 self.ascendd_client_routing.store(client_routing, std::sync::atomic::Ordering::SeqCst);
252 }
253
254 pub(crate) fn service_map(&self) -> &ServiceMap {
255 &self.service_map
256 }
257
258 pub async fn connect_to_service(
260 self: &Arc<Self>,
261 node_id: NodeId,
262 service_name: &str,
263 chan: Channel,
264 ) -> Result<(), Error> {
265 let is_local = node_id == self.node_id;
266 log::trace!(
267 service_name:%,
268 node_id = node_id.0,
269 local = is_local;
270 "Request connect_to_service",
271 );
272 if is_local {
273 self.service_map().connect(service_name, chan).await
274 } else {
275 self.client_peer(node_id)
276 .await
277 .with_context(|| {
278 format_err!(
279 "Fetching client peer for new stream to {:?} for service {:?}",
280 node_id,
281 service_name,
282 )
283 })?
284 .new_stream(service_name, chan, self)
285 .await
286 }
287 }
288
289 pub async fn register_service(
292 &self,
293 service_name: String,
294 provider: impl Fn(fidl::Channel) -> Result<(), Error> + Send + 'static,
295 ) -> Result<(), Error> {
296 self.service_map().register_service(service_name, provider).await;
297 Ok(())
298 }
299
300 pub async fn new_list_peers_context(&self) -> ListPeersContext {
302 ListPeersContext(Mutex::new(Some(self.service_map.new_list_peers_observer().await)))
303 }
304
305 async fn client_peer(self: &Arc<Self>, peer_node_id: NodeId) -> Result<Arc<Peer>, Error> {
306 loop {
307 let mut peers = self.peers.lock().await;
308 match peers.circuit_clients.get_mut(&peer_node_id) {
309 Some(CircuitState::Peer(peer)) => break Ok(Arc::clone(&peer)),
310 Some(CircuitState::Waiters(waiters)) => {
311 let (sender, receiver) = oneshot::channel();
312 waiters.push(sender);
313 std::mem::drop(peers);
314 let _ = receiver.await;
315 }
316 None => {
317 peers.circuit_clients.insert(peer_node_id, CircuitState::Waiters(Vec::new()));
318 }
319 }
320 }
321 }
322
323 fn add_proxied(
324 self: &Arc<Self>,
325 proxied_streams: &mut HashMap<HandleKey, ProxiedHandle>,
326 this_handle_key: HandleKey,
327 pair_handle_key: HandleKey,
328 remove_sender: futures::channel::oneshot::Sender<RemoveFromProxyTable>,
329 f: impl 'static + Send + Future<Output = Result<(), Error>>,
330 ) {
331 let router = Arc::downgrade(&self);
332 let proxy_task = Task::spawn(async move {
333 if let Err(e) = f.await {
334 log::trace!(this_handle_key:?, pair_handle_key:?; "Proxy failed: {:?}", e);
335 } else {
336 log::trace!(this_handle_key:?, pair_handle_key:?; "Proxy completed successfully",);
337 }
338 if let Some(router) = Weak::upgrade(&router) {
339 router.remove_proxied(this_handle_key, pair_handle_key).await;
340 }
341 });
342 assert!(proxied_streams
343 .insert(
344 this_handle_key,
345 ProxiedHandle { remove_sender, original_paired: pair_handle_key, proxy_task },
346 )
347 .is_none());
348 }
349
350 async fn remove_proxied(
353 self: &Arc<Self>,
354 this_handle_key: HandleKey,
355 pair_handle_key: HandleKey,
356 ) {
357 let mut proxied_streams = self.proxied_streams.lock().await;
358 log::trace!(
359 node_id = self.node_id.0,
360 this_handle_key:?,
361 pair_handle_key:?,
362 all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
363 "REMOVE_PROXIED",
364 );
365 if let Some(removed) = proxied_streams.remove(&this_handle_key) {
366 assert_eq!(removed.original_paired, pair_handle_key);
367 let _ = removed.remove_sender.send(RemoveFromProxyTable::Dropped);
368 removed.proxy_task.detach();
369 }
370 }
371
372 pub(crate) async fn send_proxied(
375 self: &Arc<Self>,
376 handle: Handle,
377 conn: PeerConnRef<'_>,
378 ) -> Result<ZirconHandle, Error> {
379 let raw_handle = handle.raw_handle(); let info = handle_info(handle.as_handle_ref())
381 .with_context(|| format!("Getting handle information for {}", raw_handle))?;
382 let mut proxied_streams = self.proxied_streams.lock().await;
383 log::trace!(
384 node_id = self.node_id.0,
385 handle:?,
386 info:?,
387 all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
388 "SEND_PROXIED",
389 );
390 if let Some(pair) = proxied_streams.remove(&info.pair_handle_key) {
391 drop(proxied_streams);
395 assert_eq!(info.this_handle_key, pair.original_paired);
396 log::trace!(
397 handle:?,
398 orig_pair:? = pair.original_paired;
399 "Send paired proxied"
400 );
401 let drain_stream = conn.alloc_uni().await?.into();
404 let (stream_ref_sender, stream_ref_receiver) = StreamRefSender::new();
405 pair.remove_sender
406 .send(RemoveFromProxyTable::InitiateTransfer {
407 paired_handle: handle,
408 drain_stream,
409 stream_ref_sender,
410 })
411 .map_err(|_| format_err!("Failed to initiate transfer"))?;
412 let stream_ref = stream_ref_receiver
413 .await
414 .with_context(|| format!("waiting for stream_ref for {:?}", raw_handle))?;
415 pair.proxy_task.detach();
416 match info.handle_type {
417 HandleType::Channel(rights) => {
418 Ok(ZirconHandle::Channel(ChannelHandle { stream_ref, rights }))
419 }
420 HandleType::Socket(socket_type, rights) => {
421 Ok(ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights }))
422 }
423 HandleType::EventPair => Ok(ZirconHandle::EventPair(EventPairHandle {
424 stream_ref,
425 rights: EventPairRights::empty(),
426 })),
427 }
428 } else {
429 log::trace!(handle:?; "Send proxied");
431 let (tx, rx) = futures::channel::oneshot::channel();
432 let rx = ProxyTransferInitiationReceiver::new(rx.map_err(move |_| {
433 format_err!(
434 "cancelled transfer via send_proxied {:?}\n{}",
435 info,
436 111 )
438 }));
439 let (stream_writer, stream_reader) = conn.alloc_bidi().await?;
440 let stream_ref = StreamRef::Creating(StreamId { id: stream_writer.id() });
441 Ok(match info.handle_type {
442 HandleType::Channel(rights) => {
443 self.add_proxied(
444 &mut *proxied_streams,
445 info.this_handle_key,
446 info.pair_handle_key,
447 tx,
448 crate::proxy::spawn_send(
449 Channel::from_handle(handle).into_proxied()?,
450 rx,
451 stream_writer.into(),
452 stream_reader.into(),
453 Arc::downgrade(&self),
454 ),
455 );
456 ZirconHandle::Channel(ChannelHandle { stream_ref, rights })
457 }
458 HandleType::Socket(socket_type, rights) => {
459 self.add_proxied(
460 &mut *proxied_streams,
461 info.this_handle_key,
462 info.pair_handle_key,
463 tx,
464 crate::proxy::spawn_send(
465 Socket::from_handle(handle).into_proxied()?,
466 rx,
467 stream_writer.into(),
468 stream_reader.into(),
469 Arc::downgrade(&self),
470 ),
471 );
472 ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights })
473 }
474 HandleType::EventPair => {
475 self.add_proxied(
476 &mut *proxied_streams,
477 info.this_handle_key,
478 info.pair_handle_key,
479 tx,
480 crate::proxy::spawn_send(
481 EventPair::from_handle(handle).into_proxied()?,
482 rx,
483 stream_writer.into(),
484 stream_reader.into(),
485 Arc::downgrade(&self),
486 ),
487 );
488 ZirconHandle::EventPair(EventPairHandle {
489 stream_ref,
490 rights: EventPairRights::empty(),
491 })
492 }
493 })
494 }
495 }
496
497 pub(crate) async fn recv_proxied(
500 self: &Arc<Self>,
501 handle: ZirconHandle,
502 conn: PeerConnRef<'_>,
503 ) -> Result<Handle, Error> {
504 match handle {
505 ZirconHandle::Channel(ChannelHandle { stream_ref, rights }) => {
506 self.recv_proxied_handle(conn, stream_ref, move || Ok(Channel::create()), rights)
507 .await
508 }
509 ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights }) => {
510 self.recv_proxied_handle(
511 conn,
512 stream_ref,
513 move || {
514 Ok(match socket_type {
515 SocketType::Stream => Socket::create_stream(),
516 SocketType::Datagram => Socket::create_datagram(),
517 })
518 },
519 rights,
520 )
521 .await
522 }
523 ZirconHandle::EventPair(EventPairHandle { stream_ref, rights }) => {
524 self.recv_proxied_handle(conn, stream_ref, move || Ok(EventPair::create()), rights)
525 .await
526 }
527 }
528 }
529
530 async fn recv_proxied_handle<Hdl, CreateType>(
531 self: &Arc<Self>,
532 conn: PeerConnRef<'_>,
533 stream_ref: StreamRef,
534 create_handles: impl FnOnce() -> Result<(CreateType, CreateType), Error> + 'static,
535 rights: CreateType::Rights,
536 ) -> Result<Handle, Error>
537 where
538 Hdl: 'static + for<'a> crate::proxy::ProxyableRW<'a>,
539 CreateType: 'static
540 + fidl::HandleBased
541 + IntoProxied<Proxied = Hdl>
542 + std::fmt::Debug
543 + crate::handle_info::WithRights,
544 {
545 let (tx, rx) = futures::channel::oneshot::channel();
546 let rx = ProxyTransferInitiationReceiver::new(
547 rx.map_err(move |_| format_err!("cancelled transfer via recv_proxied")),
548 );
549 let (h, p) = crate::proxy::spawn_recv(
550 create_handles,
551 rights,
552 rx,
553 stream_ref,
554 conn,
555 Arc::downgrade(&self),
556 )
557 .await?;
558 if let Some(p) = p {
559 let info = handle_info(h.as_handle_ref())?;
560 self.add_proxied(
561 &mut *self.proxied_streams.lock().await,
562 info.pair_handle_key,
563 info.this_handle_key,
564 tx,
565 p,
566 );
567 }
568 Ok(h)
569 }
570
571 pub(crate) async fn post_transfer(
573 &self,
574 transfer_key: TransferKey,
575 other_end: FoundTransfer,
576 ) -> Result<(), Error> {
577 let mut pending_transfers = self.pending_transfers.lock().await;
578 match pending_transfers.insert(transfer_key, PendingTransfer::Complete(other_end)) {
579 Some(PendingTransfer::Complete(_)) => bail!("Duplicate transfer received"),
580 Some(PendingTransfer::Waiting(w)) => w.wake(),
581 None => (),
582 }
583 Ok(())
584 }
585
586 fn poll_find_transfer(
587 &self,
588 ctx: &mut Context<'_>,
589 transfer_key: TransferKey,
590 lock: &mut MutexTicket<'_, PendingTransferMap>,
591 ) -> Poll<Result<FoundTransfer, Error>> {
592 let mut pending_transfers = ready!(lock.poll(ctx));
593 if let Some(PendingTransfer::Complete(other_end)) = pending_transfers.remove(&transfer_key)
594 {
595 Poll::Ready(Ok(other_end))
596 } else {
597 pending_transfers.insert(transfer_key, PendingTransfer::Waiting(ctx.waker().clone()));
598 Poll::Pending
599 }
600 }
601
602 pub(crate) async fn find_transfer(
604 &self,
605 transfer_key: TransferKey,
606 ) -> Result<FoundTransfer, Error> {
607 let mut lock = MutexTicket::new(&self.pending_transfers);
608 poll_fn(|ctx| self.poll_find_transfer(ctx, transfer_key, &mut lock)).await
609 }
610
611 pub(crate) async fn open_transfer(
614 self: &Arc<Router>,
615 target: NodeId,
616 transfer_key: TransferKey,
617 handle: Handle,
618 ) -> Result<OpenedTransfer, Error> {
619 if target == self.node_id {
620 let info = handle_info(handle.as_handle_ref())?;
624 let mut proxied_streams = self.proxied_streams.lock().await;
625 log::trace!(
626 node_id = self.node_id.0,
627 key:? = transfer_key,
628 info:? = info,
629 all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
630 "OPEN_TRANSFER_REMOVE_PROXIED",
631 );
632 if let Some(removed) = proxied_streams.remove(&info.this_handle_key) {
633 assert_eq!(removed.original_paired, info.pair_handle_key);
634 assert!(removed.remove_sender.send(RemoveFromProxyTable::Dropped).is_ok());
635 removed.proxy_task.detach();
636 }
637 if let Some(removed) = proxied_streams.remove(&info.pair_handle_key) {
638 assert_eq!(removed.original_paired, info.this_handle_key);
639 assert!(removed.remove_sender.send(RemoveFromProxyTable::Dropped).is_ok());
640 removed.proxy_task.detach();
641 }
642 self.post_transfer(transfer_key, FoundTransfer::Fused(handle)).await?;
643 Ok(OpenedTransfer::Fused)
644 } else {
645 if let Some((writer, reader)) =
646 self.client_peer(target).await?.send_open_transfer(transfer_key).await
647 {
648 Ok(OpenedTransfer::Remote(writer, reader, handle))
649 } else {
650 bail!("{:?} failed sending open transfer to {:?}", self.node_id, target)
651 }
652 }
653 }
654
655 pub(crate) async fn client_closed(&self, peer_node_id: NodeId) {
656 self.peers.lock().await.circuit_clients.remove(&peer_node_id);
657 }
658}
659
660async fn run_circuits(
662 router: Weak<Router>,
663 connections: impl futures::Stream<Item = circuit::Connection> + Send,
664 peers: impl futures::Stream<Item = String> + Send,
665) -> Result<(), Error> {
666 let new_peer_fut = {
668 let router = router.clone();
669 async move {
670 let mut peers = pin!(peers);
671 while let Some(peer_node_id) = peers.next().await {
672 let router = match router.upgrade() {
673 Some(x) => x,
674 None => {
675 log::warn!("Router disappeared from under circuit runner");
676 break;
677 }
678 };
679
680 let res = async {
681 let peer_node_id_num = NodeId::from_circuit_string(&peer_node_id)
682 .map_err(|_| format_err!("Invalid node id: {:?}", peer_node_id))?;
683 let mut peers = router.peers.lock().await;
684
685 if peers.circuit_clients.get(&peer_node_id_num).and_then(|x| x.peer()).is_some()
686 {
687 log::warn!(peer:? = peer_node_id; "Re-establishing connection");
688 }
689
690 let (reader, writer_remote) = circuit::stream::stream();
691 let (reader_remote, writer) = circuit::stream::stream();
692 let conn = router
693 .circuit_node
694 .connect_to_peer(&peer_node_id, reader_remote, writer_remote)
695 .await?;
696 let peer = Peer::new_circuit_client(
697 conn,
698 writer,
699 reader,
700 router.service_map.new_local_service_observer().await,
701 &router,
702 peer_node_id_num,
703 )?;
704
705 if let Some(CircuitState::Waiters(waiters)) =
706 peers.circuit_clients.insert(peer_node_id_num, CircuitState::Peer(peer))
707 {
708 for waiter in waiters {
709 let _ = waiter.send(());
710 }
711 }
712
713 Result::<_, Error>::Ok(())
714 }
715 .await;
716
717 if let Err(e) = res {
718 log::warn!(peer:? = peer_node_id; "Attempt to connect to peer failed: {:?}", e);
719 }
720 }
721 }
722 };
723
724 let new_conn_fut = async move {
726 let mut connections = pin!(connections);
727 while let Some(conn) = connections.next().await {
728 let peer_name = conn.from().to_owned();
729 let res = async {
730 let router = router.upgrade().ok_or_else(|| format_err!("router gone"))?;
731 Peer::new_circuit_server(conn, &router).await?;
732 Result::<_, Error>::Ok(())
733 }
734 .await;
735
736 if let Err(e) = res {
737 log::warn!(
738 peer:? = peer_name;
739 "Attempt to receive connection from peer failed: {:?}",
740 e
741 );
742 }
743 }
744 };
745
746 futures::future::join(new_peer_fut, new_conn_fut).await;
747 Ok(())
748}
749
750#[cfg(test)]
751mod tests {
752
753 use super::*;
754 use crate::test_util::*;
755 use circuit::multi_stream::multi_stream_node_connection;
756 use circuit::stream::stream;
757 use circuit::Quality;
758
759 #[fuchsia::test]
760 async fn no_op(run: usize) {
761 let mut node_id_gen = NodeIdGenerator::new("router::no_op", run);
762 node_id_gen.new_router().unwrap();
763 let id = node_id_gen.next().unwrap();
764 assert_eq!(Router::with_node_id(id, None).unwrap().node_id, id);
765 }
766
767 async fn register_test_service(
768 serving_router: Arc<Router>,
769 client_router: Arc<Router>,
770 service: &'static str,
771 ) -> futures::channel::oneshot::Receiver<Channel> {
772 use fuchsia_sync::Mutex;
773 let (send, recv) = futures::channel::oneshot::channel();
774 serving_router
775 .service_map()
776 .register_service(service.to_string(), {
777 let sender = Mutex::new(Some(send));
778 move |chan| {
779 println!("{} got request", service);
780 sender.lock().take().unwrap().send(chan).unwrap();
781 println!("{} forwarded channel", service);
782 Ok(())
783 }
784 })
785 .await;
786 let serving_node_id = serving_router.node_id();
787 println!("{} wait for service to appear @ client", service);
788 let lpc = client_router.new_list_peers_context().await;
789 loop {
790 let peers = lpc.list_peers().await.unwrap();
791 println!("{} got peers {:?}", service, peers);
792 if peers
793 .iter()
794 .find(move |peer| {
795 serving_node_id == peer.node_id
796 && peer
797 .services
798 .iter()
799 .find(move |&advertised_service| advertised_service == service)
800 .is_some()
801 })
802 .is_some()
803 {
804 break;
805 }
806 }
807 recv
808 }
809
810 async fn run_two_node<
811 F: 'static + Clone + Sync + Send + Fn(Arc<Router>, Arc<Router>) -> Fut,
812 Fut: 'static + Send + Future<Output = Result<(), Error>>,
813 >(
814 name: &'static str,
815 run: usize,
816 f: F,
817 ) -> Result<(), Error> {
818 let mut node_id_gen = NodeIdGenerator::new(name, run);
819 let router1 = node_id_gen.new_router()?;
820 let router2 = node_id_gen.new_router()?;
821 let (circuit1_reader, circuit1_writer) = stream();
822 let (circuit2_reader, circuit2_writer) = stream();
823 let (out_1, _) = futures::channel::mpsc::unbounded();
824 let (out_2, _) = futures::channel::mpsc::unbounded();
825
826 let conn_1 = multi_stream_node_connection(
827 router1.circuit_node(),
828 circuit1_reader,
829 circuit2_writer,
830 true,
831 Quality::IN_PROCESS,
832 out_1,
833 "router1".to_owned(),
834 );
835 let conn_2 = multi_stream_node_connection(
836 router2.circuit_node(),
837 circuit2_reader,
838 circuit1_writer,
839 false,
840 Quality::IN_PROCESS,
841 out_2,
842 "router2".to_owned(),
843 );
844 let _fwd = Task::spawn(async move {
845 if let Err(e) = futures::future::try_join(conn_1, conn_2).await {
846 log::trace!("forwarding failed: {:?}", e)
847 }
848 });
849 f(router1, router2).await
850 }
851
852 #[fuchsia::test]
853 async fn no_op_env(run: usize) -> Result<(), Error> {
854 run_two_node("router::no_op_env", run, |_router1, _router2| async { Ok(()) }).await
855 }
856
857 #[fuchsia::test]
858 async fn create_stream(run: usize) -> Result<(), Error> {
859 run_two_node("create_stream", run, |router1, router2| async move {
860 let (_, p) = fidl::Channel::create();
861 println!("create_stream: register service");
862 let s = register_test_service(router2.clone(), router1.clone(), "create_stream").await;
863 println!("create_stream: connect to service");
864 router1.connect_to_service(router2.node_id, "create_stream", p).await?;
865 println!("create_stream: wait for connection");
866 let _ = s.await?;
867 Ok(())
868 })
869 .await
870 }
871
872 #[fuchsia::test]
873 async fn send_datagram_immediately(run: usize) -> Result<(), Error> {
874 run_two_node("send_datagram_immediately", run, |router1, router2| async move {
875 let (c, p) = fidl::Channel::create();
876 println!("send_datagram_immediately: register service");
877 let s = register_test_service(
878 router2.clone(),
879 router1.clone(),
880 "send_datagram_immediately",
881 )
882 .await;
883 println!("send_datagram_immediately: connect to service");
884 router1.connect_to_service(router2.node_id, "send_datagram_immediately", p).await?;
885 println!("send_datagram_immediately: wait for connection");
886 let s = s.await?;
887 let c = fidl::AsyncChannel::from_channel(c);
888 let s = fidl::AsyncChannel::from_channel(s);
889 c.write(&[1, 2, 3, 4, 5], &mut Vec::new())?;
890 let mut buf = fidl::MessageBufEtc::new();
891 println!("send_datagram_immediately: wait for datagram");
892 s.recv_etc_msg(&mut buf).await?;
893 assert_eq!(buf.n_handle_infos(), 0);
894 assert_eq!(buf.bytes(), &[1, 2, 3, 4, 5]);
895 Ok(())
896 })
897 .await
898 }
899
900 #[fuchsia::test]
901 async fn ping_pong(run: usize) -> Result<(), Error> {
902 run_two_node("ping_pong", run, |router1, router2| async move {
903 let (c, p) = fidl::Channel::create();
904 println!("ping_pong: register service");
905 let s = register_test_service(router2.clone(), router1.clone(), "ping_pong").await;
906 println!("ping_pong: connect to service");
907 router1.connect_to_service(router2.node_id, "ping_pong", p).await?;
908 println!("ping_pong: wait for connection");
909 let s = s.await?;
910 let c = fidl::AsyncChannel::from_channel(c);
911 let s = fidl::AsyncChannel::from_channel(s);
912 println!("ping_pong: send ping");
913 c.write(&[1, 2, 3, 4, 5], &mut Vec::new())?;
914 println!("ping_pong: receive ping");
915 let mut buf = fidl::MessageBufEtc::new();
916 s.recv_etc_msg(&mut buf).await?;
917 assert_eq!(buf.n_handle_infos(), 0);
918 assert_eq!(buf.bytes(), &[1, 2, 3, 4, 5]);
919 println!("ping_pong: send pong");
920 s.write(&[9, 8, 7, 6, 5, 4, 3, 2, 1], &mut Vec::new())?;
921 println!("ping_pong: receive pong");
922 let mut buf = fidl::MessageBufEtc::new();
923 c.recv_etc_msg(&mut buf).await?;
924 assert_eq!(buf.n_handle_infos(), 0);
925 assert_eq!(buf.bytes(), &[9, 8, 7, 6, 5, 4, 3, 2, 1]);
926 Ok(())
927 })
928 .await
929 }
930
931 fn ensure_pending(f: &mut (impl Send + Unpin + Future<Output = ()>)) {
932 let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
933 for _ in 0..1000 {
935 assert!(f.poll_unpin(&mut ctx).is_pending());
936 }
937 }
938
939 #[fuchsia::test]
940 async fn concurrent_list_peer_calls_will_error(run: usize) -> Result<(), Error> {
941 let mut node_id_gen = NodeIdGenerator::new("concurrent_list_peer_calls_will_error", run);
942 let n = node_id_gen.new_router().unwrap();
943 let lp = n.new_list_peers_context().await;
944 lp.list_peers().await.unwrap();
945 let mut never_completes = async {
946 lp.list_peers().await.unwrap();
947 }
948 .boxed();
949 ensure_pending(&mut never_completes);
950 lp.list_peers().await.expect_err("Concurrent list peers should fail");
951 ensure_pending(&mut never_completes);
952 Ok(())
953 }
954}