1mod service_map;
22
23use self::service_map::ServiceMap;
24use crate::future_help::{Observer, log_errors};
25use crate::handle_info::{HandleKey, HandleType, handle_info};
26use crate::labels::{NodeId, TransferKey};
27use crate::peer::{FramedStreamReader, FramedStreamWriter, Peer, PeerConnRef};
28use crate::proxy::{
29 IntoProxied, ProxyTransferInitiationReceiver, RemoveFromProxyTable, StreamRefSender,
30};
31use anyhow::{Context as _, Error, bail, format_err};
32use async_utils::mutex_ticket::MutexTicket;
33use fidl::{AsHandleRef, Channel, EventPair, HandleBased, NullableHandle, Socket};
34use fidl_fuchsia_overnet_protocol::{
35 ChannelHandle, EventPairHandle, EventPairRights, SocketHandle, SocketType, StreamId, StreamRef,
36 ZirconHandle,
37};
38use fuchsia_async::Task;
39use futures::channel::oneshot;
40use futures::future::poll_fn;
41use futures::lock::Mutex;
42use futures::prelude::*;
43use futures::ready;
44use std::collections::{BTreeMap, HashMap};
45use std::pin::pin;
46use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
47use std::sync::{Arc, Weak};
48use std::task::{Context, Poll, Waker};
49use std::time::Duration;
50
51pub use self::service_map::ListablePeer;
52
/// State kept in the pending-transfer table for a single [`TransferKey`].
#[derive(Debug)]
enum PendingTransfer {
    /// The other end has been posted (see `Router::post_transfer`) and is waiting to be
    /// claimed by `Router::find_transfer`.
    Complete(FoundTransfer),
    /// A `find_transfer` caller polled before the transfer was posted; holds the waker that
    /// `post_transfer` wakes when the transfer arrives.
    Waiting(Waker),
}
58
/// Table of transfers keyed by [`TransferKey`], shared by `post_transfer` and `find_transfer`.
type PendingTransferMap = BTreeMap<TransferKey, PendingTransfer>;
60
/// The value handed to `Router::post_transfer` and later yielded by `Router::find_transfer`.
#[derive(Debug)]
pub(crate) enum FoundTransfer {
    /// Carries a local handle. `Router::open_transfer` posts this variant when the transfer
    /// target is this node.
    Fused(NullableHandle),
    /// Carries a framed stream writer/reader pair.
    /// NOTE(review): this variant is not constructed in this file; presumably it is posted
    /// when the other end lives on a remote peer — confirm against the peer code.
    Remote(FramedStreamWriter, FramedStreamReader),
}
66
/// Result of `Router::open_transfer`.
#[derive(Debug)]
pub(crate) enum OpenedTransfer {
    /// The target was this node; the handle was posted locally as `FoundTransfer::Fused`.
    Fused,
    /// The target was another node: the stream writer/reader returned by the peer's
    /// `send_open_transfer`, plus the handle that was passed in (handed back to the caller).
    Remote(FramedStreamWriter, FramedStreamReader, NullableHandle),
}
72
/// Client-side connection state for one remote node, stored in `PeerMaps::circuit_clients`.
#[derive(Debug)]
// NOTE(review): the reason for allowing dead code here is not visible in this file.
#[allow(dead_code)]
enum CircuitState {
    /// No peer yet. Each sender belongs to a `Router::client_peer` caller waiting to be
    /// notified once a peer has been installed.
    Waiters(Vec<oneshot::Sender<()>>),
    /// A client peer has been established for the node.
    Peer(Arc<Peer>),
}
79
80impl CircuitState {
81 fn peer(&self) -> Option<Arc<Peer>> {
82 if let CircuitState::Peer(peer) = self { Some(Arc::clone(peer)) } else { None }
83 }
84}
85
/// Peer bookkeeping guarded by the `Router::peers` mutex.
#[derive(Debug)]
struct PeerMaps {
    /// Client-side circuit state for each remote node, keyed by its node id.
    circuit_clients: BTreeMap<NodeId, CircuitState>,
}
90
/// Context for repeated `list_peers` calls; created by `Router::new_list_peers_context`.
///
/// Wraps the peer-list observer. The slot is `None` while a `list_peers` call has the
/// observer checked out.
#[derive(Debug)]
pub struct ListPeersContext(Mutex<Option<Observer<Vec<ListablePeer>>>>);
94
/// Process-wide counter that gives each `ListPeersContext::list_peers` call an id for its
/// trace logs.
static LIST_PEERS_CALL: AtomicU64 = AtomicU64::new(0);
96
97impl ListPeersContext {
98 pub async fn list_peers(&self) -> Result<Vec<ListablePeer>, Error> {
100 let call_id = LIST_PEERS_CALL.fetch_add(1, Ordering::SeqCst);
101 log::trace!(list_peers_call = call_id; "get observer");
102 let mut obs = self
103 .0
104 .lock()
105 .await
106 .take()
107 .ok_or_else(|| anyhow::format_err!("Already listing peers"))?;
108 log::trace!(list_peers_call = call_id; "wait for value");
109 let r = obs.next().await;
110 log::trace!(list_peers_call = call_id; "replace observer");
111 *self.0.lock().await = Some(obs);
112 log::trace!(list_peers_call = call_id; "return");
113 Ok(r.unwrap_or_else(Vec::new))
114 }
115}
116
/// Setting read and written through `Router::client_routing` and
/// `Router::set_client_routing`; the router stores it as a boolean flag.
///
/// NOTE(review): what the flag actually controls is not visible in this file.
pub enum AscenddClientRouting {
    /// Stored as `true`. This is the value a new router starts with.
    Enabled,
    /// Stored as `false`.
    Disabled,
}
124
/// Per-node routing state: known peers, the service map, proxied handles and pending
/// handle transfers.
pub struct Router {
    /// This node's id.
    node_id: NodeId,
    /// Client-side circuit state for remote nodes.
    peers: Mutex<PeerMaps>,
    /// Service registry used to connect to and register local services and to create
    /// peer-list / local-service observers.
    service_map: ServiceMap,
    /// Handles currently being proxied, keyed by handle key.
    proxied_streams: Mutex<HashMap<HandleKey, ProxiedHandle>>,
    /// Transfers that have been posted or are being waited for.
    pending_transfers: Mutex<PendingTransferMap>,
    /// Background task running `run_circuits`; set once in `with_node_id`.
    task: Mutex<Option<Task<()>>>,
    /// Backing flag for `client_routing` / `set_client_routing`.
    ascendd_client_routing: AtomicBool,
    /// Circuit connection node created for this router.
    circuit_node: circuit::ConnectionNode,
}
142
/// Entry in `Router::proxied_streams` describing one proxied handle.
struct ProxiedHandle {
    /// Used to tell the proxy why it left the table: either `Dropped` or
    /// `InitiateTransfer`.
    remove_sender: futures::channel::oneshot::Sender<RemoveFromProxyTable>,
    /// Handle key of the paired end recorded at insertion; checked by assertions when the
    /// entry is removed.
    original_paired: HandleKey,
    /// The spawned proxy task; detached when the entry is removed.
    proxy_task: Task<()>,
}
148
149fn generate_node_id() -> NodeId {
151 rand::random::<u64>().into()
152}
153
/// Consumes `v` and returns it with its elements in ascending order (stable sort).
fn sorted<T>(mut v: Vec<T>) -> Vec<T>
where
    T: Ord,
{
    v.as_mut_slice().sort();
    v
}
158
159impl std::fmt::Debug for Router {
160 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 write!(f, "Router({:?})", self.node_id)
162 }
163}
164
/// Protocol identifier handed to `circuit::ConnectionNode` when the router's circuit node
/// is created.
// `'static` is implied for references in `const` items, so it is not spelled out.
const OVERNET_CIRCUIT_PROTOCOL: &str = "Overnet:0";
168
169impl Router {
170 pub fn new(router_interval: Option<Duration>) -> Result<Arc<Self>, Error> {
173 Router::with_node_id(generate_node_id(), router_interval)
174 }
175
176 pub fn with_node_id(
178 node_id: NodeId,
179 router_interval: Option<Duration>,
180 ) -> Result<Arc<Self>, Error> {
181 let service_map = ServiceMap::new(node_id);
182 let (new_peer_sender, new_peer_receiver) = futures::channel::mpsc::channel(1);
183 let (circuit_node, circuit_connections) = if let Some(interval) = router_interval {
184 let (a, b) = circuit::ConnectionNode::new_with_router(
185 &node_id.circuit_string(),
186 OVERNET_CIRCUIT_PROTOCOL,
187 interval,
188 new_peer_sender,
189 )?;
190 (a, b.boxed())
191 } else {
192 let (a, b) = circuit::ConnectionNode::new(
193 &node_id.circuit_string(),
194 OVERNET_CIRCUIT_PROTOCOL,
195 new_peer_sender,
196 )?;
197 (a, b.boxed())
198 };
199 let router = Arc::new(Router {
200 node_id,
201 service_map,
202 peers: Mutex::new(PeerMaps { circuit_clients: BTreeMap::new() }),
203 proxied_streams: Mutex::new(HashMap::new()),
204 pending_transfers: Mutex::new(PendingTransferMap::new()),
205 task: Mutex::new(None),
206 ascendd_client_routing: AtomicBool::new(true),
208 circuit_node,
209 });
210
211 let weak_router = Arc::downgrade(&router);
212 *router.task.lock().now_or_never().unwrap() = Some(Task::spawn(log_errors(
213 run_circuits(weak_router, circuit_connections, new_peer_receiver),
214 format!("router {:?} support loop failed", node_id),
215 )));
216
217 Ok(router)
218 }
219
    /// Returns the `circuit::Node` belonging to this router's circuit connection node.
    pub fn circuit_node(&self) -> &circuit::Node {
        self.circuit_node.node()
    }
225
    /// Returns this router's node id.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }
230
231 pub fn client_routing(&self) -> AscenddClientRouting {
233 if self.ascendd_client_routing.load(std::sync::atomic::Ordering::SeqCst) {
234 AscenddClientRouting::Enabled
235 } else {
236 AscenddClientRouting::Disabled
237 }
238 }
239
240 pub fn set_client_routing(&self, client_routing: AscenddClientRouting) {
242 let client_routing = match client_routing {
243 AscenddClientRouting::Enabled => true,
244 AscenddClientRouting::Disabled => false,
245 };
246 self.ascendd_client_routing.store(client_routing, std::sync::atomic::Ordering::SeqCst);
247 }
248
    /// Returns a reference to this router's service map.
    pub(crate) fn service_map(&self) -> &ServiceMap {
        &self.service_map
    }
252
253 pub async fn connect_to_service(
255 self: &Arc<Self>,
256 node_id: NodeId,
257 service_name: &str,
258 chan: Channel,
259 ) -> Result<(), Error> {
260 let is_local = node_id == self.node_id;
261 log::trace!(
262 service_name:%,
263 node_id = node_id.0,
264 local = is_local;
265 "Request connect_to_service",
266 );
267 if is_local {
268 self.service_map().connect(service_name, chan).await
269 } else {
270 self.client_peer(node_id)
271 .await
272 .with_context(|| {
273 format_err!(
274 "Fetching client peer for new stream to {:?} for service {:?}",
275 node_id,
276 service_name,
277 )
278 })?
279 .new_stream(service_name, chan, self)
280 .await
281 }
282 }
283
284 pub async fn register_service(
287 &self,
288 service_name: String,
289 provider: impl Fn(fidl::Channel) -> Result<(), Error> + Send + 'static,
290 ) -> Result<(), Error> {
291 self.service_map().register_service(service_name, provider).await;
292 Ok(())
293 }
294
295 pub async fn new_list_peers_context(&self) -> ListPeersContext {
297 ListPeersContext(Mutex::new(Some(self.service_map.new_list_peers_observer().await)))
298 }
299
    /// Returns the client-side `Peer` for `peer_node_id`, waiting until one exists.
    ///
    /// Loops until `circuit_clients` holds a `CircuitState::Peer` for the node. If the node
    /// has no entry yet, an empty `Waiters` entry is created; while the entry is `Waiters`,
    /// a oneshot sender is queued and the call waits on its receiver before re-checking.
    /// As written, the only way out of the loop is `Ok`.
    async fn client_peer(self: &Arc<Self>, peer_node_id: NodeId) -> Result<Arc<Peer>, Error> {
        loop {
            let mut peers = self.peers.lock().await;
            match peers.circuit_clients.get_mut(&peer_node_id) {
                Some(CircuitState::Peer(peer)) => break Ok(Arc::clone(&peer)),
                Some(CircuitState::Waiters(waiters)) => {
                    let (sender, receiver) = oneshot::channel();
                    waiters.push(sender);
                    // Release the peers lock before waiting so the entry can be updated.
                    std::mem::drop(peers);
                    // The outcome is ignored: whether the sender fired or was dropped, the
                    // state is re-examined on the next iteration.
                    let _ = receiver.await;
                }
                None => {
                    // Create the entry; the next iteration takes the `Waiters` arm.
                    peers.circuit_clients.insert(peer_node_id, CircuitState::Waiters(Vec::new()));
                }
            }
        }
    }
317
318 fn add_proxied(
319 self: &Arc<Self>,
320 proxied_streams: &mut HashMap<HandleKey, ProxiedHandle>,
321 this_handle_key: HandleKey,
322 pair_handle_key: HandleKey,
323 remove_sender: futures::channel::oneshot::Sender<RemoveFromProxyTable>,
324 f: impl 'static + Send + Future<Output = Result<(), Error>>,
325 ) {
326 let router = Arc::downgrade(&self);
327 let proxy_task = Task::spawn(async move {
328 if let Err(e) = f.await {
329 log::trace!(this_handle_key:?, pair_handle_key:?; "Proxy failed: {:?}", e);
330 } else {
331 log::trace!(this_handle_key:?, pair_handle_key:?; "Proxy completed successfully",);
332 }
333 if let Some(router) = Weak::upgrade(&router) {
334 router.remove_proxied(this_handle_key, pair_handle_key).await;
335 }
336 });
337 assert!(
338 proxied_streams
339 .insert(
340 this_handle_key,
341 ProxiedHandle { remove_sender, original_paired: pair_handle_key, proxy_task },
342 )
343 .is_none()
344 );
345 }
346
347 async fn remove_proxied(
350 self: &Arc<Self>,
351 this_handle_key: HandleKey,
352 pair_handle_key: HandleKey,
353 ) {
354 let mut proxied_streams = self.proxied_streams.lock().await;
355 log::trace!(
356 node_id = self.node_id.0,
357 this_handle_key:?,
358 pair_handle_key:?,
359 all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
360 "REMOVE_PROXIED",
361 );
362 if let Some(removed) = proxied_streams.remove(&this_handle_key) {
363 assert_eq!(removed.original_paired, pair_handle_key);
364 let _ = removed.remove_sender.send(RemoveFromProxyTable::Dropped);
365 removed.proxy_task.detach();
366 }
367 }
368
    /// Prepares `handle` to be sent over `conn` and returns its wire description.
    ///
    /// Two cases are distinguished by looking up the handle's pair key in
    /// `proxied_streams`:
    ///
    /// * The paired end is already proxied: the existing proxy is removed from the table
    ///   and told to initiate a transfer; the returned `ZirconHandle` carries the stream
    ///   ref reported back by that proxy.
    /// * Otherwise: a bidirectional stream is allocated, a new send-proxy is registered
    ///   for the handle, and the returned `ZirconHandle` carries `StreamRef::Creating`
    ///   with the new stream's id.
    ///
    /// # Errors
    ///
    /// Fails if handle information cannot be read, stream allocation fails, the existing
    /// proxy cannot be told to transfer or never reports a stream ref, or the handle cannot
    /// be converted into its proxied form.
    ///
    /// # Panics
    ///
    /// Panics if the removed pair entry does not record this handle as its pair, or (via
    /// `add_proxied`) if this handle already has a table entry.
    pub(crate) async fn send_proxied(
        self: &Arc<Self>,
        handle: NullableHandle,
        conn: PeerConnRef<'_>,
    ) -> Result<ZirconHandle, Error> {
        // Captured up front so it can be used in error context after `handle` is moved.
        let raw_handle = handle.raw_handle();
        let info = handle_info(handle.as_handle_ref())
            .with_context(|| format!("Getting handle information for {}", raw_handle))?;
        let mut proxied_streams = self.proxied_streams.lock().await;
        log::trace!(
            node_id = self.node_id.0,
            handle:?,
            info:?,
            all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
            "SEND_PROXIED",
        );
        if let Some(pair) = proxied_streams.remove(&info.pair_handle_key) {
            // The other end of this handle is already being proxied. The table lock is
            // released before any of the awaits below.
            drop(proxied_streams);
            assert_eq!(info.this_handle_key, pair.original_paired);
            log::trace!(
                handle:?,
                orig_pair:? = pair.original_paired;
                "Send paired proxied"
            );
            // Unidirectional stream handed to the existing proxy as its drain stream.
            let drain_stream = conn.alloc_uni().await?.into();
            let (stream_ref_sender, stream_ref_receiver) = StreamRefSender::new();
            pair.remove_sender
                .send(RemoveFromProxyTable::InitiateTransfer {
                    paired_handle: handle,
                    drain_stream,
                    stream_ref_sender,
                })
                .map_err(|_| format_err!("Failed to initiate transfer"))?;
            // The existing proxy answers with the stream ref to put on the wire.
            let stream_ref = stream_ref_receiver
                .await
                .with_context(|| format!("waiting for stream_ref for {:?}", raw_handle))?;
            pair.proxy_task.detach();
            match info.handle_type {
                HandleType::Channel(rights) => {
                    Ok(ZirconHandle::Channel(ChannelHandle { stream_ref, rights }))
                }
                HandleType::Socket(socket_type, rights) => {
                    Ok(ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights }))
                }
                HandleType::EventPair => Ok(ZirconHandle::EventPair(EventPairHandle {
                    stream_ref,
                    rights: EventPairRights::empty(),
                })),
            }
        } else {
            // No proxy exists for the paired end: set up a new proxy for this handle.
            // The table lock stays held until the new entry has been inserted.
            log::trace!(handle:?; "Send proxied");
            let (tx, rx) = futures::channel::oneshot::channel();
            let rx = ProxyTransferInitiationReceiver::new(rx.map_err(move |_| {
                format_err!(
                    "cancelled transfer via send_proxied {:?}\n{}",
                    info,
                    // NOTE(review): constant filler for the second format argument; its
                    // intended meaning is not visible here.
                    111
                )
            }));
            let (stream_writer, stream_reader) = conn.alloc_bidi().await?;
            let stream_ref = StreamRef::Creating(StreamId { id: stream_writer.id() });
            Ok(match info.handle_type {
                HandleType::Channel(rights) => {
                    self.add_proxied(
                        &mut *proxied_streams,
                        info.this_handle_key,
                        info.pair_handle_key,
                        tx,
                        crate::proxy::spawn_send(
                            Channel::from_handle(handle).into_proxied()?,
                            rx,
                            stream_writer.into(),
                            stream_reader.into(),
                            Arc::downgrade(&self),
                        ),
                    );
                    ZirconHandle::Channel(ChannelHandle { stream_ref, rights })
                }
                HandleType::Socket(socket_type, rights) => {
                    self.add_proxied(
                        &mut *proxied_streams,
                        info.this_handle_key,
                        info.pair_handle_key,
                        tx,
                        crate::proxy::spawn_send(
                            Socket::from_handle(handle).into_proxied()?,
                            rx,
                            stream_writer.into(),
                            stream_reader.into(),
                            Arc::downgrade(&self),
                        ),
                    );
                    ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights })
                }
                HandleType::EventPair => {
                    self.add_proxied(
                        &mut *proxied_streams,
                        info.this_handle_key,
                        info.pair_handle_key,
                        tx,
                        crate::proxy::spawn_send(
                            EventPair::from_handle(handle).into_proxied()?,
                            rx,
                            stream_writer.into(),
                            stream_reader.into(),
                            Arc::downgrade(&self),
                        ),
                    );
                    ZirconHandle::EventPair(EventPairHandle {
                        stream_ref,
                        rights: EventPairRights::empty(),
                    })
                }
            })
        }
    }
