overnet_core/proxy/run/
spawn.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Factory functions for proxies - one each for sending a handle and receiving a handle.
6
7use super::super::handle::{IntoProxied, ProxyableHandle, ProxyableRW};
8use super::super::{Proxy, ProxyTransferInitiationReceiver};
9use crate::handle_info::WithRights;
10use crate::peer::{FramedStreamReader, FramedStreamWriter, PeerConnRef};
11use crate::router::{FoundTransfer, OpenedTransfer, Router};
12use anyhow::{format_err, Error};
13use fidl_fuchsia_overnet_protocol::{StreamId, StreamRef, TransferInitiator, TransferWaiter};
14use std::future::Future;
15use std::sync::Weak;
16
17pub(crate) async fn send<Hdl: 'static + for<'a> ProxyableRW<'a>>(
18    hdl: Hdl,
19    initiate_transfer: ProxyTransferInitiationReceiver,
20    stream_writer: FramedStreamWriter,
21    stream_reader: FramedStreamReader,
22    router: Weak<Router>,
23) -> Result<(), Error> {
24    super::main::run_main_loop(
25        Proxy::new(hdl, router),
26        initiate_transfer,
27        stream_writer,
28        None,
29        stream_reader,
30    )
31    .await
32}
33
34// Start receiving from some stream for a channel.
35// Returns a handle, and an optional future that runs the proxying activity (or none if no proxying is occurring).
36pub(crate) async fn recv<Hdl, CreateType>(
37    create_handles: impl FnOnce() -> Result<(CreateType, CreateType), Error>,
38    rights: CreateType::Rights,
39    initiate_transfer: ProxyTransferInitiationReceiver,
40    stream_ref: StreamRef,
41    conn: PeerConnRef<'_>,
42    router: Weak<Router>,
43) -> Result<(fidl::Handle, Option<impl Send + Future<Output = Result<(), Error>>>), Error>
44where
45    Hdl: 'static + for<'a> ProxyableRW<'a>,
46    CreateType: fidl::HandleBased + IntoProxied<Proxied = Hdl> + std::fmt::Debug + WithRights,
47{
48    Ok(match stream_ref {
49        StreamRef::Creating(StreamId { id: stream_id }) => {
50            let (app_chan, overnet_chan) = create_handles()?;
51            let app_chan = app_chan.with_rights(rights)?;
52            let (stream_writer, stream_reader) = conn.bind_id(stream_id).await?;
53            let overnet_chan = overnet_chan.into_proxied()?;
54            (
55                app_chan.into_handle(),
56                Some(super::main::run_main_loop(
57                    Proxy::new(overnet_chan, router),
58                    initiate_transfer,
59                    stream_writer,
60                    None,
61                    stream_reader,
62                )),
63            )
64        }
65        StreamRef::TransferInitiator(TransferInitiator {
66            stream_id: StreamId { id: stream_id },
67            new_destination_node,
68            transfer_key,
69        }) => {
70            let (app_chan, overnet_chan) = create_handles()?;
71            let app_chan = app_chan.with_rights(rights)?;
72            let initial_stream_reader: FramedStreamReader =
73                conn.bind_uni_id(stream_id).await?.into();
74            let opened_transfer = Weak::upgrade(&router)
75                .ok_or_else(|| format_err!("No router to handle draining stream ref"))?
76                .open_transfer(
77                    new_destination_node.into(),
78                    transfer_key,
79                    overnet_chan.into_handle(),
80                )
81                .await?;
82            match opened_transfer {
83                OpenedTransfer::Fused => {
84                    let app_chan = app_chan.into_proxied()?;
85                    (
86                        ProxyableHandle::new(app_chan, router)
87                            .drain_stream_to_handle(initial_stream_reader)
88                            .await?,
89                        None,
90                    )
91                }
92                OpenedTransfer::Remote(stream_writer, stream_reader, overnet_chan) => (
93                    app_chan.into_handle(),
94                    Some(super::main::run_main_loop(
95                        Proxy::new(Hdl::from_fidl_handle(overnet_chan)?, router),
96                        initiate_transfer,
97                        stream_writer,
98                        Some(initial_stream_reader),
99                        stream_reader,
100                    )),
101                ),
102            }
103        }
104        StreamRef::TransferWaiter(TransferWaiter {
105            stream_id: StreamId { id: stream_id },
106            transfer_key,
107        }) => {
108            let initial_stream_reader: FramedStreamReader =
109                conn.bind_uni_id(stream_id).await?.into();
110            let found_transfer = Weak::upgrade(&router)
111                .ok_or_else(|| format_err!("No router to handle draining stream ref"))?
112                .find_transfer(transfer_key)
113                .await?;
114            match found_transfer {
115                FoundTransfer::Fused(handle) => {
116                    let handle = Hdl::from_fidl_handle(handle)?;
117                    (
118                        ProxyableHandle::new(handle, router)
119                            .drain_stream_to_handle(initial_stream_reader)
120                            .await?,
121                        None,
122                    )
123                }
124                FoundTransfer::Remote(stream_writer, stream_reader) => {
125                    let (app_chan, overnet_chan) = create_handles()?;
126                    (
127                        app_chan.with_rights(rights)?.into_handle(),
128                        Some(super::main::run_main_loop(
129                            Proxy::new(overnet_chan.into_proxied()?, router),
130                            initiate_transfer,
131                            stream_writer,
132                            Some(initial_stream_reader),
133                            stream_reader,
134                        )),
135                    )
136                }
137            }
138        }
139    })
140}