overnet_core/proxy/run/
spawn.rs
1use super::super::handle::{IntoProxied, ProxyableHandle, ProxyableRW};
8use super::super::{Proxy, ProxyTransferInitiationReceiver};
9use crate::handle_info::WithRights;
10use crate::peer::{FramedStreamReader, FramedStreamWriter, PeerConnRef};
11use crate::router::{FoundTransfer, OpenedTransfer, Router};
12use anyhow::{format_err, Error};
13use fidl_fuchsia_overnet_protocol::{StreamId, StreamRef, TransferInitiator, TransferWaiter};
14use std::future::Future;
15use std::sync::Weak;
16
17pub(crate) async fn send<Hdl: 'static + for<'a> ProxyableRW<'a>>(
18 hdl: Hdl,
19 initiate_transfer: ProxyTransferInitiationReceiver,
20 stream_writer: FramedStreamWriter,
21 stream_reader: FramedStreamReader,
22 router: Weak<Router>,
23) -> Result<(), Error> {
24 super::main::run_main_loop(
25 Proxy::new(hdl, router),
26 initiate_transfer,
27 stream_writer,
28 None,
29 stream_reader,
30 )
31 .await
32}
33
34pub(crate) async fn recv<Hdl, CreateType>(
37 create_handles: impl FnOnce() -> Result<(CreateType, CreateType), Error>,
38 rights: CreateType::Rights,
39 initiate_transfer: ProxyTransferInitiationReceiver,
40 stream_ref: StreamRef,
41 conn: PeerConnRef<'_>,
42 router: Weak<Router>,
43) -> Result<(fidl::Handle, Option<impl Send + Future<Output = Result<(), Error>>>), Error>
44where
45 Hdl: 'static + for<'a> ProxyableRW<'a>,
46 CreateType: fidl::HandleBased + IntoProxied<Proxied = Hdl> + std::fmt::Debug + WithRights,
47{
48 Ok(match stream_ref {
49 StreamRef::Creating(StreamId { id: stream_id }) => {
50 let (app_chan, overnet_chan) = create_handles()?;
51 let app_chan = app_chan.with_rights(rights)?;
52 let (stream_writer, stream_reader) = conn.bind_id(stream_id).await?;
53 let overnet_chan = overnet_chan.into_proxied()?;
54 (
55 app_chan.into_handle(),
56 Some(super::main::run_main_loop(
57 Proxy::new(overnet_chan, router),
58 initiate_transfer,
59 stream_writer,
60 None,
61 stream_reader,
62 )),
63 )
64 }
65 StreamRef::TransferInitiator(TransferInitiator {
66 stream_id: StreamId { id: stream_id },
67 new_destination_node,
68 transfer_key,
69 }) => {
70 let (app_chan, overnet_chan) = create_handles()?;
71 let app_chan = app_chan.with_rights(rights)?;
72 let initial_stream_reader: FramedStreamReader =
73 conn.bind_uni_id(stream_id).await?.into();
74 let opened_transfer = Weak::upgrade(&router)
75 .ok_or_else(|| format_err!("No router to handle draining stream ref"))?
76 .open_transfer(
77 new_destination_node.into(),
78 transfer_key,
79 overnet_chan.into_handle(),
80 )
81 .await?;
82 match opened_transfer {
83 OpenedTransfer::Fused => {
84 let app_chan = app_chan.into_proxied()?;
85 (
86 ProxyableHandle::new(app_chan, router)
87 .drain_stream_to_handle(initial_stream_reader)
88 .await?,
89 None,
90 )
91 }
92 OpenedTransfer::Remote(stream_writer, stream_reader, overnet_chan) => (
93 app_chan.into_handle(),
94 Some(super::main::run_main_loop(
95 Proxy::new(Hdl::from_fidl_handle(overnet_chan)?, router),
96 initiate_transfer,
97 stream_writer,
98 Some(initial_stream_reader),
99 stream_reader,
100 )),
101 ),
102 }
103 }
104 StreamRef::TransferWaiter(TransferWaiter {
105 stream_id: StreamId { id: stream_id },
106 transfer_key,
107 }) => {
108 let initial_stream_reader: FramedStreamReader =
109 conn.bind_uni_id(stream_id).await?.into();
110 let found_transfer = Weak::upgrade(&router)
111 .ok_or_else(|| format_err!("No router to handle draining stream ref"))?
112 .find_transfer(transfer_key)
113 .await?;
114 match found_transfer {
115 FoundTransfer::Fused(handle) => {
116 let handle = Hdl::from_fidl_handle(handle)?;
117 (
118 ProxyableHandle::new(handle, router)
119 .drain_stream_to_handle(initial_stream_reader)
120 .await?,
121 None,
122 )
123 }
124 FoundTransfer::Remote(stream_writer, stream_reader) => {
125 let (app_chan, overnet_chan) = create_handles()?;
126 (
127 app_chan.with_rights(rights)?.into_handle(),
128 Some(super::main::run_main_loop(
129 Proxy::new(overnet_chan.into_proxied()?, router),
130 initiate_transfer,
131 stream_writer,
132 Some(initial_stream_reader),
133 stream_reader,
134 )),
135 )
136 }
137 }
138 }
139 })
140}