493
494 pub(crate) async fn recv_proxied(
497 self: &Arc<Self>,
498 handle: ZirconHandle,
499 conn: PeerConnRef<'_>,
500 ) -> Result<NullableHandle, Error> {
501 match handle {
502 ZirconHandle::Channel(ChannelHandle { stream_ref, rights }) => {
503 self.recv_proxied_handle(conn, stream_ref, move || Ok(Channel::create()), rights)
504 .await
505 }
506 ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights }) => {
507 self.recv_proxied_handle(
508 conn,
509 stream_ref,
510 move || {
511 Ok(match socket_type {
512 SocketType::Stream => Socket::create_stream(),
513 SocketType::Datagram => Socket::create_datagram(),
514 })
515 },
516 rights,
517 )
518 .await
519 }
520 ZirconHandle::EventPair(EventPairHandle { stream_ref, rights }) => {
521 self.recv_proxied_handle(conn, stream_ref, move || Ok(EventPair::create()), rights)
522 .await
523 }
524 }
525 }
526
527 async fn recv_proxied_handle<Hdl, CreateType>(
528 self: &Arc<Self>,
529 conn: PeerConnRef<'_>,
530 stream_ref: StreamRef,
531 create_handles: impl FnOnce() -> Result<(CreateType, CreateType), Error> + 'static,
532 rights: CreateType::Rights,
533 ) -> Result<NullableHandle, Error>
534 where
535 Hdl: 'static + for<'a> crate::proxy::ProxyableRW<'a>,
536 CreateType: 'static
537 + fidl::HandleBased
538 + IntoProxied<Proxied = Hdl>
539 + std::fmt::Debug
540 + crate::handle_info::WithRights,
541 {
542 let (tx, rx) = futures::channel::oneshot::channel();
543 let rx = ProxyTransferInitiationReceiver::new(
544 rx.map_err(move |_| format_err!("cancelled transfer via recv_proxied")),
545 );
546 let (h, p) = crate::proxy::spawn_recv(
547 create_handles,
548 rights,
549 rx,
550 stream_ref,
551 conn,
552 Arc::downgrade(&self),
553 )
554 .await?;
555 if let Some(p) = p {
556 let info = handle_info(h.as_handle_ref())?;
557 self.add_proxied(
558 &mut *self.proxied_streams.lock().await,
559 info.pair_handle_key,
560 info.this_handle_key,
561 tx,
562 p,
563 );
564 }
565 Ok(h)
566 }
567
568 pub(crate) async fn post_transfer(
570 &self,
571 transfer_key: TransferKey,
572 other_end: FoundTransfer,
573 ) -> Result<(), Error> {
574 let mut pending_transfers = self.pending_transfers.lock().await;
575 match pending_transfers.insert(transfer_key, PendingTransfer::Complete(other_end)) {
576 Some(PendingTransfer::Complete(_)) => bail!("Duplicate transfer received"),
577 Some(PendingTransfer::Waiting(w)) => w.wake(),
578 None => (),
579 }
580 Ok(())
581 }
582
583 fn poll_find_transfer(
584 &self,
585 ctx: &mut Context<'_>,
586 transfer_key: TransferKey,
587 lock: &mut MutexTicket<'_, PendingTransferMap>,
588 ) -> Poll<Result<FoundTransfer, Error>> {
589 let mut pending_transfers = ready!(lock.poll(ctx));
590 if let Some(PendingTransfer::Complete(other_end)) = pending_transfers.remove(&transfer_key)
591 {
592 Poll::Ready(Ok(other_end))
593 } else {
594 pending_transfers.insert(transfer_key, PendingTransfer::Waiting(ctx.waker().clone()));
595 Poll::Pending
596 }
597 }
598
599 pub(crate) async fn find_transfer(
601 &self,
602 transfer_key: TransferKey,
603 ) -> Result<FoundTransfer, Error> {
604 let mut lock = MutexTicket::new(&self.pending_transfers);
605 poll_fn(|ctx| self.poll_find_transfer(ctx, transfer_key, &mut lock)).await
606 }
607
608 pub(crate) async fn open_transfer(
611 self: &Arc<Router>,
612 target: NodeId,
613 transfer_key: TransferKey,
614 handle: NullableHandle,
615 ) -> Result<OpenedTransfer, Error> {
616 if target == self.node_id {
617 let info = handle_info(handle.as_handle_ref())?;
621 let mut proxied_streams = self.proxied_streams.lock().await;
622 log::trace!(
623 node_id = self.node_id.0,
624 key:? = transfer_key,
625 info:? = info,
626 all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
627 "OPEN_TRANSFER_REMOVE_PROXIED",
628 );
629 if let Some(removed) = proxied_streams.remove(&info.this_handle_key) {
630 assert_eq!(removed.original_paired, info.pair_handle_key);
631 assert!(removed.remove_sender.send(RemoveFromProxyTable::Dropped).is_ok());
632 removed.proxy_task.detach();
633 }
634 if let Some(removed) = proxied_streams.remove(&info.pair_handle_key) {
635 assert_eq!(removed.original_paired, info.this_handle_key);
636 assert!(removed.remove_sender.send(RemoveFromProxyTable::Dropped).is_ok());
637 removed.proxy_task.detach();
638 }
639 self.post_transfer(transfer_key, FoundTransfer::Fused(handle)).await?;
640 Ok(OpenedTransfer::Fused)
641 } else {
642 if let Some((writer, reader)) =
643 self.client_peer(target).await?.send_open_transfer(transfer_key).await
644 {
645 Ok(OpenedTransfer::Remote(writer, reader, handle))
646 } else {
647 bail!("{:?} failed sending open transfer to {:?}", self.node_id, target)
648 }
649 }
650 }
651
652 pub(crate) async fn client_closed(&self, peer_node_id: NodeId) {
653 self.peers.lock().await.circuit_clients.remove(&peer_node_id);
654 }
655}
656
/// Support loop for a router: reacts to newly announced peers and to incoming connections.
///
/// Two loops run concurrently until both input streams end:
///
/// * For every peer id string from `peers`, a connection to that peer is opened through the
///   router's circuit node, a client `Peer` is created and stored in `circuit_clients`, and
///   any `client_peer` callers waiting on that node are notified.
/// * For every item from `connections`, a server-side `Peer` is created.
///
/// A failure while handling a single peer or connection is logged as a warning and does
/// not stop either loop. The peer loop stops early if the router has been dropped. As
/// written, the function itself always returns `Ok(())`.
async fn run_circuits(
    router: Weak<Router>,
    connections: impl futures::Stream<Item = circuit::Connection> + Send,
    peers: impl futures::Stream<Item = String> + Send,
) -> Result<(), Error> {
    let new_peer_fut = {
        // Separate weak reference for this loop; the original is moved into the other one.
        let router = router.clone();
        async move {
            let mut peers = pin!(peers);
            while let Some(peer_node_id) = peers.next().await {
                let router = match router.upgrade() {
                    Some(x) => x,
                    None => {
                        log::warn!("Router disappeared from under circuit runner");
                        break;
                    }
                };

                let res = async {
                    let peer_node_id_num = NodeId::from_circuit_string(&peer_node_id)
                        .map_err(|_| format_err!("Invalid node id: {:?}", peer_node_id))?;
                    // Held until the new peer has been inserted below, including across
                    // the connection attempt.
                    let mut peers = router.peers.lock().await;

                    if peers.circuit_clients.get(&peer_node_id_num).and_then(|x| x.peer()).is_some()
                    {
                        log::warn!(peer:? = peer_node_id; "Re-establishing connection");
                    }

                    // Two stream pairs: the `*_remote` ends go to the circuit connection,
                    // the other ends go to the `Peer`.
                    let (reader, writer_remote) = circuit::stream::stream();
                    let (reader_remote, writer) = circuit::stream::stream();
                    let conn = router
                        .circuit_node
                        .connect_to_peer(&peer_node_id, reader_remote, writer_remote)
                        .await?;
                    let peer = Peer::new_circuit_client(
                        conn,
                        writer,
                        reader,
                        router.service_map.new_local_service_observer().await,
                        &router,
                        peer_node_id_num,
                    )?;

                    // Installing the peer replaces any `Waiters` entry; notify each waiter
                    // so `client_peer` re-checks the table.
                    if let Some(CircuitState::Waiters(waiters)) =
                        peers.circuit_clients.insert(peer_node_id_num, CircuitState::Peer(peer))
                    {
                        for waiter in waiters {
                            let _ = waiter.send(());
                        }
                    }

                    Result::<_, Error>::Ok(())
                }
                .await;

                if let Err(e) = res {
                    log::warn!(peer:? = peer_node_id; "Attempt to connect to peer failed: {:?}", e);
                }
            }
        }
    };

    let new_conn_fut = async move {
        let mut connections = pin!(connections);
        while let Some(conn) = connections.next().await {
            // Copied out for logging because `conn` is moved into the peer below.
            let peer_name = conn.from().to_owned();
            let res = async {
                let router = router.upgrade().ok_or_else(|| format_err!("router gone"))?;
                Peer::new_circuit_server(conn, &router).await?;
                Result::<_, Error>::Ok(())
            }
            .await;

            if let Err(e) = res {
                log::warn!(
                    peer:? = peer_name;
                    "Attempt to receive connection from peer failed: {:?}",
                    e
                );
            }
        }
    };

    futures::future::join(new_peer_fut, new_conn_fut).await;
    Ok(())
}
746
747#[cfg(test)]
748mod tests {
749
750 use super::*;
751 use crate::test_util::*;
752 use circuit::Quality;
753 use circuit::multi_stream::multi_stream_node_connection;
754 use circuit::stream::stream;
755
756 #[fuchsia::test]
757 async fn no_op(run: usize) {
758 let mut node_id_gen = NodeIdGenerator::new("router::no_op", run);
759 node_id_gen.new_router().unwrap();
760 let id = node_id_gen.next().unwrap();
761 assert_eq!(Router::with_node_id(id, None).unwrap().node_id, id);
762 }
763
764 async fn register_test_service(
765 serving_router: Arc<Router>,
766 client_router: Arc<Router>,
767 service: &'static str,
768 ) -> futures::channel::oneshot::Receiver<Channel> {
769 use fuchsia_sync::Mutex;
770 let (send, recv) = futures::channel::oneshot::channel();
771 serving_router
772 .service_map()
773 .register_service(service.to_string(), {
774 let sender = Mutex::new(Some(send));
775 move |chan| {
776 println!("{} got request", service);
777 sender.lock().take().unwrap().send(chan).unwrap();
778 println!("{} forwarded channel", service);
779 Ok(())
780 }
781 })
782 .await;
783 let serving_node_id = serving_router.node_id();
784 println!("{} wait for service to appear @ client", service);
785 let lpc = client_router.new_list_peers_context().await;
786 loop {
787 let peers = lpc.list_peers().await.unwrap();
788 println!("{} got peers {:?}", service, peers);
789 if peers
790 .iter()
791 .find(move |peer| {
792 serving_node_id == peer.node_id
793 && peer
794 .services
795 .iter()
796 .find(move |&advertised_service| advertised_service == service)
797 .is_some()
798 })
799 .is_some()
800 {
801 break;
802 }
803 }
804 recv
805 }
806
    /// Test harness: creates two routers, links their circuit nodes with a pair of
    /// in-process streams, and runs `f` with both routers.
    ///
    /// The forwarding task is kept alive until `f` completes. `name` and `run` seed the
    /// node id generator.
    async fn run_two_node<
        F: 'static + Clone + Sync + Send + Fn(Arc<Router>, Arc<Router>) -> Fut,
        Fut: 'static + Send + Future<Output = Result<(), Error>>,
    >(
        name: &'static str,
        run: usize,
        f: F,
    ) -> Result<(), Error> {
        let mut node_id_gen = NodeIdGenerator::new(name, run);
        let router1 = node_id_gen.new_router()?;
        let router2 = node_id_gen.new_router()?;
        let (circuit1_reader, circuit1_writer) = stream();
        let (circuit2_reader, circuit2_writer) = stream();
        // The receiving halves are dropped immediately; nothing in this harness reads them.
        let (out_1, _) = futures::channel::mpsc::unbounded();
        let (out_2, _) = futures::channel::mpsc::unbounded();

        // Cross-wired: each connection reads from its own stream and writes to the other's.
        let conn_1 = multi_stream_node_connection(
            router1.circuit_node(),
            circuit1_reader,
            circuit2_writer,
            true,
            Quality::IN_PROCESS,
            out_1,
            "router1".to_owned(),
        );
        let conn_2 = multi_stream_node_connection(
            router2.circuit_node(),
            circuit2_reader,
            circuit1_writer,
            false,
            Quality::IN_PROCESS,
            out_2,
            "router2".to_owned(),
        );
        // Bound to a named variable so the task lives until the end of this function.
        let _fwd = Task::spawn(async move {
            if let Err(e) = futures::future::try_join(conn_1, conn_2).await {
                log::trace!("forwarding failed: {:?}", e)
            }
        });
        f(router1, router2).await
    }
848
    /// The two-node harness can be set up and torn down without doing any work.
    #[fuchsia::test]
    async fn no_op_env(run: usize) -> Result<(), Error> {
        run_two_node("router::no_op_env", run, |_router1, _router2| async { Ok(()) }).await
    }
853
854 #[fuchsia::test]
855 async fn create_stream(run: usize) -> Result<(), Error> {
856 run_two_node("create_stream", run, |router1, router2| async move {
857 let (_, p) = fidl::Channel::create();
858 println!("create_stream: register service");
859 let s = register_test_service(router2.clone(), router1.clone(), "create_stream").await;
860 println!("create_stream: connect to service");
861 router1.connect_to_service(router2.node_id, "create_stream", p).await?;
862 println!("create_stream: wait for connection");
863 let _ = s.await?;
864 Ok(())
865 })
866 .await
867 }
868
869 #[fuchsia::test]
870 async fn send_datagram_immediately(run: usize) -> Result<(), Error> {
871 run_two_node("send_datagram_immediately", run, |router1, router2| async move {
872 let (c, p) = fidl::Channel::create();
873 println!("send_datagram_immediately: register service");
874 let s = register_test_service(
875 router2.clone(),
876 router1.clone(),
877 "send_datagram_immediately",
878 )
879 .await;
880 println!("send_datagram_immediately: connect to service");
881 router1.connect_to_service(router2.node_id, "send_datagram_immediately", p).await?;
882 println!("send_datagram_immediately: wait for connection");
883 let s = s.await?;
884 let c = fidl::AsyncChannel::from_channel(c);
885 let s = fidl::AsyncChannel::from_channel(s);
886 c.write(&[1, 2, 3, 4, 5], &mut Vec::new())?;
887 let mut buf = fidl::MessageBufEtc::new();
888 println!("send_datagram_immediately: wait for datagram");
889 s.recv_etc_msg(&mut buf).await?;
890 assert_eq!(buf.n_handle_infos(), 0);
891 assert_eq!(buf.bytes(), &[1, 2, 3, 4, 5]);
892 Ok(())
893 })
894 .await
895 }
896
897 #[fuchsia::test]
898 async fn ping_pong(run: usize) -> Result<(), Error> {
899 run_two_node("ping_pong", run, |router1, router2| async move {
900 let (c, p) = fidl::Channel::create();
901 println!("ping_pong: register service");
902 let s = register_test_service(router2.clone(), router1.clone(), "ping_pong").await;
903 println!("ping_pong: connect to service");
904 router1.connect_to_service(router2.node_id, "ping_pong", p).await?;
905 println!("ping_pong: wait for connection");
906 let s = s.await?;
907 let c = fidl::AsyncChannel::from_channel(c);
908 let s = fidl::AsyncChannel::from_channel(s);
909 println!("ping_pong: send ping");
910 c.write(&[1, 2, 3, 4, 5], &mut Vec::new())?;
911 println!("ping_pong: receive ping");
912 let mut buf = fidl::MessageBufEtc::new();
913 s.recv_etc_msg(&mut buf).await?;
914 assert_eq!(buf.n_handle_infos(), 0);
915 assert_eq!(buf.bytes(), &[1, 2, 3, 4, 5]);
916 println!("ping_pong: send pong");
917 s.write(&[9, 8, 7, 6, 5, 4, 3, 2, 1], &mut Vec::new())?;
918 println!("ping_pong: receive pong");
919 let mut buf = fidl::MessageBufEtc::new();
920 c.recv_etc_msg(&mut buf).await?;
921 assert_eq!(buf.n_handle_infos(), 0);
922 assert_eq!(buf.bytes(), &[9, 8, 7, 6, 5, 4, 3, 2, 1]);
923 Ok(())
924 })
925 .await
926 }
927
928 fn ensure_pending(f: &mut (impl Send + Unpin + Future<Output = ()>)) {
929 let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
930 for _ in 0..1000 {
932 assert!(f.poll_unpin(&mut ctx).is_pending());
933 }
934 }
935
    /// While one `list_peers` call is outstanding on a context, a second call on the same
    /// context fails instead of waiting.
    #[fuchsia::test]
    async fn concurrent_list_peer_calls_will_error(run: usize) -> Result<(), Error> {
        let mut node_id_gen = NodeIdGenerator::new("concurrent_list_peer_calls_will_error", run);
        let n = node_id_gen.new_router().unwrap();
        let lp = n.new_list_peers_context().await;
        // The first call completes.
        lp.list_peers().await.unwrap();
        // This call is expected to stay pending, keeping the observer checked out.
        let mut never_completes = async {
            lp.list_peers().await.unwrap();
        }
        .boxed();
        ensure_pending(&mut never_completes);
        // With the observer checked out by the pending call, another call must fail.
        lp.list_peers().await.expect_err("Concurrent list peers should fail");
        ensure_pending(&mut never_completes);
        Ok(())
    }
951}