overnet_core/peer/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod framed_stream;
6
7pub(crate) use self::framed_stream::{
8    FrameType, FramedStreamReadResult, FramedStreamReader, FramedStreamWriter,
9};
10use crate::coding::{decode_fidl, encode_fidl};
11use crate::future_help::Observer;
12use crate::labels::{Endpoint, NodeId, TransferKey};
13use crate::router::{FoundTransfer, Router};
14use anyhow::{bail, format_err, Context as _, Error};
15use fidl::{Channel, HandleBased};
16use fidl_fuchsia_overnet_protocol::{
17    ChannelHandle, ConfigRequest, ConfigResponse, ConnectToService, ConnectToServiceOptions,
18    OpenTransfer, PeerDescription, PeerMessage, PeerReply, StreamId, ZirconHandle,
19};
20use fuchsia_async::{Task, TimeoutExt};
21use futures::channel::{mpsc, oneshot};
22use futures::lock::Mutex;
23use futures::prelude::*;
24use std::sync::{Arc, Weak};
25use std::time::Duration;
26
27#[derive(Debug)]
28struct Config {}
29
30impl Config {
31    fn negotiate(_request: ConfigRequest) -> (Self, ConfigResponse) {
32        (Config {}, ConfigResponse::default())
33    }
34
35    fn from_response(_response: ConfigResponse) -> Self {
36        Config {}
37    }
38}
39
40#[derive(Debug)]
41enum ClientPeerCommand {
42    ConnectToService(ConnectToService),
43    OpenTransfer(u64, TransferKey, oneshot::Sender<()>),
44}
45
46#[derive(Clone)]
47pub(crate) struct PeerConn {
48    conn: circuit::Connection,
49    node_id: NodeId,
50}
51
52impl std::fmt::Debug for PeerConn {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        self.as_ref().fmt(f)
55    }
56}
57
58impl PeerConn {
59    pub fn from_circuit(conn: circuit::Connection, node_id: NodeId) -> Self {
60        PeerConn { conn, node_id }
61    }
62
63    pub fn as_ref(&self) -> PeerConnRef<'_> {
64        PeerConnRef { conn: &self.conn, node_id: self.node_id }
65    }
66
67    pub fn node_id(&self) -> NodeId {
68        self.node_id
69    }
70}
71
72#[derive(Clone, Copy)]
73pub(crate) struct PeerConnRef<'a> {
74    conn: &'a circuit::Connection,
75    node_id: NodeId,
76}
77
78impl<'a> std::fmt::Debug for PeerConnRef<'a> {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        write!(f, "PeerConn({})", self.node_id.0)
81    }
82}
83
84impl<'a> PeerConnRef<'a> {
85    pub fn from_circuit(conn: &'a circuit::Connection, node_id: NodeId) -> Self {
86        PeerConnRef { conn, node_id }
87    }
88
89    pub fn into_peer_conn(&self) -> PeerConn {
90        PeerConn { conn: self.conn.clone(), node_id: self.node_id }
91    }
92
93    pub fn endpoint(&self) -> Endpoint {
94        if self.conn.is_client() {
95            Endpoint::Client
96        } else {
97            Endpoint::Server
98        }
99    }
100
101    pub fn peer_node_id(&self) -> NodeId {
102        self.node_id
103    }
104
105    pub async fn alloc_uni(&self) -> Result<FramedStreamWriter, Error> {
106        let (circuit_reader, writer) = circuit::stream::stream();
107        let (_reader, circuit_writer) = circuit::stream::stream();
108        let id = self.conn.alloc_stream(circuit_reader, circuit_writer).await?;
109        Ok(FramedStreamWriter::from_circuit(writer, id, self.conn.clone(), self.node_id))
110    }
111
112    pub async fn alloc_bidi(&self) -> Result<(FramedStreamWriter, FramedStreamReader), Error> {
113        let (circuit_reader, writer) = circuit::stream::stream();
114        let (reader, circuit_writer) = circuit::stream::stream();
115        let id = self.conn.alloc_stream(circuit_reader, circuit_writer).await?;
116        Ok((
117            FramedStreamWriter::from_circuit(writer, id, self.conn.clone(), self.node_id),
118            FramedStreamReader::from_circuit(reader, self.conn.clone(), self.node_id),
119        ))
120    }
121
122    pub async fn bind_uni_id(&self, id: u64) -> Result<FramedStreamReader, Error> {
123        Ok(FramedStreamReader::from_circuit(
124            self.conn
125                .bind_stream(id)
126                .await
127                .ok_or_else(|| format_err!("Stream id {} unavailable", id))?
128                .0,
129            self.conn.clone(),
130            self.node_id,
131        ))
132    }
133
134    pub async fn bind_id(
135        &self,
136        id: u64,
137    ) -> Result<(FramedStreamWriter, FramedStreamReader), Error> {
138        let (r, w) = self
139            .conn
140            .bind_stream(id)
141            .await
142            .ok_or_else(|| format_err!("Stream id {} unavailable", id))?;
143        Ok((
144            FramedStreamWriter::from_circuit(w, id, self.conn.clone(), self.node_id),
145            FramedStreamReader::from_circuit(r, self.conn.clone(), self.node_id),
146        ))
147    }
148}
149
150pub(crate) struct Peer {
151    endpoint: Endpoint,
152    conn: PeerConn,
153    commands: Option<mpsc::Sender<ClientPeerCommand>>,
154    _task: Task<()>,
155}
156
157impl std::fmt::Debug for Peer {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        self.debug_id().fmt(f)
160    }
161}
162
163/// Error from the run loops for a peer (client or server) - captures a little semantic detail
164/// to help direct reactions to this peer disappearing.
165#[derive(Debug)]
166enum RunnerError {
167    RouterGone,
168    ConnectionClosed(Option<String>),
169    BadFrameType { _frame_type: FrameType },
170    HandshakeError { _error: Error },
171    ServiceError { _error: Error },
172}
173
174impl Peer {
175    pub(crate) fn node_id(&self) -> NodeId {
176        self.conn.node_id()
177    }
178
179    pub(crate) fn debug_id(&self) -> impl std::fmt::Debug + std::cmp::PartialEq {
180        (self.node_id(), self.endpoint)
181    }
182
183    /// Construct a new client peer - spawns tasks to handle making control stream requests, and
184    /// publishing link metadata
185    pub(crate) fn new_circuit_client(
186        conn: circuit::Connection,
187        conn_stream_writer: circuit::stream::Writer,
188        conn_stream_reader: circuit::stream::Reader,
189        service_observer: Observer<Vec<String>>,
190        router: &Arc<Router>,
191        peer_node_id: NodeId,
192    ) -> Result<Arc<Self>, Error> {
193        let node_id =
194            NodeId::from_circuit_string(conn.from()).map_err(|_| format_err!("Invalid node ID"))?;
195        log::trace!(node_id = router.node_id().0, peer = node_id.0; "NEW CLIENT",);
196        let (command_sender, command_receiver) = mpsc::channel(1);
197        let weak_router = Arc::downgrade(router);
198
199        let client_conn_fut = client_conn_stream(
200            Arc::downgrade(router),
201            node_id,
202            conn_stream_writer,
203            conn_stream_reader,
204            conn.clone(),
205            command_receiver,
206            service_observer,
207        );
208
209        Ok(Arc::new(Self {
210            endpoint: Endpoint::Client,
211            commands: Some(command_sender),
212            _task: Task::spawn(Peer::runner(Endpoint::Client, weak_router.clone(), async move {
213                let result = client_conn_fut.await;
214                if let Some(router) = weak_router.upgrade() {
215                    router.client_closed(peer_node_id).await;
216                }
217                result
218            })),
219            conn: PeerConn::from_circuit(conn, node_id),
220        }))
221    }
222
223    /// Construct a new server peer - spawns tasks to handle responding to control stream requests
224    pub(crate) async fn new_circuit_server(
225        conn: circuit::Connection,
226        router: &Arc<Router>,
227    ) -> Result<(), Error> {
228        let node_id =
229            NodeId::from_circuit_string(conn.from()).map_err(|_| format_err!("Invalid node ID"))?;
230        log::trace!(node_id = router.node_id().0, peer = node_id.0; "NEW SERVER",);
231        let (conn_stream_reader, conn_stream_writer) = conn
232            .bind_stream(0)
233            .await
234            .ok_or_else(|| format_err!("Could not establish connection"))?;
235        Task::spawn(Peer::runner(
236            Endpoint::Server,
237            Arc::downgrade(router),
238            server_conn_stream(
239                node_id,
240                conn_stream_writer,
241                conn_stream_reader,
242                conn.clone(),
243                Arc::downgrade(router),
244            ),
245        ))
246        .detach();
247        Ok(())
248    }
249
250    async fn runner(
251        endpoint: Endpoint,
252        router: Weak<Router>,
253        f: impl Future<Output = Result<(), RunnerError>>,
254    ) {
255        let result = f.await;
256        let get_router_node_id = || {
257            Weak::upgrade(&router).map(|r| format!("{:?}", r.node_id())).unwrap_or_else(String::new)
258        };
259        if let Err(e) = &result {
260            match e {
261                RunnerError::ConnectionClosed(Some(s)) => log::debug!(
262                    node_id:% = get_router_node_id(),
263                    endpoint:?;
264                    "connection closed (reason: {s})"
265                ),
266                RunnerError::ConnectionClosed(None) => log::debug!(
267                    node_id:% = get_router_node_id(),
268                    endpoint:?;
269                    "connection closed"
270                ),
271                _ => log::warn!(
272                    node_id:% = get_router_node_id(),
273                    endpoint:?;
274                    "runner error: {:?}",
275                    e
276                ),
277            }
278        } else {
279            log::trace!(
280                node_id:% = get_router_node_id(),
281                endpoint:?;
282                "finished successfully",
283            );
284        }
285    }
286
287    pub async fn new_stream(
288        &self,
289        service: &str,
290        chan: Channel,
291        router: &Arc<Router>,
292    ) -> Result<(), Error> {
293        if let ZirconHandle::Channel(ChannelHandle { stream_ref, rights }) =
294            router.send_proxied(chan.into_handle(), self.peer_conn_ref()).await?
295        {
296            self.commands
297                .as_ref()
298                .unwrap()
299                .clone()
300                .send(ClientPeerCommand::ConnectToService(ConnectToService {
301                    service_name: service.to_string(),
302                    stream_ref,
303                    rights,
304                    options: ConnectToServiceOptions::default(),
305                }))
306                .await?;
307            Ok(())
308        } else {
309            unreachable!();
310        }
311    }
312
313    pub async fn send_open_transfer(
314        &self,
315        transfer_key: TransferKey,
316    ) -> Option<(FramedStreamWriter, FramedStreamReader)> {
317        let io = self.peer_conn_ref().alloc_bidi().await.ok()?;
318        let (tx, rx) = oneshot::channel();
319        self.commands
320            .as_ref()
321            .unwrap()
322            .clone()
323            .send(ClientPeerCommand::OpenTransfer(io.0.id(), transfer_key, tx))
324            .await
325            .ok()?;
326        rx.await.ok()?;
327        Some(io)
328    }
329
330    fn peer_conn_ref(&self) -> PeerConnRef<'_> {
331        self.conn.as_ref()
332    }
333}
334
335const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
336
337async fn client_handshake(
338    my_node_id: NodeId,
339    peer_node_id: NodeId,
340    writer: circuit::stream::Writer,
341    reader: circuit::stream::Reader,
342    conn: circuit::Connection,
343) -> Result<(FramedStreamWriter, FramedStreamReader), Error> {
344    log::trace!(
345        my_node_id = my_node_id.0,
346        clipeer = peer_node_id.0;
347        "client connection stream started",
348    );
349    // Send FIDL header
350    log::trace!(
351        my_node_id = my_node_id.0,
352        clipeer:? = peer_node_id;
353        "send fidl header"
354    );
355    let msg = [0, 0, 0, fidl::encoding::MAGIC_NUMBER_INITIAL];
356    writer.write(msg.len(), |buf| {
357        buf[..msg.len()].copy_from_slice(&msg);
358        Ok(msg.len())
359    })?;
360    async move {
361        log::trace!(my_node_id = my_node_id.0, clipeer = peer_node_id.0; "send config request");
362        // Send config request
363        let mut conn_stream_writer =
364            FramedStreamWriter::from_circuit(writer, 0, conn.clone(), peer_node_id);
365        let conn_stream_reader_fut = async move {
366            // Receive FIDL header
367            log::trace!(my_node_id = my_node_id.0, clipeer = peer_node_id.0; "read fidl header");
368            reader.read(4, |_| Ok(((), 4))).await?;
369            Result::<_, Error>::Ok(FramedStreamReader::from_circuit(reader, conn, peer_node_id))
370        }
371        .boxed();
372
373        conn_stream_writer
374            .send(FrameType::Data, &encode_fidl(&mut ConfigRequest::default().clone())?)
375            .await?;
376        // Await config response
377        log::trace!(my_node_id = my_node_id.0, clipeer = peer_node_id.0; "read config");
378        let mut conn_stream_reader = conn_stream_reader_fut.await?;
379        let _ = Config::from_response(match conn_stream_reader.next().await? {
380            FramedStreamReadResult::Frame(FrameType::Data, mut bytes) => decode_fidl(&mut bytes)?,
381            FramedStreamReadResult::Frame(_, _) => {
382                bail!("Failed to read config response (wrong frame type)")
383            }
384            FramedStreamReadResult::Closed(Some(s)) => {
385                bail!("Failed to read config response ({s})")
386            }
387            FramedStreamReadResult::Closed(None) => bail!("Failed to read config response"),
388        });
389        log::trace!(my_node_id = my_node_id.0, clipeer = peer_node_id.0; "handshake completed");
390
391        Ok((conn_stream_writer, conn_stream_reader))
392    }
393    .on_timeout(QUIC_CONNECTION_TIMEOUT, || Err(format_err!("timeout performing handshake")))
394    .await
395}
396
397struct TrackClientConnection {
398    router: Weak<Router>,
399    node_id: NodeId,
400}
401
402impl TrackClientConnection {
403    async fn new(router: &Arc<Router>, node_id: NodeId) -> TrackClientConnection {
404        router.service_map().add_client_connection(node_id).await;
405        TrackClientConnection { router: Arc::downgrade(router), node_id }
406    }
407}
408
409impl Drop for TrackClientConnection {
410    fn drop(&mut self) {
411        if let Some(router) = Weak::upgrade(&self.router) {
412            let node_id = self.node_id;
413            Task::spawn(
414                async move { router.service_map().remove_client_connection(node_id).await },
415            )
416            .detach();
417        }
418    }
419}
420
421async fn client_conn_stream(
422    router: Weak<Router>,
423    peer_node_id: NodeId,
424    writer: circuit::stream::Writer,
425    reader: circuit::stream::Reader,
426    conn: circuit::Connection,
427    mut commands: mpsc::Receiver<ClientPeerCommand>,
428    mut services: Observer<Vec<String>>,
429) -> Result<(), RunnerError> {
430    let get_router = move || Weak::upgrade(&router).ok_or_else(|| RunnerError::RouterGone);
431    let my_node_id = get_router()?.node_id();
432
433    let (conn_stream_writer, mut conn_stream_reader) =
434        client_handshake(my_node_id, peer_node_id, writer, reader, conn)
435            .map_err(|e| RunnerError::HandshakeError { _error: e })
436            .await?;
437
438    let _track_connection = TrackClientConnection::new(&get_router()?, peer_node_id).await;
439
440    let conn_stream_writer = &Mutex::new(conn_stream_writer);
441
442    let _: ((), (), ()) = futures::future::try_join3(
443        async move {
444            while let Some(command) = commands.next().await {
445                log::trace!(
446                    my_node_id = my_node_id.0,
447                    clipeer = peer_node_id.0;
448                    "handle command: {:?}",
449                    command
450                );
451                client_conn_handle_command(command, &mut *conn_stream_writer.lock().await).await?;
452            }
453            log::trace!(my_node_id = my_node_id.0, clipeer = peer_node_id.0; "done commands");
454            Ok(())
455        }
456        .map_err(|e| RunnerError::ServiceError { _error: e }),
457        async move {
458            loop {
459                let (frame_type, mut bytes) = match conn_stream_reader
460                    .next()
461                    .await
462                    .map_err(|e| RunnerError::ServiceError { _error: e })?
463                {
464                    FramedStreamReadResult::Frame(frame_type, bytes) => (frame_type, bytes),
465                    FramedStreamReadResult::Closed(s) => {
466                        return Err(RunnerError::ConnectionClosed(s))
467                    }
468                };
469                match frame_type {
470                    FrameType::Hello | FrameType::Control | FrameType::Signal => {
471                        return Err(RunnerError::BadFrameType { _frame_type: frame_type });
472                    }
473                    FrameType::Data => {
474                        client_conn_handle_incoming_frame(my_node_id, peer_node_id, &mut bytes)
475                            .await
476                            .map_err(|e| RunnerError::ServiceError { _error: e })?;
477                    }
478                }
479            }
480        },
481        async move {
482            loop {
483                let services = services.next().await;
484                log::trace!(
485                    my_node_id = my_node_id.0,
486                    clipeer = peer_node_id.0;
487                    "Send update node description with services: {:?}",
488                    services
489                );
490                conn_stream_writer
491                    .lock()
492                    .await
493                    .send(
494                        FrameType::Data,
495                        &encode_fidl(&mut PeerMessage::UpdateNodeDescription(PeerDescription {
496                            services,
497                            ..Default::default()
498                        }))?,
499                    )
500                    .await?;
501            }
502        }
503        .map_err(|e| RunnerError::ServiceError { _error: e }),
504    )
505    .await?;
506
507    Ok(())
508}
509
510async fn client_conn_handle_command(
511    command: ClientPeerCommand,
512    conn_stream_writer: &mut FramedStreamWriter,
513) -> Result<(), Error> {
514    match command {
515        ClientPeerCommand::ConnectToService(conn) => {
516            conn_stream_writer
517                .send(FrameType::Data, &encode_fidl(&mut PeerMessage::ConnectToService(conn))?)
518                .await?;
519        }
520        ClientPeerCommand::OpenTransfer(stream_id, transfer_key, sent) => {
521            conn_stream_writer
522                .send(
523                    FrameType::Data,
524                    &encode_fidl(&mut PeerMessage::OpenTransfer(OpenTransfer {
525                        stream_id: StreamId { id: stream_id },
526                        transfer_key,
527                    }))?,
528                )
529                .await?;
530            let _ = sent.send(());
531        }
532    }
533    Ok(())
534}
535
536async fn client_conn_handle_incoming_frame(
537    my_node_id: NodeId,
538    peer_node_id: NodeId,
539    bytes: &mut [u8],
540) -> Result<(), Error> {
541    let msg: PeerReply = decode_fidl(bytes)?;
542    log::trace!(my_node_id = my_node_id.0, clipeer = peer_node_id.0; "got reply {:?}", msg);
543    match msg {
544        PeerReply::UpdateLinkStatusAck(_) => {
545            // XXX(raggi): prior code attempted to send to a None under a lock
546            // here, seemingly unused, but may have influenced total ordering?
547        }
548    }
549    Ok(())
550}
551
552async fn server_handshake(
553    my_node_id: NodeId,
554    node_id: NodeId,
555    writer: circuit::stream::Writer,
556    reader: circuit::stream::Reader,
557    conn: circuit::Connection,
558) -> Result<(FramedStreamWriter, FramedStreamReader), Error> {
559    // Receive FIDL header
560    log::trace!(my_node_id = my_node_id.0, svrpeer = node_id.0; "read fidl header");
561    reader.read(4, |_| Ok(((), 4))).await.context("reading FIDL header")?;
562    // Send FIDL header
563    log::trace!(
564        my_node_id = my_node_id.0,
565        svrpeer:? = node_id;
566        "send fidl header"
567    );
568    let handshake = [0, 0, 0, fidl::encoding::MAGIC_NUMBER_INITIAL];
569    writer.write(handshake.len(), |buf| {
570        buf[..handshake.len()].copy_from_slice(&handshake);
571        Ok(handshake.len())
572    })?;
573    let mut conn_stream_reader = FramedStreamReader::from_circuit(reader, conn.clone(), node_id);
574    let mut conn_stream_writer = FramedStreamWriter::from_circuit(writer, 0, conn.clone(), node_id);
575    // Await config request
576    log::trace!(my_node_id = my_node_id.0, svrpeer = node_id.0; "read config");
577    let (_, mut response) = Config::negotiate(match conn_stream_reader.next().await? {
578        FramedStreamReadResult::Frame(FrameType::Data, mut bytes) => decode_fidl(&mut bytes)?,
579        FramedStreamReadResult::Frame(_, _) => {
580            bail!("Failed to read config response (wrong frame type)")
581        }
582        FramedStreamReadResult::Closed(Some(s)) => {
583            bail!("Failed to read config response (Connection closed: {s})")
584        }
585        FramedStreamReadResult::Closed(None) => {
586            bail!("Failed to read config response (Connection closed)")
587        }
588    });
589    // Send config response
590    log::trace!(my_node_id = my_node_id.0, svrpeer = node_id.0; "send config");
591    conn_stream_writer.send(FrameType::Data, &encode_fidl(&mut response)?).await?;
592    Ok((conn_stream_writer, conn_stream_reader))
593}
594
595async fn server_conn_stream(
596    node_id: NodeId,
597    writer: circuit::stream::Writer,
598    reader: circuit::stream::Reader,
599    conn: circuit::Connection,
600    router: Weak<Router>,
601) -> Result<(), RunnerError> {
602    let my_node_id = Weak::upgrade(&router).ok_or_else(|| RunnerError::RouterGone)?.node_id();
603    let (conn_stream_writer, mut conn_stream_reader) =
604        server_handshake(my_node_id, node_id, writer, reader, conn)
605            .map_err(|e| RunnerError::HandshakeError { _error: e })
606            .await?;
607
608    loop {
609        log::trace!(my_node_id = my_node_id.0, svrpeer = node_id.0; "await message");
610        let (frame_type, mut bytes) = match conn_stream_reader
611            .next()
612            .await
613            .map_err(|e| RunnerError::ServiceError { _error: e })?
614        {
615            FramedStreamReadResult::Frame(frame_type, bytes) => (frame_type, bytes),
616            FramedStreamReadResult::Closed(s) => return Err(RunnerError::ConnectionClosed(s)),
617        };
618
619        let router = Weak::upgrade(&router).ok_or_else(|| RunnerError::RouterGone)?;
620        match frame_type {
621            FrameType::Hello | FrameType::Control | FrameType::Signal => {
622                return Err(RunnerError::BadFrameType { _frame_type: frame_type });
623            }
624            FrameType::Data => {
625                let msg: PeerMessage =
626                    decode_fidl(&mut bytes).map_err(|e| RunnerError::ServiceError { _error: e })?;
627                log::trace!(
628                    my_node_id = my_node_id.0,
629                    svrpeer = node_id.0;
630                    "Got peer request: {:?}",
631                    msg
632                );
633                match msg {
634                    PeerMessage::ConnectToService(ConnectToService {
635                        service_name,
636                        stream_ref,
637                        rights,
638                        options: _,
639                    }) => {
640                        let app_channel = Channel::from_handle(
641                            router
642                                .recv_proxied(
643                                    ZirconHandle::Channel(ChannelHandle { stream_ref, rights }),
644                                    conn_stream_writer.conn(),
645                                )
646                                .map_err(|e| RunnerError::ServiceError { _error: e })
647                                .await?,
648                        );
649                        router
650                            .service_map()
651                            .connect(&service_name, app_channel)
652                            .map_err(|e| RunnerError::ServiceError { _error: e })
653                            .await?;
654                    }
655                    PeerMessage::UpdateNodeDescription(PeerDescription { services, .. }) => {
656                        router
657                            .service_map()
658                            .update_node(node_id, services.unwrap_or(vec![]))
659                            .map_err(|e| RunnerError::ServiceError { _error: e })
660                            .await?;
661                    }
662                    PeerMessage::OpenTransfer(OpenTransfer {
663                        stream_id: StreamId { id: stream_id },
664                        transfer_key,
665                    }) => {
666                        let (tx, rx) = conn_stream_writer
667                            .conn()
668                            .bind_id(stream_id)
669                            .await
670                            .map_err(|e| RunnerError::ServiceError { _error: e })?;
671                        router
672                            .post_transfer(transfer_key, FoundTransfer::Remote(tx, rx))
673                            .map_err(|e| RunnerError::ServiceError { _error: e })
674                            .await?;
675                    }
676                }
677            }
678        }
679    }
680}