overnet_core/proxy/run/
spawn.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Factory functions for proxies - one each for sending a handle and receiving a handle.
6
7use super::super::handle::{IntoProxied, ProxyableHandle, ProxyableRW};
8use super::super::{Proxy, ProxyTransferInitiationReceiver};
9use crate::handle_info::WithRights;
10use crate::peer::{FramedStreamReader, FramedStreamWriter, PeerConnRef};
11use crate::router::{FoundTransfer, OpenedTransfer, Router};
12use anyhow::{Error, format_err};
13use fidl_fuchsia_overnet_protocol::{StreamId, StreamRef, TransferInitiator, TransferWaiter};
14use std::future::Future;
15use std::sync::Weak;
16
17pub(crate) async fn send<Hdl: 'static + for<'a> ProxyableRW<'a>>(
18    hdl: Hdl,
19    initiate_transfer: ProxyTransferInitiationReceiver,
20    stream_writer: FramedStreamWriter,
21    stream_reader: FramedStreamReader,
22    router: Weak<Router>,
23) -> Result<(), Error> {
24    super::main::run_main_loop(
25        Proxy::new(hdl, router),
26        initiate_transfer,
27        stream_writer,
28        None,
29        stream_reader,
30    )
31    .await
32}
33
34// Start receiving from some stream for a channel.
35// Returns a handle, and an optional future that runs the proxying activity (or none if no proxying is occurring).
36pub(crate) async fn recv<Hdl, CreateType>(
37    create_handles: impl FnOnce() -> Result<(CreateType, CreateType), Error>,
38    rights: CreateType::Rights,
39    initiate_transfer: ProxyTransferInitiationReceiver,
40    stream_ref: StreamRef,
41    conn: PeerConnRef<'_>,
42    router: Weak<Router>,
43) -> Result<
44    (fidl::NullableHandle, Option<impl Send + Future<Output = Result<(), Error>> + 'static>),
45    Error,
46>
47where
48    Hdl: 'static + for<'a> ProxyableRW<'a>,
49    CreateType: fidl::HandleBased + IntoProxied<Proxied = Hdl> + std::fmt::Debug + WithRights,
50{
51    Ok(match stream_ref {
52        StreamRef::Creating(StreamId { id: stream_id }) => {
53            let (app_chan, overnet_chan) = create_handles()?;
54            let app_chan = app_chan.with_rights(rights)?;
55            let (stream_writer, stream_reader) = conn.bind_id(stream_id).await?;
56            let overnet_chan = overnet_chan.into_proxied()?;
57            (
58                app_chan.into_handle(),
59                Some(super::main::run_main_loop(
60                    Proxy::new(overnet_chan, router),
61                    initiate_transfer,
62                    stream_writer,
63                    None,
64                    stream_reader,
65                )),
66            )
67        }
68        StreamRef::TransferInitiator(TransferInitiator {
69            stream_id: StreamId { id: stream_id },
70            new_destination_node,
71            transfer_key,
72        }) => {
73            let (app_chan, overnet_chan) = create_handles()?;
74            let app_chan = app_chan.with_rights(rights)?;
75            let initial_stream_reader: FramedStreamReader =
76                conn.bind_uni_id(stream_id).await?.into();
77            let opened_transfer = Weak::upgrade(&router)
78                .ok_or_else(|| format_err!("No router to handle draining stream ref"))?
79                .open_transfer(
80                    new_destination_node.into(),
81                    transfer_key,
82                    overnet_chan.into_handle(),
83                )
84                .await?;
85            match opened_transfer {
86                OpenedTransfer::Fused => {
87                    let app_chan = app_chan.into_proxied()?;
88                    (
89                        ProxyableHandle::new(app_chan, router)
90                            .drain_stream_to_handle(initial_stream_reader)
91                            .await?,
92                        None,
93                    )
94                }
95                OpenedTransfer::Remote(stream_writer, stream_reader, overnet_chan) => (
96                    app_chan.into_handle(),
97                    Some(super::main::run_main_loop(
98                        Proxy::new(Hdl::from_fidl_handle(overnet_chan)?, router),
99                        initiate_transfer,
100                        stream_writer,
101                        Some(initial_stream_reader),
102                        stream_reader,
103                    )),
104                ),
105            }
106        }
107        StreamRef::TransferWaiter(TransferWaiter {
108            stream_id: StreamId { id: stream_id },
109            transfer_key,
110        }) => {
111            let initial_stream_reader: FramedStreamReader =
112                conn.bind_uni_id(stream_id).await?.into();
113            let found_transfer = Weak::upgrade(&router)
114                .ok_or_else(|| format_err!("No router to handle draining stream ref"))?
115                .find_transfer(transfer_key)
116                .await?;
117            match found_transfer {
118                FoundTransfer::Fused(handle) => {
119                    let handle = Hdl::from_fidl_handle(handle)?;
120                    (
121                        ProxyableHandle::new(handle, router)
122                            .drain_stream_to_handle(initial_stream_reader)
123                            .await?,
124                        None,
125                    )
126                }
127                FoundTransfer::Remote(stream_writer, stream_reader) => {
128                    let (app_chan, overnet_chan) = create_handles()?;
129                    (
130                        app_chan.with_rights(rights)?.into_handle(),
131                        Some(super::main::run_main_loop(
132                            Proxy::new(overnet_chan.into_proxied()?, router),
133                            initiate_transfer,
134                            stream_writer,
135                            Some(initial_stream_reader),
136                            stream_reader,
137                        )),
138                    )
139                }
140            }
141        }
142    })
143}