overnet_core/proxy/run/
spawn.rs1use super::super::handle::{IntoProxied, ProxyableHandle, ProxyableRW};
8use super::super::{Proxy, ProxyTransferInitiationReceiver};
9use crate::handle_info::WithRights;
10use crate::peer::{FramedStreamReader, FramedStreamWriter, PeerConnRef};
11use crate::router::{FoundTransfer, OpenedTransfer, Router};
12use anyhow::{Error, format_err};
13use fidl_fuchsia_overnet_protocol::{StreamId, StreamRef, TransferInitiator, TransferWaiter};
14use std::future::Future;
15use std::sync::Weak;
16
17pub(crate) async fn send<Hdl: 'static + for<'a> ProxyableRW<'a>>(
18 hdl: Hdl,
19 initiate_transfer: ProxyTransferInitiationReceiver,
20 stream_writer: FramedStreamWriter,
21 stream_reader: FramedStreamReader,
22 router: Weak<Router>,
23) -> Result<(), Error> {
24 super::main::run_main_loop(
25 Proxy::new(hdl, router),
26 initiate_transfer,
27 stream_writer,
28 None,
29 stream_reader,
30 )
31 .await
32}
33
34pub(crate) async fn recv<Hdl, CreateType>(
37 create_handles: impl FnOnce() -> Result<(CreateType, CreateType), Error>,
38 rights: CreateType::Rights,
39 initiate_transfer: ProxyTransferInitiationReceiver,
40 stream_ref: StreamRef,
41 conn: PeerConnRef<'_>,
42 router: Weak<Router>,
43) -> Result<
44 (fidl::NullableHandle, Option<impl Send + Future<Output = Result<(), Error>> + 'static>),
45 Error,
46>
47where
48 Hdl: 'static + for<'a> ProxyableRW<'a>,
49 CreateType: fidl::HandleBased + IntoProxied<Proxied = Hdl> + std::fmt::Debug + WithRights,
50{
51 Ok(match stream_ref {
52 StreamRef::Creating(StreamId { id: stream_id }) => {
53 let (app_chan, overnet_chan) = create_handles()?;
54 let app_chan = app_chan.with_rights(rights)?;
55 let (stream_writer, stream_reader) = conn.bind_id(stream_id).await?;
56 let overnet_chan = overnet_chan.into_proxied()?;
57 (
58 app_chan.into_handle(),
59 Some(super::main::run_main_loop(
60 Proxy::new(overnet_chan, router),
61 initiate_transfer,
62 stream_writer,
63 None,
64 stream_reader,
65 )),
66 )
67 }
68 StreamRef::TransferInitiator(TransferInitiator {
69 stream_id: StreamId { id: stream_id },
70 new_destination_node,
71 transfer_key,
72 }) => {
73 let (app_chan, overnet_chan) = create_handles()?;
74 let app_chan = app_chan.with_rights(rights)?;
75 let initial_stream_reader: FramedStreamReader =
76 conn.bind_uni_id(stream_id).await?.into();
77 let opened_transfer = Weak::upgrade(&router)
78 .ok_or_else(|| format_err!("No router to handle draining stream ref"))?
79 .open_transfer(
80 new_destination_node.into(),
81 transfer_key,
82 overnet_chan.into_handle(),
83 )
84 .await?;
85 match opened_transfer {
86 OpenedTransfer::Fused => {
87 let app_chan = app_chan.into_proxied()?;
88 (
89 ProxyableHandle::new(app_chan, router)
90 .drain_stream_to_handle(initial_stream_reader)
91 .await?,
92 None,
93 )
94 }
95 OpenedTransfer::Remote(stream_writer, stream_reader, overnet_chan) => (
96 app_chan.into_handle(),
97 Some(super::main::run_main_loop(
98 Proxy::new(Hdl::from_fidl_handle(overnet_chan)?, router),
99 initiate_transfer,
100 stream_writer,
101 Some(initial_stream_reader),
102 stream_reader,
103 )),
104 ),
105 }
106 }
107 StreamRef::TransferWaiter(TransferWaiter {
108 stream_id: StreamId { id: stream_id },
109 transfer_key,
110 }) => {
111 let initial_stream_reader: FramedStreamReader =
112 conn.bind_uni_id(stream_id).await?.into();
113 let found_transfer = Weak::upgrade(&router)
114 .ok_or_else(|| format_err!("No router to handle draining stream ref"))?
115 .find_transfer(transfer_key)
116 .await?;
117 match found_transfer {
118 FoundTransfer::Fused(handle) => {
119 let handle = Hdl::from_fidl_handle(handle)?;
120 (
121 ProxyableHandle::new(handle, router)
122 .drain_stream_to_handle(initial_stream_reader)
123 .await?,
124 None,
125 )
126 }
127 FoundTransfer::Remote(stream_writer, stream_reader) => {
128 let (app_chan, overnet_chan) = create_handles()?;
129 (
130 app_chan.with_rights(rights)?.into_handle(),
131 Some(super::main::run_main_loop(
132 Proxy::new(overnet_chan.into_proxied()?, router),
133 initiate_transfer,
134 stream_writer,
135 Some(initial_stream_reader),
136 stream_reader,
137 )),
138 )
139 }
140 }
141 }
142 })
143}