1mod framed_stream;
6
7pub(crate) use self::framed_stream::{
8 FrameType, FramedStreamReadResult, FramedStreamReader, FramedStreamWriter,
9};
10use crate::coding::{decode_fidl, encode_fidl};
11use crate::future_help::Observer;
12use crate::labels::{Endpoint, NodeId, TransferKey};
13use crate::router::{FoundTransfer, Router};
14use anyhow::{bail, format_err, Context as _, Error};
15use fidl::{Channel, HandleBased};
16use fidl_fuchsia_overnet_protocol::{
17 ChannelHandle, ConfigRequest, ConfigResponse, ConnectToService, ConnectToServiceOptions,
18 OpenTransfer, PeerDescription, PeerMessage, PeerReply, StreamId, ZirconHandle,
19};
20use fuchsia_async::{Task, TimeoutExt};
21use futures::channel::{mpsc, oneshot};
22use futures::lock::Mutex;
23use futures::prelude::*;
24use std::sync::{Arc, Weak};
25use std::time::Duration;
26
27#[derive(Debug)]
28struct Config {}
29
30impl Config {
31 fn negotiate(_request: ConfigRequest) -> (Self, ConfigResponse) {
32 (Config {}, ConfigResponse::default())
33 }
34
35 fn from_response(_response: ConfigResponse) -> Self {
36 Config {}
37 }
38}
39
/// Commands queued for the client connection task, which turns each one into
/// a `PeerMessage` written to the connection stream.
#[derive(Debug)]
enum ClientPeerCommand {
    /// Sent to the peer as `PeerMessage::ConnectToService`.
    ConnectToService(ConnectToService),
    /// Sent to the peer as `PeerMessage::OpenTransfer` for the given stream id
    /// and transfer key. The sender is signalled once the message has been
    /// written.
    OpenTransfer(u64, TransferKey, oneshot::Sender<()>),
}
45
46#[derive(Clone)]
47pub(crate) struct PeerConn {
48 conn: circuit::Connection,
49 node_id: NodeId,
50}
51
52impl std::fmt::Debug for PeerConn {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 self.as_ref().fmt(f)
55 }
56}
57
58impl PeerConn {
59 pub fn from_circuit(conn: circuit::Connection, node_id: NodeId) -> Self {
60 PeerConn { conn, node_id }
61 }
62
63 pub fn as_ref(&self) -> PeerConnRef<'_> {
64 PeerConnRef { conn: &self.conn, node_id: self.node_id }
65 }
66
67 pub fn node_id(&self) -> NodeId {
68 self.node_id
69 }
70}
71
72#[derive(Clone, Copy)]
73pub(crate) struct PeerConnRef<'a> {
74 conn: &'a circuit::Connection,
75 node_id: NodeId,
76}
77
78impl<'a> std::fmt::Debug for PeerConnRef<'a> {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 write!(f, "PeerConn({})", self.node_id.0)
81 }
82}
83
84impl<'a> PeerConnRef<'a> {
85 pub fn from_circuit(conn: &'a circuit::Connection, node_id: NodeId) -> Self {
86 PeerConnRef { conn, node_id }
87 }
88
89 pub fn into_peer_conn(&self) -> PeerConn {
90 PeerConn { conn: self.conn.clone(), node_id: self.node_id }
91 }
92
93 pub fn endpoint(&self) -> Endpoint {
94 if self.conn.is_client() {
95 Endpoint::Client
96 } else {
97 Endpoint::Server
98 }
99 }
100
101 pub fn peer_node_id(&self) -> NodeId {
102 self.node_id
103 }
104
105 pub async fn alloc_uni(&self) -> Result<FramedStreamWriter, Error> {
106 let (circuit_reader, writer) = circuit::stream::stream();
107 let (_reader, circuit_writer) = circuit::stream::stream();
108 let id = self.conn.alloc_stream(circuit_reader, circuit_writer).await?;
109 Ok(FramedStreamWriter::from_circuit(writer, id, self.conn.clone(), self.node_id))
110 }
111
112 pub async fn alloc_bidi(&self) -> Result<(FramedStreamWriter, FramedStreamReader), Error> {
113 let (circuit_reader, writer) = circuit::stream::stream();
114 let (reader, circuit_writer) = circuit::stream::stream();
115 let id = self.conn.alloc_stream(circuit_reader, circuit_writer).await?;
116 Ok((
117 FramedStreamWriter::from_circuit(writer, id, self.conn.clone(), self.node_id),
118 FramedStreamReader::from_circuit(reader, self.conn.clone(), self.node_id),
119 ))
120 }
121
122 pub async fn bind_uni_id(&self, id: u64) -> Result<FramedStreamReader, Error> {
123 Ok(FramedStreamReader::from_circuit(
124 self.conn
125 .bind_stream(id)
126 .await
127 .ok_or_else(|| format_err!("Stream id {} unavailable", id))?
128 .0,
129 self.conn.clone(),
130 self.node_id,
131 ))
132 }
133
134 pub async fn bind_id(
135 &self,
136 id: u64,
137 ) -> Result<(FramedStreamWriter, FramedStreamReader), Error> {
138 let (r, w) = self
139 .conn
140 .bind_stream(id)
141 .await
142 .ok_or_else(|| format_err!("Stream id {} unavailable", id))?;
143 Ok((
144 FramedStreamWriter::from_circuit(w, id, self.conn.clone(), self.node_id),
145 FramedStreamReader::from_circuit(r, self.conn.clone(), self.node_id),
146 ))
147 }
148}
149
150pub(crate) struct Peer {
151 endpoint: Endpoint,
152 conn: PeerConn,
153 commands: Option<mpsc::Sender<ClientPeerCommand>>,
154 _task: Task<()>,
155}
156
157impl std::fmt::Debug for Peer {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 self.debug_id().fmt(f)
160 }
161}
162
/// Reasons a connection runner future can finish with an error.
#[derive(Debug)]
enum RunnerError {
    /// The `Weak<Router>` could no longer be upgraded.
    RouterGone,
    /// The connection stream reported that it was closed, with an optional
    /// reason string.
    ConnectionClosed(Option<String>),
    /// A frame other than `FrameType::Data` arrived on the connection stream.
    BadFrameType { _frame_type: FrameType },
    /// The initial handshake failed.
    HandshakeError { _error: Error },
    /// A failure while servicing an established connection.
    ServiceError { _error: Error },
}
173
174impl Peer {
175 pub(crate) fn node_id(&self) -> NodeId {
176 self.conn.node_id()
177 }
178
179 pub(crate) fn debug_id(&self) -> impl std::fmt::Debug + std::cmp::PartialEq {
180 (self.node_id(), self.endpoint)
181 }
182
183 pub(crate) fn new_circuit_client(
186 conn: circuit::Connection,
187 conn_stream_writer: circuit::stream::Writer,
188 conn_stream_reader: circuit::stream::Reader,
189 service_observer: Observer<Vec<String>>,
190 router: &Arc<Router>,
191 peer_node_id: NodeId,
192 ) -> Result<Arc<Self>, Error> {
193 let node_id =
194 NodeId::from_circuit_string(conn.from()).map_err(|_| format_err!("Invalid node ID"))?;
195 log::trace!(node_id = router.node_id().0, peer = node_id.0; "NEW CLIENT",);
196 let (command_sender, command_receiver) = mpsc::channel(1);
197 let weak_router = Arc::downgrade(router);
198
199 let client_conn_fut = client_conn_stream(
200 Arc::downgrade(router),
201 node_id,
202 conn_stream_writer,
203 conn_stream_reader,
204 conn.clone(),
205 command_receiver,
206 service_observer,
207 );
208
209 Ok(Arc::new(Self {
210 endpoint: Endpoint::Client,
211 commands: Some(command_sender),
212 _task: Task::spawn(Peer::runner(Endpoint::Client, weak_router.clone(), async move {
213 let result = client_conn_fut.await;
214 if let Some(router) = weak_router.upgrade() {
215 router.client_closed(peer_node_id).await;
216 }
217 result
218 })),
219 conn: PeerConn::from_circuit(conn, node_id),
220 }))
221 }
222
223 pub(crate) async fn new_circuit_server(
225 conn: circuit::Connection,
226 router: &Arc<Router>,
227 ) -> Result<(), Error> {
228 let node_id =
229 NodeId::from_circuit_string(conn.from()).map_err(|_| format_err!("Invalid node ID"))?;
230 log::trace!(node_id = router.node_id().0, peer = node_id.0; "NEW SERVER",);
231 let (conn_stream_reader, conn_stream_writer) = conn
232 .bind_stream(0)
233 .await
234 .ok_or_else(|| format_err!("Could not establish connection"))?;
235 Task::spawn(Peer::runner(
236 Endpoint::Server,
237 Arc::downgrade(router),
238 server_conn_stream(
239 node_id,
240 conn_stream_writer,
241 conn_stream_reader,
242 conn.clone(),
243 Arc::downgrade(router),
244 ),
245 ))
246 .detach();
247 Ok(())
248 }
249
250 async fn runner(
251 endpoint: Endpoint,
252 router: Weak<Router>,
253 f: impl Future<Output = Result<(), RunnerError>>,
254 ) {
255 let result = f.await;
256 let get_router_node_id = || {
257 Weak::upgrade(&router).map(|r| format!("{:?}", r.node_id())).unwrap_or_else(String::new)
258 };
259 if let Err(e) = &result {
260 match e {
261 RunnerError::ConnectionClosed(Some(s)) => log::debug!(
262 node_id:% = get_router_node_id(),
263 endpoint:?;
264 "connection closed (reason: {s})"
265 ),
266 RunnerError::ConnectionClosed(None) => log::debug!(
267 node_id:% = get_router_node_id(),
268 endpoint:?;
269 "connection closed"
270 ),
271 _ => log::warn!(
272 node_id:% = get_router_node_id(),
273 endpoint:?;
274 "runner error: {:?}",
275 e
276 ),
277 }
278 } else {
279 log::trace!(
280 node_id:% = get_router_node_id(),
281 endpoint:?;
282 "finished successfully",
283 );
284 }
285 }
286
287 pub async fn new_stream(
288 &self,
289 service: &str,
290 chan: Channel,
291 router: &Arc<Router>,
292 ) -> Result<(), Error> {
293 if let ZirconHandle::Channel(ChannelHandle { stream_ref, rights }) =
294 router.send_proxied(chan.into_handle(), self.peer_conn_ref()).await?
295 {
296 self.commands
297 .as_ref()
298 .unwrap()
299 .clone()
300 .send(ClientPeerCommand::ConnectToService(ConnectToService {
301 service_name: service.to_string(),
302 stream_ref,
303 rights,
304 options: ConnectToServiceOptions::default(),
305 }))
306 .await?;
307 Ok(())
308 } else {
309 unreachable!();
310 }
311 }
312
313 pub async fn send_open_transfer(
314 &self,
315 transfer_key: TransferKey,
316 ) -> Option<(FramedStreamWriter, FramedStreamReader)> {
317 let io = self.peer_conn_ref().alloc_bidi().await.ok()?;
318 let (tx, rx) = oneshot::channel();
319 self.commands
320 .as_ref()
321 .unwrap()
322 .clone()
323 .send(ClientPeerCommand::OpenTransfer(io.0.id(), transfer_key, tx))
324 .await
325 .ok()?;
326 rx.await.ok()?;
327 Some(io)
328 }
329
330 fn peer_conn_ref(&self) -> PeerConnRef<'_> {
331 self.conn.as_ref()
332 }
333}
334
/// Maximum time the client handshake may take before it fails with
/// "timeout performing handshake".
///
/// NOTE(review): the name mentions QUIC although the handshake in this file
/// runs over `circuit` streams; presumably a historical name — confirm.
const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
336
337async fn client_handshake(
338 my_node_id: NodeId,
339 peer_node_id: NodeId,
340 writer: circuit::stream::Writer,
341 reader: circuit::stream::Reader,
342 conn: circuit::Connection,
343) -> Result<(FramedStreamWriter, FramedStreamReader), Error> {
344 log::trace!(
345 my_node_id = my_node_id.0,
346 clipeer = peer_node_id.0;
347 "client connection stream started",
348 );
349 log::trace!(
351 my_node_id = my_node_id.0,
352 clipeer:? = peer_node_id;
353 "send fidl header"
354 );
355 let msg = [0, 0, 0, fidl::encoding::MAGIC_NUMBER_INITIAL];
356 writer.write(msg.len(), |buf| {
357 buf[..msg.len()].copy_from_slice(&msg);
358 Ok(msg.len())
359 })?;
360 async move {
361 log::trace!(my_node_id = my_node_id.0, clipeer = peer_node_id.0; "send config request");
362 let mut conn_stream_writer =
364 FramedStreamWriter::from_circuit(writer, 0, conn.clone(), peer_node_id);
365 let conn_stream_reader_fut = async move {
366 log::trace!(my_node_id = my_node_id.0, clipeer = peer_node_id.0; "read fidl header");
368 reader.read(4, |_| Ok(((), 4))).await?;
369 Result::<_, Error>::Ok(FramedStreamReader::from_circuit(reader, conn, peer_node_id))
370 }
371 .boxed();
372
373 conn_stream_writer
374 .send(FrameType::Data, &encode_fidl(&mut ConfigRequest::default().clone())?)
375 .await?;
376 log::trace!(my_node_id = my_node_id.0, clipeer = peer_node_id.0; "read config");
378 let mut conn_stream_reader = conn_stream_reader_fut.await?;
379 let _ = Config::from_response(match conn_stream_reader.next().await? {
380 FramedStreamReadResult::Frame(FrameType::Data, mut bytes) => decode_fidl(&mut bytes)?,
381 FramedStreamReadResult::Frame(_, _) => {
382 bail!("Failed to read config response (wrong frame type)")
383 }
384 FramedStreamReadResult::Closed(Some(s)) => {
385 bail!("Failed to read config response ({s})")
386 }
387 FramedStreamReadResult::Closed(None) => bail!("Failed to read config response"),
388 });
389 log::trace!(my_node_id = my_node_id.0, clipeer = peer_node_id.0; "handshake completed");
390
391 Ok((conn_stream_writer, conn_stream_reader))
392 }
393 .on_timeout(QUIC_CONNECTION_TIMEOUT, || Err(format_err!("timeout performing handshake")))
394 .await
395}
396
397struct TrackClientConnection {
398 router: Weak<Router>,
399 node_id: NodeId,
400}
401
402impl TrackClientConnection {
403 async fn new(router: &Arc<Router>, node_id: NodeId) -> TrackClientConnection {
404 router.service_map().add_client_connection(node_id).await;
405 TrackClientConnection { router: Arc::downgrade(router), node_id }
406 }
407}
408
409impl Drop for TrackClientConnection {
410 fn drop(&mut self) {
411 if let Some(router) = Weak::upgrade(&self.router) {
412 let node_id = self.node_id;
413 Task::spawn(
414 async move { router.service_map().remove_client_connection(node_id).await },
415 )
416 .detach();
417 }
418 }
419}
420
421async fn client_conn_stream(
422 router: Weak<Router>,
423 peer_node_id: NodeId,
424 writer: circuit::stream::Writer,
425 reader: circuit::stream::Reader,
426 conn: circuit::Connection,
427 mut commands: mpsc::Receiver<ClientPeerCommand>,
428 mut services: Observer<Vec<String>>,
429) -> Result<(), RunnerError> {
430 let get_router = move || Weak::upgrade(&router).ok_or_else(|| RunnerError::RouterGone);
431 let my_node_id = get_router()?.node_id();
432
433 let (conn_stream_writer, mut conn_stream_reader) =
434 client_handshake(my_node_id, peer_node_id, writer, reader, conn)
435 .map_err(|e| RunnerError::HandshakeError { _error: e })
436 .await?;
437
438 let _track_connection = TrackClientConnection::new(&get_router()?, peer_node_id).await;
439
440 let conn_stream_writer = &Mutex::new(conn_stream_writer);
441
442 let _: ((), (), ()) = futures::future::try_join3(
443 async move {
444 while let Some(command) = commands.next().await {
445 log::trace!(
446 my_node_id = my_node_id.0,
447 clipeer = peer_node_id.0;
448 "handle command: {:?}",
449 command
450 );
451 client_conn_handle_command(command, &mut *conn_stream_writer.lock().await).await?;
452 }
453 log::trace!(my_node_id = my_node_id.0, clipeer = peer_node_id.0; "done commands");
454 Ok(())
455 }
456 .map_err(|e| RunnerError::ServiceError { _error: e }),
457 async move {
458 loop {
459 let (frame_type, mut bytes) = match conn_stream_reader
460 .next()
461 .await
462 .map_err(|e| RunnerError::ServiceError { _error: e })?
463 {
464 FramedStreamReadResult::Frame(frame_type, bytes) => (frame_type, bytes),
465 FramedStreamReadResult::Closed(s) => {
466 return Err(RunnerError::ConnectionClosed(s))
467 }
468 };
469 match frame_type {
470 FrameType::Hello | FrameType::Control | FrameType::Signal => {
471 return Err(RunnerError::BadFrameType { _frame_type: frame_type });
472 }
473 FrameType::Data => {
474 client_conn_handle_incoming_frame(my_node_id, peer_node_id, &mut bytes)
475 .await
476 .map_err(|e| RunnerError::ServiceError { _error: e })?;
477 }
478 }
479 }
480 },
481 async move {
482 loop {
483 let services = services.next().await;
484 log::trace!(
485 my_node_id = my_node_id.0,
486 clipeer = peer_node_id.0;
487 "Send update node description with services: {:?}",
488 services
489 );
490 conn_stream_writer
491 .lock()
492 .await
493 .send(
494 FrameType::Data,
495 &encode_fidl(&mut PeerMessage::UpdateNodeDescription(PeerDescription {
496 services,
497 ..Default::default()
498 }))?,
499 )
500 .await?;
501 }
502 }
503 .map_err(|e| RunnerError::ServiceError { _error: e }),
504 )
505 .await?;
506
507 Ok(())
508}
509
510async fn client_conn_handle_command(
511 command: ClientPeerCommand,
512 conn_stream_writer: &mut FramedStreamWriter,
513) -> Result<(), Error> {
514 match command {
515 ClientPeerCommand::ConnectToService(conn) => {
516 conn_stream_writer
517 .send(FrameType::Data, &encode_fidl(&mut PeerMessage::ConnectToService(conn))?)
518 .await?;
519 }
520 ClientPeerCommand::OpenTransfer(stream_id, transfer_key, sent) => {
521 conn_stream_writer
522 .send(
523 FrameType::Data,
524 &encode_fidl(&mut PeerMessage::OpenTransfer(OpenTransfer {
525 stream_id: StreamId { id: stream_id },
526 transfer_key,
527 }))?,
528 )
529 .await?;
530 let _ = sent.send(());
531 }
532 }
533 Ok(())
534}
535
536async fn client_conn_handle_incoming_frame(
537 my_node_id: NodeId,
538 peer_node_id: NodeId,
539 bytes: &mut [u8],
540) -> Result<(), Error> {
541 let msg: PeerReply = decode_fidl(bytes)?;
542 log::trace!(my_node_id = my_node_id.0, clipeer = peer_node_id.0; "got reply {:?}", msg);
543 match msg {
544 PeerReply::UpdateLinkStatusAck(_) => {
545 }
548 }
549 Ok(())
550}
551
552async fn server_handshake(
553 my_node_id: NodeId,
554 node_id: NodeId,
555 writer: circuit::stream::Writer,
556 reader: circuit::stream::Reader,
557 conn: circuit::Connection,
558) -> Result<(FramedStreamWriter, FramedStreamReader), Error> {
559 log::trace!(my_node_id = my_node_id.0, svrpeer = node_id.0; "read fidl header");
561 reader.read(4, |_| Ok(((), 4))).await.context("reading FIDL header")?;
562 log::trace!(
564 my_node_id = my_node_id.0,
565 svrpeer:? = node_id;
566 "send fidl header"
567 );
568 let handshake = [0, 0, 0, fidl::encoding::MAGIC_NUMBER_INITIAL];
569 writer.write(handshake.len(), |buf| {
570 buf[..handshake.len()].copy_from_slice(&handshake);
571 Ok(handshake.len())
572 })?;
573 let mut conn_stream_reader = FramedStreamReader::from_circuit(reader, conn.clone(), node_id);
574 let mut conn_stream_writer = FramedStreamWriter::from_circuit(writer, 0, conn.clone(), node_id);
575 log::trace!(my_node_id = my_node_id.0, svrpeer = node_id.0; "read config");
577 let (_, mut response) = Config::negotiate(match conn_stream_reader.next().await? {
578 FramedStreamReadResult::Frame(FrameType::Data, mut bytes) => decode_fidl(&mut bytes)?,
579 FramedStreamReadResult::Frame(_, _) => {
580 bail!("Failed to read config response (wrong frame type)")
581 }
582 FramedStreamReadResult::Closed(Some(s)) => {
583 bail!("Failed to read config response (Connection closed: {s})")
584 }
585 FramedStreamReadResult::Closed(None) => {
586 bail!("Failed to read config response (Connection closed)")
587 }
588 });
589 log::trace!(my_node_id = my_node_id.0, svrpeer = node_id.0; "send config");
591 conn_stream_writer.send(FrameType::Data, &encode_fidl(&mut response)?).await?;
592 Ok((conn_stream_writer, conn_stream_reader))
593}
594
595async fn server_conn_stream(
596 node_id: NodeId,
597 writer: circuit::stream::Writer,
598 reader: circuit::stream::Reader,
599 conn: circuit::Connection,
600 router: Weak<Router>,
601) -> Result<(), RunnerError> {
602 let my_node_id = Weak::upgrade(&router).ok_or_else(|| RunnerError::RouterGone)?.node_id();
603 let (conn_stream_writer, mut conn_stream_reader) =
604 server_handshake(my_node_id, node_id, writer, reader, conn)
605 .map_err(|e| RunnerError::HandshakeError { _error: e })
606 .await?;
607
608 loop {
609 log::trace!(my_node_id = my_node_id.0, svrpeer = node_id.0; "await message");
610 let (frame_type, mut bytes) = match conn_stream_reader
611 .next()
612 .await
613 .map_err(|e| RunnerError::ServiceError { _error: e })?
614 {
615 FramedStreamReadResult::Frame(frame_type, bytes) => (frame_type, bytes),
616 FramedStreamReadResult::Closed(s) => return Err(RunnerError::ConnectionClosed(s)),
617 };
618
619 let router = Weak::upgrade(&router).ok_or_else(|| RunnerError::RouterGone)?;
620 match frame_type {
621 FrameType::Hello | FrameType::Control | FrameType::Signal => {
622 return Err(RunnerError::BadFrameType { _frame_type: frame_type });
623 }
624 FrameType::Data => {
625 let msg: PeerMessage =
626 decode_fidl(&mut bytes).map_err(|e| RunnerError::ServiceError { _error: e })?;
627 log::trace!(
628 my_node_id = my_node_id.0,
629 svrpeer = node_id.0;
630 "Got peer request: {:?}",
631 msg
632 );
633 match msg {
634 PeerMessage::ConnectToService(ConnectToService {
635 service_name,
636 stream_ref,
637 rights,
638 options: _,
639 }) => {
640 let app_channel = Channel::from_handle(
641 router
642 .recv_proxied(
643 ZirconHandle::Channel(ChannelHandle { stream_ref, rights }),
644 conn_stream_writer.conn(),
645 )
646 .map_err(|e| RunnerError::ServiceError { _error: e })
647 .await?,
648 );
649 router
650 .service_map()
651 .connect(&service_name, app_channel)
652 .map_err(|e| RunnerError::ServiceError { _error: e })
653 .await?;
654 }
655 PeerMessage::UpdateNodeDescription(PeerDescription { services, .. }) => {
656 router
657 .service_map()
658 .update_node(node_id, services.unwrap_or(vec![]))
659 .map_err(|e| RunnerError::ServiceError { _error: e })
660 .await?;
661 }
662 PeerMessage::OpenTransfer(OpenTransfer {
663 stream_id: StreamId { id: stream_id },
664 transfer_key,
665 }) => {
666 let (tx, rx) = conn_stream_writer
667 .conn()
668 .bind_id(stream_id)
669 .await
670 .map_err(|e| RunnerError::ServiceError { _error: e })?;
671 router
672 .post_transfer(transfer_key, FoundTransfer::Remote(tx, rx))
673 .map_err(|e| RunnerError::ServiceError { _error: e })
674 .await?;
675 }
676 }
677 }
678 }
679 }
680}