1mod service_map;
22
23use self::service_map::ServiceMap;
24use crate::future_help::{log_errors, Observer};
25use crate::handle_info::{handle_info, HandleKey, HandleType};
26use crate::labels::{NodeId, TransferKey};
27use crate::peer::{FramedStreamReader, FramedStreamWriter, Peer, PeerConnRef};
28use crate::proxy::{
29 IntoProxied, ProxyTransferInitiationReceiver, RemoveFromProxyTable, StreamRefSender,
30};
31use anyhow::{bail, format_err, Context as _, Error};
32use async_utils::mutex_ticket::MutexTicket;
33use fidl::{AsHandleRef, Channel, EventPair, Handle, HandleBased, Socket};
34use fidl_fuchsia_overnet_protocol::{
35 ChannelHandle, EventPairHandle, EventPairRights, SocketHandle, SocketType, StreamId, StreamRef,
36 ZirconHandle,
37};
38use fuchsia_async::Task;
39use futures::channel::oneshot;
40use futures::future::poll_fn;
41use futures::lock::Mutex;
42use futures::prelude::*;
43use futures::ready;
44use std::collections::{BTreeMap, HashMap};
45use std::pin::pin;
46use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
47use std::sync::{Arc, Weak};
48use std::task::{Context, Poll, Waker};
49use std::time::Duration;
50
51pub use self::service_map::ListablePeer;
52
/// State of one entry in the router's pending-transfer table.
#[derive(Debug)]
enum PendingTransfer {
    /// The other end was posted via `Router::post_transfer` and has not been claimed yet.
    Complete(FoundTransfer),
    /// `Router::poll_find_transfer` ran before the other end was posted; holds the waker that
    /// `Router::post_transfer` wakes when it arrives.
    Waiting(Waker),
}
58
/// Pending transfers, keyed by their transfer key.
type PendingTransferMap = BTreeMap<TransferKey, PendingTransfer>;
60
/// The other end of a transfer, as stored by `Router::post_transfer` and handed out by
/// `Router::find_transfer`.
#[derive(Debug)]
pub(crate) enum FoundTransfer {
    /// The handle itself; `Router::open_transfer` posts this when the target is this node.
    Fused(Handle),
    /// A framed stream writer/reader pair.
    /// NOTE(review): presumably the streams leading to the remote end — confirm against callers.
    Remote(FramedStreamWriter, FramedStreamReader),
}
66
/// Outcome of `Router::open_transfer`.
#[derive(Debug)]
pub(crate) enum OpenedTransfer {
    /// The target was this node; the handle was posted locally as `FoundTransfer::Fused`.
    Fused,
    /// The target was another node: the stream pair returned by the peer's `send_open_transfer`,
    /// together with the handle that was passed in.
    Remote(FramedStreamWriter, FramedStreamReader, Handle),
}
72
/// Per-node entry in `PeerMaps::circuit_clients`.
#[derive(Debug)]
#[allow(dead_code)]
enum CircuitState {
    /// No client peer is installed yet. Each sender belongs to a `Router::client_peer` call that
    /// is waiting; `run_circuits` signals them once a peer is inserted.
    Waiters(Vec<oneshot::Sender<()>>),
    /// An established client peer.
    Peer(Arc<Peer>),
}
79
80impl CircuitState {
81 fn peer(&self) -> Option<Arc<Peer>> {
82 if let CircuitState::Peer(peer) = self {
83 Some(Arc::clone(peer))
84 } else {
85 None
86 }
87 }
88}
89
/// Peer bookkeeping guarded by `Router::peers`.
#[derive(Debug)]
struct PeerMaps {
    /// Client-side circuit state for each remote node id: either an established peer or the list
    /// of callers waiting for one.
    circuit_clients: BTreeMap<NodeId, CircuitState>,
}
94
/// Wrapper around a peer-list observer, created by `Router::new_list_peers_context`.
///
/// The inner option is `None` while a `list_peers` call has taken the observer out.
#[derive(Debug)]
pub struct ListPeersContext(Mutex<Option<Observer<Vec<ListablePeer>>>>);
98
/// Process-wide counter; `ListPeersContext::list_peers` increments it to tag the trace logs of
/// each call.
static LIST_PEERS_CALL: AtomicU64 = AtomicU64::new(0);
100
101impl ListPeersContext {
102 pub async fn list_peers(&self) -> Result<Vec<ListablePeer>, Error> {
104 let call_id = LIST_PEERS_CALL.fetch_add(1, Ordering::SeqCst);
105 log::trace!(list_peers_call = call_id; "get observer");
106 let mut obs = self
107 .0
108 .lock()
109 .await
110 .take()
111 .ok_or_else(|| anyhow::format_err!("Already listing peers"))?;
112 log::trace!(list_peers_call = call_id; "wait for value");
113 let r = obs.next().await;
114 log::trace!(list_peers_call = call_id; "replace observer");
115 *self.0.lock().await = Some(obs);
116 log::trace!(list_peers_call = call_id; "return");
117 Ok(r.unwrap_or_else(Vec::new))
118 }
119}
120
/// Value of the router's `ascendd_client_routing` flag, as returned by
/// `Router::client_routing` and accepted by `Router::set_client_routing`.
///
/// Derives the usual value-type traits so callers can log, copy and compare the setting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AscenddClientRouting {
    /// The flag is set (`true`).
    Enabled,
    /// The flag is cleared (`false`).
    Disabled,
}
128
/// A router node: owns the node id, the peer table, the local service map, the tables of proxied
/// handles and pending transfers, and the circuit connection node.
pub struct Router {
    /// Id of this node.
    node_id: NodeId,
    /// Client peers (or callers waiting for them), keyed by remote node id.
    peers: Mutex<PeerMaps>,
    /// Service map created for this node in `with_node_id`.
    service_map: ServiceMap,
    /// Handles currently being proxied, keyed by handle key.
    proxied_streams: Mutex<HashMap<HandleKey, ProxiedHandle>>,
    /// Transfers that have been posted or are being waited for.
    pending_transfers: Mutex<PendingTransferMap>,
    /// Task running `run_circuits`; set in `with_node_id`.
    task: Mutex<Option<Task<()>>>,
    /// Backing flag for `client_routing` / `set_client_routing`; initialised to `true`.
    ascendd_client_routing: AtomicBool,
    /// Circuit connection node created in `with_node_id`.
    circuit_node: circuit::ConnectionNode,
}
146
/// Entry in `Router::proxied_streams` describing one proxied handle.
struct ProxiedHandle {
    /// Channel used to tell the proxy why it is leaving the table (`Dropped` or
    /// `InitiateTransfer`).
    remove_sender: futures::channel::oneshot::Sender<RemoveFromProxyTable>,
    /// Key of the handle this one was paired with when the entry was added.
    original_paired: HandleKey,
    /// The spawned task that drives the proxy.
    proxy_task: Task<()>,
}
152
153fn generate_node_id() -> NodeId {
155 rand::random::<u64>().into()
156}
157
/// Consumes `v`, sorts it in ascending order (stable sort) and returns it.
fn sorted<T>(mut v: Vec<T>) -> Vec<T>
where
    T: std::cmp::Ord,
{
    v.as_mut_slice().sort();
    v
}
162
163impl std::fmt::Debug for Router {
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 write!(f, "Router({:?})", self.node_id)
166 }
167}
168
/// Protocol string passed to the circuit connection node when the router is created.
const OVERNET_CIRCUIT_PROTOCOL: &str = "Overnet:0";
172
173impl Router {
174 pub fn new(router_interval: Option<Duration>) -> Result<Arc<Self>, Error> {
177 Router::with_node_id(generate_node_id(), router_interval)
178 }
179
180 pub fn with_node_id(
182 node_id: NodeId,
183 router_interval: Option<Duration>,
184 ) -> Result<Arc<Self>, Error> {
185 let service_map = ServiceMap::new(node_id);
186 let (new_peer_sender, new_peer_receiver) = futures::channel::mpsc::channel(1);
187 let (circuit_node, circuit_connections) = if let Some(interval) = router_interval {
188 let (a, b) = circuit::ConnectionNode::new_with_router(
189 &node_id.circuit_string(),
190 OVERNET_CIRCUIT_PROTOCOL,
191 interval,
192 new_peer_sender,
193 )?;
194 (a, b.boxed())
195 } else {
196 let (a, b) = circuit::ConnectionNode::new(
197 &node_id.circuit_string(),
198 OVERNET_CIRCUIT_PROTOCOL,
199 new_peer_sender,
200 )?;
201 (a, b.boxed())
202 };
203 let router = Arc::new(Router {
204 node_id,
205 service_map,
206 peers: Mutex::new(PeerMaps { circuit_clients: BTreeMap::new() }),
207 proxied_streams: Mutex::new(HashMap::new()),
208 pending_transfers: Mutex::new(PendingTransferMap::new()),
209 task: Mutex::new(None),
210 ascendd_client_routing: AtomicBool::new(true),
212 circuit_node,
213 });
214
215 let weak_router = Arc::downgrade(&router);
216 *router.task.lock().now_or_never().unwrap() = Some(Task::spawn(log_errors(
217 run_circuits(weak_router, circuit_connections, new_peer_receiver),
218 format!("router {:?} support loop failed", node_id),
219 )));
220
221 Ok(router)
222 }
223
224 pub fn circuit_node(&self) -> &circuit::Node {
227 self.circuit_node.node()
228 }
229
230 pub fn node_id(&self) -> NodeId {
232 self.node_id
233 }
234
235 pub fn client_routing(&self) -> AscenddClientRouting {
237 if self.ascendd_client_routing.load(std::sync::atomic::Ordering::SeqCst) {
238 AscenddClientRouting::Enabled
239 } else {
240 AscenddClientRouting::Disabled
241 }
242 }
243
244 pub fn set_client_routing(&self, client_routing: AscenddClientRouting) {
246 let client_routing = match client_routing {
247 AscenddClientRouting::Enabled => true,
248 AscenddClientRouting::Disabled => false,
249 };
250 self.ascendd_client_routing.store(client_routing, std::sync::atomic::Ordering::SeqCst);
251 }
252
253 pub(crate) fn service_map(&self) -> &ServiceMap {
254 &self.service_map
255 }
256
257 pub async fn connect_to_service(
259 self: &Arc<Self>,
260 node_id: NodeId,
261 service_name: &str,
262 chan: Channel,
263 ) -> Result<(), Error> {
264 let is_local = node_id == self.node_id;
265 log::trace!(
266 service_name:%,
267 node_id = node_id.0,
268 local = is_local;
269 "Request connect_to_service",
270 );
271 if is_local {
272 self.service_map().connect(service_name, chan).await
273 } else {
274 self.client_peer(node_id)
275 .await
276 .with_context(|| {
277 format_err!(
278 "Fetching client peer for new stream to {:?} for service {:?}",
279 node_id,
280 service_name,
281 )
282 })?
283 .new_stream(service_name, chan, self)
284 .await
285 }
286 }
287
288 pub async fn register_service(
291 &self,
292 service_name: String,
293 provider: impl Fn(fidl::Channel) -> Result<(), Error> + Send + 'static,
294 ) -> Result<(), Error> {
295 self.service_map().register_service(service_name, provider).await;
296 Ok(())
297 }
298
299 pub async fn new_list_peers_context(&self) -> ListPeersContext {
301 ListPeersContext(Mutex::new(Some(self.service_map.new_list_peers_observer().await)))
302 }
303
304 async fn client_peer(self: &Arc<Self>, peer_node_id: NodeId) -> Result<Arc<Peer>, Error> {
305 loop {
306 let mut peers = self.peers.lock().await;
307 match peers.circuit_clients.get_mut(&peer_node_id) {
308 Some(CircuitState::Peer(peer)) => break Ok(Arc::clone(&peer)),
309 Some(CircuitState::Waiters(waiters)) => {
310 let (sender, receiver) = oneshot::channel();
311 waiters.push(sender);
312 std::mem::drop(peers);
313 let _ = receiver.await;
314 }
315 None => {
316 peers.circuit_clients.insert(peer_node_id, CircuitState::Waiters(Vec::new()));
317 }
318 }
319 }
320 }
321
322 fn add_proxied(
323 self: &Arc<Self>,
324 proxied_streams: &mut HashMap<HandleKey, ProxiedHandle>,
325 this_handle_key: HandleKey,
326 pair_handle_key: HandleKey,
327 remove_sender: futures::channel::oneshot::Sender<RemoveFromProxyTable>,
328 f: impl 'static + Send + Future<Output = Result<(), Error>>,
329 ) {
330 let router = Arc::downgrade(&self);
331 let proxy_task = Task::spawn(async move {
332 if let Err(e) = f.await {
333 log::trace!(this_handle_key:?, pair_handle_key:?; "Proxy failed: {:?}", e);
334 } else {
335 log::trace!(this_handle_key:?, pair_handle_key:?; "Proxy completed successfully",);
336 }
337 if let Some(router) = Weak::upgrade(&router) {
338 router.remove_proxied(this_handle_key, pair_handle_key).await;
339 }
340 });
341 assert!(proxied_streams
342 .insert(
343 this_handle_key,
344 ProxiedHandle { remove_sender, original_paired: pair_handle_key, proxy_task },
345 )
346 .is_none());
347 }
348
349 async fn remove_proxied(
352 self: &Arc<Self>,
353 this_handle_key: HandleKey,
354 pair_handle_key: HandleKey,
355 ) {
356 let mut proxied_streams = self.proxied_streams.lock().await;
357 log::trace!(
358 node_id = self.node_id.0,
359 this_handle_key:?,
360 pair_handle_key:?,
361 all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
362 "REMOVE_PROXIED",
363 );
364 if let Some(removed) = proxied_streams.remove(&this_handle_key) {
365 assert_eq!(removed.original_paired, pair_handle_key);
366 let _ = removed.remove_sender.send(RemoveFromProxyTable::Dropped);
367 removed.proxy_task.detach();
368 }
369 }
370
371 pub(crate) async fn send_proxied(
374 self: &Arc<Self>,
375 handle: Handle,
376 conn: PeerConnRef<'_>,
377 ) -> Result<ZirconHandle, Error> {
378 let raw_handle = handle.raw_handle(); let info = handle_info(handle.as_handle_ref())
380 .with_context(|| format!("Getting handle information for {}", raw_handle))?;
381 let mut proxied_streams = self.proxied_streams.lock().await;
382 log::trace!(
383 node_id = self.node_id.0,
384 handle:?,
385 info:?,
386 all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
387 "SEND_PROXIED",
388 );
389 if let Some(pair) = proxied_streams.remove(&info.pair_handle_key) {
390 drop(proxied_streams);
394 assert_eq!(info.this_handle_key, pair.original_paired);
395 log::trace!(
396 handle:?,
397 orig_pair:? = pair.original_paired;
398 "Send paired proxied"
399 );
400 let drain_stream = conn.alloc_uni().await?.into();
403 let (stream_ref_sender, stream_ref_receiver) = StreamRefSender::new();
404 pair.remove_sender
405 .send(RemoveFromProxyTable::InitiateTransfer {
406 paired_handle: handle,
407 drain_stream,
408 stream_ref_sender,
409 })
410 .map_err(|_| format_err!("Failed to initiate transfer"))?;
411 let stream_ref = stream_ref_receiver
412 .await
413 .with_context(|| format!("waiting for stream_ref for {:?}", raw_handle))?;
414 pair.proxy_task.detach();
415 match info.handle_type {
416 HandleType::Channel(rights) => {
417 Ok(ZirconHandle::Channel(ChannelHandle { stream_ref, rights }))
418 }
419 HandleType::Socket(socket_type, rights) => {
420 Ok(ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights }))
421 }
422 HandleType::EventPair => Ok(ZirconHandle::EventPair(EventPairHandle {
423 stream_ref,
424 rights: EventPairRights::empty(),
425 })),
426 }
427 } else {
428 log::trace!(handle:?; "Send proxied");
430 let (tx, rx) = futures::channel::oneshot::channel();
431 let rx = ProxyTransferInitiationReceiver::new(rx.map_err(move |_| {
432 format_err!(
433 "cancelled transfer via send_proxied {:?}\n{}",
434 info,
435 111 )
437 }));
438 let (stream_writer, stream_reader) = conn.alloc_bidi().await?;
439 let stream_ref = StreamRef::Creating(StreamId { id: stream_writer.id() });
440 Ok(match info.handle_type {
441 HandleType::Channel(rights) => {
442 self.add_proxied(
443 &mut *proxied_streams,
444 info.this_handle_key,
445 info.pair_handle_key,
446 tx,
447 crate::proxy::spawn_send(
448 Channel::from_handle(handle).into_proxied()?,
449 rx,
450 stream_writer.into(),
451 stream_reader.into(),
452 Arc::downgrade(&self),
453 ),
454 );
455 ZirconHandle::Channel(ChannelHandle { stream_ref, rights })
456 }
457 HandleType::Socket(socket_type, rights) => {
458 self.add_proxied(
459 &mut *proxied_streams,
460 info.this_handle_key,
461 info.pair_handle_key,
462 tx,
463 crate::proxy::spawn_send(
464 Socket::from_handle(handle).into_proxied()?,
465 rx,
466 stream_writer.into(),
467 stream_reader.into(),
468 Arc::downgrade(&self),
469 ),
470 );
471 ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights })
472 }
473 HandleType::EventPair => {
474 self.add_proxied(
475 &mut *proxied_streams,
476 info.this_handle_key,
477 info.pair_handle_key,
478 tx,
479 crate::proxy::spawn_send(
480 EventPair::from_handle(handle).into_proxied()?,
481 rx,
482 stream_writer.into(),
483 stream_reader.into(),
484 Arc::downgrade(&self),
485 ),
486 );
487 ZirconHandle::EventPair(EventPairHandle {
488 stream_ref,
489 rights: EventPairRights::empty(),
490 })
491 }
492 })
493 }
494 }
495
496 pub(crate) async fn recv_proxied(
499 self: &Arc<Self>,
500 handle: ZirconHandle,
501 conn: PeerConnRef<'_>,
502 ) -> Result<Handle, Error> {
503 match handle {
504 ZirconHandle::Channel(ChannelHandle { stream_ref, rights }) => {
505 self.recv_proxied_handle(conn, stream_ref, move || Ok(Channel::create()), rights)
506 .await
507 }
508 ZirconHandle::Socket(SocketHandle { stream_ref, socket_type, rights }) => {
509 self.recv_proxied_handle(
510 conn,
511 stream_ref,
512 move || {
513 Ok(match socket_type {
514 SocketType::Stream => Socket::create_stream(),
515 SocketType::Datagram => Socket::create_datagram(),
516 })
517 },
518 rights,
519 )
520 .await
521 }
522 ZirconHandle::EventPair(EventPairHandle { stream_ref, rights }) => {
523 self.recv_proxied_handle(conn, stream_ref, move || Ok(EventPair::create()), rights)
524 .await
525 }
526 }
527 }
528
529 async fn recv_proxied_handle<Hdl, CreateType>(
530 self: &Arc<Self>,
531 conn: PeerConnRef<'_>,
532 stream_ref: StreamRef,
533 create_handles: impl FnOnce() -> Result<(CreateType, CreateType), Error> + 'static,
534 rights: CreateType::Rights,
535 ) -> Result<Handle, Error>
536 where
537 Hdl: 'static + for<'a> crate::proxy::ProxyableRW<'a>,
538 CreateType: 'static
539 + fidl::HandleBased
540 + IntoProxied<Proxied = Hdl>
541 + std::fmt::Debug
542 + crate::handle_info::WithRights,
543 {
544 let (tx, rx) = futures::channel::oneshot::channel();
545 let rx = ProxyTransferInitiationReceiver::new(
546 rx.map_err(move |_| format_err!("cancelled transfer via recv_proxied")),
547 );
548 let (h, p) = crate::proxy::spawn_recv(
549 create_handles,
550 rights,
551 rx,
552 stream_ref,
553 conn,
554 Arc::downgrade(&self),
555 )
556 .await?;
557 if let Some(p) = p {
558 let info = handle_info(h.as_handle_ref())?;
559 self.add_proxied(
560 &mut *self.proxied_streams.lock().await,
561 info.pair_handle_key,
562 info.this_handle_key,
563 tx,
564 p,
565 );
566 }
567 Ok(h)
568 }
569
570 pub(crate) async fn post_transfer(
572 &self,
573 transfer_key: TransferKey,
574 other_end: FoundTransfer,
575 ) -> Result<(), Error> {
576 let mut pending_transfers = self.pending_transfers.lock().await;
577 match pending_transfers.insert(transfer_key, PendingTransfer::Complete(other_end)) {
578 Some(PendingTransfer::Complete(_)) => bail!("Duplicate transfer received"),
579 Some(PendingTransfer::Waiting(w)) => w.wake(),
580 None => (),
581 }
582 Ok(())
583 }
584
585 fn poll_find_transfer(
586 &self,
587 ctx: &mut Context<'_>,
588 transfer_key: TransferKey,
589 lock: &mut MutexTicket<'_, PendingTransferMap>,
590 ) -> Poll<Result<FoundTransfer, Error>> {
591 let mut pending_transfers = ready!(lock.poll(ctx));
592 if let Some(PendingTransfer::Complete(other_end)) = pending_transfers.remove(&transfer_key)
593 {
594 Poll::Ready(Ok(other_end))
595 } else {
596 pending_transfers.insert(transfer_key, PendingTransfer::Waiting(ctx.waker().clone()));
597 Poll::Pending
598 }
599 }
600
601 pub(crate) async fn find_transfer(
603 &self,
604 transfer_key: TransferKey,
605 ) -> Result<FoundTransfer, Error> {
606 let mut lock = MutexTicket::new(&self.pending_transfers);
607 poll_fn(|ctx| self.poll_find_transfer(ctx, transfer_key, &mut lock)).await
608 }
609
610 pub(crate) async fn open_transfer(
613 self: &Arc<Router>,
614 target: NodeId,
615 transfer_key: TransferKey,
616 handle: Handle,
617 ) -> Result<OpenedTransfer, Error> {
618 if target == self.node_id {
619 let info = handle_info(handle.as_handle_ref())?;
623 let mut proxied_streams = self.proxied_streams.lock().await;
624 log::trace!(
625 node_id = self.node_id.0,
626 key:? = transfer_key,
627 info:? = info,
628 all:? = sorted(proxied_streams.keys().map(|x| *x).collect::<Vec<_>>());
629 "OPEN_TRANSFER_REMOVE_PROXIED",
630 );
631 if let Some(removed) = proxied_streams.remove(&info.this_handle_key) {
632 assert_eq!(removed.original_paired, info.pair_handle_key);
633 assert!(removed.remove_sender.send(RemoveFromProxyTable::Dropped).is_ok());
634 removed.proxy_task.detach();
635 }
636 if let Some(removed) = proxied_streams.remove(&info.pair_handle_key) {
637 assert_eq!(removed.original_paired, info.this_handle_key);
638 assert!(removed.remove_sender.send(RemoveFromProxyTable::Dropped).is_ok());
639 removed.proxy_task.detach();
640 }
641 self.post_transfer(transfer_key, FoundTransfer::Fused(handle)).await?;
642 Ok(OpenedTransfer::Fused)
643 } else {
644 if let Some((writer, reader)) =
645 self.client_peer(target).await?.send_open_transfer(transfer_key).await
646 {
647 Ok(OpenedTransfer::Remote(writer, reader, handle))
648 } else {
649 bail!("{:?} failed sending open transfer to {:?}", self.node_id, target)
650 }
651 }
652 }
653
654 pub(crate) async fn client_closed(&self, peer_node_id: NodeId) {
655 self.peers.lock().await.circuit_clients.remove(&peer_node_id);
656 }
657}
658
659async fn run_circuits(
661 router: Weak<Router>,
662 connections: impl futures::Stream<Item = circuit::Connection> + Send,
663 peers: impl futures::Stream<Item = String> + Send,
664) -> Result<(), Error> {
665 let new_peer_fut = {
667 let router = router.clone();
668 async move {
669 let mut peers = pin!(peers);
670 while let Some(peer_node_id) = peers.next().await {
671 let router = match router.upgrade() {
672 Some(x) => x,
673 None => {
674 log::warn!("Router disappeared from under circuit runner");
675 break;
676 }
677 };
678
679 let res = async {
680 let peer_node_id_num = NodeId::from_circuit_string(&peer_node_id)
681 .map_err(|_| format_err!("Invalid node id: {:?}", peer_node_id))?;
682 let mut peers = router.peers.lock().await;
683
684 if peers.circuit_clients.get(&peer_node_id_num).and_then(|x| x.peer()).is_some()
685 {
686 log::warn!(peer:? = peer_node_id; "Re-establishing connection");
687 }
688
689 let (reader, writer_remote) = circuit::stream::stream();
690 let (reader_remote, writer) = circuit::stream::stream();
691 let conn = router
692 .circuit_node
693 .connect_to_peer(&peer_node_id, reader_remote, writer_remote)
694 .await?;
695 let peer = Peer::new_circuit_client(
696 conn,
697 writer,
698 reader,
699 router.service_map.new_local_service_observer().await,
700 &router,
701 peer_node_id_num,
702 )?;
703
704 if let Some(CircuitState::Waiters(waiters)) =
705 peers.circuit_clients.insert(peer_node_id_num, CircuitState::Peer(peer))
706 {
707 for waiter in waiters {
708 let _ = waiter.send(());
709 }
710 }
711
712 Result::<_, Error>::Ok(())
713 }
714 .await;
715
716 if let Err(e) = res {
717 log::warn!(peer:? = peer_node_id; "Attempt to connect to peer failed: {:?}", e);
718 }
719 }
720 }
721 };
722
723 let new_conn_fut = async move {
725 let mut connections = pin!(connections);
726 while let Some(conn) = connections.next().await {
727 let peer_name = conn.from().to_owned();
728 let res = async {
729 let router = router.upgrade().ok_or_else(|| format_err!("router gone"))?;
730 Peer::new_circuit_server(conn, &router).await?;
731 Result::<_, Error>::Ok(())
732 }
733 .await;
734
735 if let Err(e) = res {
736 log::warn!(
737 peer:? = peer_name;
738 "Attempt to receive connection from peer failed: {:?}",
739 e
740 );
741 }
742 }
743 };
744
745 futures::future::join(new_peer_fut, new_conn_fut).await;
746 Ok(())
747}
748
749#[cfg(test)]
750mod tests {
751
752 use super::*;
753 use crate::test_util::*;
754 use circuit::multi_stream::multi_stream_node_connection;
755 use circuit::stream::stream;
756 use circuit::Quality;
757
758 #[fuchsia::test]
759 async fn no_op(run: usize) {
760 let mut node_id_gen = NodeIdGenerator::new("router::no_op", run);
761 node_id_gen.new_router().unwrap();
762 let id = node_id_gen.next().unwrap();
763 assert_eq!(Router::with_node_id(id, None).unwrap().node_id, id);
764 }
765
766 async fn register_test_service(
767 serving_router: Arc<Router>,
768 client_router: Arc<Router>,
769 service: &'static str,
770 ) -> futures::channel::oneshot::Receiver<Channel> {
771 use fuchsia_sync::Mutex;
772 let (send, recv) = futures::channel::oneshot::channel();
773 serving_router
774 .service_map()
775 .register_service(service.to_string(), {
776 let sender = Mutex::new(Some(send));
777 move |chan| {
778 println!("{} got request", service);
779 sender.lock().take().unwrap().send(chan).unwrap();
780 println!("{} forwarded channel", service);
781 Ok(())
782 }
783 })
784 .await;
785 let serving_node_id = serving_router.node_id();
786 println!("{} wait for service to appear @ client", service);
787 let lpc = client_router.new_list_peers_context().await;
788 loop {
789 let peers = lpc.list_peers().await.unwrap();
790 println!("{} got peers {:?}", service, peers);
791 if peers
792 .iter()
793 .find(move |peer| {
794 serving_node_id == peer.node_id
795 && peer
796 .services
797 .iter()
798 .find(move |&advertised_service| advertised_service == service)
799 .is_some()
800 })
801 .is_some()
802 {
803 break;
804 }
805 }
806 recv
807 }
808
809 async fn run_two_node<
810 F: 'static + Clone + Sync + Send + Fn(Arc<Router>, Arc<Router>) -> Fut,
811 Fut: 'static + Send + Future<Output = Result<(), Error>>,
812 >(
813 name: &'static str,
814 run: usize,
815 f: F,
816 ) -> Result<(), Error> {
817 let mut node_id_gen = NodeIdGenerator::new(name, run);
818 let router1 = node_id_gen.new_router()?;
819 let router2 = node_id_gen.new_router()?;
820 let (circuit1_reader, circuit1_writer) = stream();
821 let (circuit2_reader, circuit2_writer) = stream();
822 let (out_1, _) = futures::channel::mpsc::unbounded();
823 let (out_2, _) = futures::channel::mpsc::unbounded();
824
825 let conn_1 = multi_stream_node_connection(
826 router1.circuit_node(),
827 circuit1_reader,
828 circuit2_writer,
829 true,
830 Quality::IN_PROCESS,
831 out_1,
832 "router1".to_owned(),
833 );
834 let conn_2 = multi_stream_node_connection(
835 router2.circuit_node(),
836 circuit2_reader,
837 circuit1_writer,
838 false,
839 Quality::IN_PROCESS,
840 out_2,
841 "router2".to_owned(),
842 );
843 let _fwd = Task::spawn(async move {
844 if let Err(e) = futures::future::try_join(conn_1, conn_2).await {
845 log::trace!("forwarding failed: {:?}", e)
846 }
847 });
848 f(router1, router2).await
849 }
850
851 #[fuchsia::test]
852 async fn no_op_env(run: usize) -> Result<(), Error> {
853 run_two_node("router::no_op_env", run, |_router1, _router2| async { Ok(()) }).await
854 }
855
856 #[fuchsia::test]
857 async fn create_stream(run: usize) -> Result<(), Error> {
858 run_two_node("create_stream", run, |router1, router2| async move {
859 let (_, p) = fidl::Channel::create();
860 println!("create_stream: register service");
861 let s = register_test_service(router2.clone(), router1.clone(), "create_stream").await;
862 println!("create_stream: connect to service");
863 router1.connect_to_service(router2.node_id, "create_stream", p).await?;
864 println!("create_stream: wait for connection");
865 let _ = s.await?;
866 Ok(())
867 })
868 .await
869 }
870
871 #[fuchsia::test]
872 async fn send_datagram_immediately(run: usize) -> Result<(), Error> {
873 run_two_node("send_datagram_immediately", run, |router1, router2| async move {
874 let (c, p) = fidl::Channel::create();
875 println!("send_datagram_immediately: register service");
876 let s = register_test_service(
877 router2.clone(),
878 router1.clone(),
879 "send_datagram_immediately",
880 )
881 .await;
882 println!("send_datagram_immediately: connect to service");
883 router1.connect_to_service(router2.node_id, "send_datagram_immediately", p).await?;
884 println!("send_datagram_immediately: wait for connection");
885 let s = s.await?;
886 let c = fidl::AsyncChannel::from_channel(c);
887 let s = fidl::AsyncChannel::from_channel(s);
888 c.write(&[1, 2, 3, 4, 5], &mut Vec::new())?;
889 let mut buf = fidl::MessageBufEtc::new();
890 println!("send_datagram_immediately: wait for datagram");
891 s.recv_etc_msg(&mut buf).await?;
892 assert_eq!(buf.n_handle_infos(), 0);
893 assert_eq!(buf.bytes(), &[1, 2, 3, 4, 5]);
894 Ok(())
895 })
896 .await
897 }
898
899 #[fuchsia::test]
900 async fn ping_pong(run: usize) -> Result<(), Error> {
901 run_two_node("ping_pong", run, |router1, router2| async move {
902 let (c, p) = fidl::Channel::create();
903 println!("ping_pong: register service");
904 let s = register_test_service(router2.clone(), router1.clone(), "ping_pong").await;
905 println!("ping_pong: connect to service");
906 router1.connect_to_service(router2.node_id, "ping_pong", p).await?;
907 println!("ping_pong: wait for connection");
908 let s = s.await?;
909 let c = fidl::AsyncChannel::from_channel(c);
910 let s = fidl::AsyncChannel::from_channel(s);
911 println!("ping_pong: send ping");
912 c.write(&[1, 2, 3, 4, 5], &mut Vec::new())?;
913 println!("ping_pong: receive ping");
914 let mut buf = fidl::MessageBufEtc::new();
915 s.recv_etc_msg(&mut buf).await?;
916 assert_eq!(buf.n_handle_infos(), 0);
917 assert_eq!(buf.bytes(), &[1, 2, 3, 4, 5]);
918 println!("ping_pong: send pong");
919 s.write(&[9, 8, 7, 6, 5, 4, 3, 2, 1], &mut Vec::new())?;
920 println!("ping_pong: receive pong");
921 let mut buf = fidl::MessageBufEtc::new();
922 c.recv_etc_msg(&mut buf).await?;
923 assert_eq!(buf.n_handle_infos(), 0);
924 assert_eq!(buf.bytes(), &[9, 8, 7, 6, 5, 4, 3, 2, 1]);
925 Ok(())
926 })
927 .await
928 }
929
930 fn ensure_pending(f: &mut (impl Send + Unpin + Future<Output = ()>)) {
931 let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
932 for _ in 0..1000 {
934 assert!(f.poll_unpin(&mut ctx).is_pending());
935 }
936 }
937
938 #[fuchsia::test]
939 async fn concurrent_list_peer_calls_will_error(run: usize) -> Result<(), Error> {
940 let mut node_id_gen = NodeIdGenerator::new("concurrent_list_peer_calls_will_error", run);
941 let n = node_id_gen.new_router().unwrap();
942 let lp = n.new_list_peers_context().await;
943 lp.list_peers().await.unwrap();
944 let mut never_completes = async {
945 lp.list_peers().await.unwrap();
946 }
947 .boxed();
948 ensure_pending(&mut never_completes);
949 lp.list_peers().await.expect_err("Concurrent list peers should fail");
950 ensure_pending(&mut never_completes);
951 Ok(())
952 }
953}