1use super::super::handle::{Message, ProxyableHandle, ProxyableRW, ReadValue};
6use super::super::stream::{Frame, StreamReader, StreamWriter, StreamWriterBinder};
7use super::super::{Proxy, ProxyTransferInitiationReceiver, StreamRefSender};
8use crate::labels::{generate_transfer_key, Endpoint, NodeId, TransferKey};
9use crate::peer::{FramedStreamReader, FramedStreamWriter};
10use crate::router::OpenedTransfer;
11use anyhow::{bail, format_err, Error};
12use futures::future::Either;
13use futures::prelude::*;
14use futures::task::{noop_waker_ref, Context, Poll};
15use std::sync::{Arc, Weak};
16use zx_status;
17
18pub(crate) async fn follow<Hdl: 'static + for<'a> ProxyableRW<'a>>(
20 mut proxy: Proxy<Hdl>,
21 initiate_transfer: ProxyTransferInitiationReceiver,
22 stream_writer: StreamWriter<Hdl::Message>,
23 new_destination_node: NodeId,
24 transfer_key: TransferKey,
25 stream_reader: StreamReader<Hdl::Message>,
26) -> Result<(), Error> {
27 futures::future::try_join(stream_reader.expect_shutdown(Ok(())), async move {
28 stream_writer.send_ack_transfer().await?;
29 let hdl = proxy.hdl.take().ok_or_else(|| format_err!("Handle already taken"))?;
30 let router = Weak::upgrade(&hdl.router()).ok_or_else(|| format_err!("Router gone"))?;
31 let hdl = hdl.into_fidl_handle()?;
32 drop(proxy);
33 let r = router.open_transfer(new_destination_node.into(), transfer_key, hdl).await?;
34 match r {
35 OpenedTransfer::Fused => {
36 assert!(initiate_transfer.await.unwrap().is_dropped());
37 Ok(())
38 }
39 OpenedTransfer::Remote(new_writer, new_reader, handle) => {
40 let handle = Hdl::from_fidl_handle(handle)?;
41 make_boxed_main_loop(
42 Proxy::new(handle, Arc::downgrade(&router)),
43 initiate_transfer,
44 new_writer.into(),
45 None,
46 new_reader.into(),
47 )
48 .await?;
49 Ok(())
50 }
51 }
52 })
53 .await?;
54 Ok(())
55}
56
57fn make_boxed_main_loop<Hdl: 'static + for<'a> ProxyableRW<'a>>(
59 proxy: Arc<Proxy<Hdl>>,
60 initiate_transfer: ProxyTransferInitiationReceiver,
61 stream_writer: FramedStreamWriter,
62 initial_stream_reader: Option<FramedStreamReader>,
63 stream_reader: FramedStreamReader,
64) -> std::pin::Pin<Box<dyn Send + Future<Output = Result<(), Error>>>> {
65 super::main::run_main_loop(
66 proxy,
67 initiate_transfer,
68 stream_writer,
69 initial_stream_reader,
70 stream_reader,
71 )
72 .boxed()
73}
74
75pub(crate) async fn initiate<Hdl: 'static + for<'a> ProxyableRW<'a>>(
79 proxy: Proxy<Hdl>,
80 pair: fidl::Handle,
81 mut stream_writer: StreamWriter<Hdl::Message>,
82 mut stream_reader: StreamReader<Hdl::Message>,
83 drain_stream: FramedStreamWriter,
84 stream_ref_sender: StreamRefSender,
85) -> Result<(), Error> {
86 let transfer_key = generate_transfer_key();
87
88 let drain_stream = drain_stream.bind(&proxy.hdl());
89 let drain_stream_id = drain_stream.id();
90 let peer_node_id = drain_stream.conn().peer_node_id();
91
92 futures::future::try_join(
93 drain_handle_to_stream(
94 ProxyableHandle::new(Hdl::from_fidl_handle(pair)?, proxy.hdl().router().clone()),
95 drain_stream,
96 ),
97 async move {
98 let stream_ref_sender = flush_outgoing_messages(
101 &proxy,
102 transfer_key,
103 &mut stream_writer,
104 &mut stream_reader,
105 drain_stream_id,
106 stream_ref_sender,
107 )
108 .await?;
109
110 stream_writer.send_begin_transfer(peer_node_id, transfer_key).await?;
112
113 if let Some(stream_ref_sender) = stream_ref_sender {
114 drain_original_stream(
117 &proxy,
118 transfer_key,
119 stream_writer,
120 stream_reader,
121 drain_stream_id,
122 stream_ref_sender,
123 )
124 .await?;
125 } else {
126 stream_writer.send_ack_transfer().await?;
129 stream_reader.expect_ack_transfer().await?;
130 }
131 Ok(())
132 },
133 )
134 .await?;
135 Ok(())
136}
137
138async fn drain_handle_to_stream<Hdl: 'static + for<'a> ProxyableRW<'a>>(
139 hdl: ProxyableHandle<Hdl>,
140 mut stream_writer: StreamWriter<Hdl::Message>,
141) -> Result<(), Error> {
142 let mut message = Default::default();
143 loop {
144 match hdl.read(&mut message).await {
145 Ok(ReadValue::Message) => stream_writer.send_data(&mut message).await?,
146 Ok(ReadValue::SignalUpdate(signal_update)) => {
147 stream_writer.send_signal(signal_update).await?
148 }
149 Err(zx_status::Status::PEER_CLOSED) => break,
150 Err(x) => return Err(x.into()),
151 }
152 }
153 stream_writer.send_end_transfer().await
154}
155
156#[derive(Debug)]
157enum FlushOutgoingMsg<'a, Msg: Message> {
158 FromChannel,
159 FromStream(Frame<'a, Msg>),
160}
161
162async fn flush_outgoing_messages<Hdl: 'static + for<'a> ProxyableRW<'a>>(
163 proxy: &Proxy<Hdl>,
164 original_transfer_key: TransferKey,
165 stream_writer: &mut StreamWriter<Hdl::Message>,
166 stream_reader: &mut StreamReader<Hdl::Message>,
167 drain_stream_id: u64,
168 stream_ref_sender: StreamRefSender,
169) -> Result<Option<StreamRefSender>, Error> {
170 let mut message = Default::default();
171 let endpoint = stream_reader.conn().endpoint();
172 loop {
173 let msg = match futures::future::select(
174 proxy.read_from_handle(&mut message),
175 stream_reader.next(),
176 )
177 .poll_unpin(&mut Context::from_waker(noop_waker_ref()))
178 {
179 Poll::Pending => return Ok(Some(stream_ref_sender)),
180 Poll::Ready(Either::Left((x, _))) => {
181 x?;
182 FlushOutgoingMsg::FromChannel
183 }
184 Poll::Ready(Either::Right((msg, _))) => FlushOutgoingMsg::FromStream(msg?),
185 };
186 match msg {
187 FlushOutgoingMsg::FromChannel => {
188 stream_writer.send_data(&mut message).await?;
191 }
192 FlushOutgoingMsg::FromStream(Frame::Data(msg)) => {
193 proxy.write_to_handle(msg).await?;
196 }
197 FlushOutgoingMsg::FromStream(Frame::SignalUpdate(signal_update)) => {
198 proxy.apply_signal_update(signal_update)?;
199 }
200 FlushOutgoingMsg::FromStream(Frame::BeginTransfer(
201 new_destination_node,
202 new_transfer_key,
203 )) => {
204 match endpoint {
209 Endpoint::Client => {
210 stream_ref_sender.draining_initiate(
211 drain_stream_id,
212 new_destination_node,
213 new_transfer_key,
214 )?;
215 }
216 Endpoint::Server => {
217 stream_ref_sender.draining_await(drain_stream_id, original_transfer_key)?;
218 }
219 }
220 proxy.drain_handle_to_stream(stream_writer).await?;
221 return Ok(None);
222 }
223 FlushOutgoingMsg::FromStream(Frame::Hello) => {
224 bail!("Hello frame received after stream established")
225 }
226 FlushOutgoingMsg::FromStream(Frame::AckTransfer) => {
227 bail!("AckTransfer received before BeginTransfer sent")
228 }
229 FlushOutgoingMsg::FromStream(Frame::EndTransfer) => {
230 bail!("EndTransfer received on a regular stream")
231 }
232 FlushOutgoingMsg::FromStream(Frame::Shutdown(r)) => {
233 bail!("Stream shutdown during transfer: {:?}", r)
234 }
235 }
236 }
237}
238
239async fn drain_original_stream<Hdl: 'static + for<'a> ProxyableRW<'a>>(
240 proxy: &Proxy<Hdl>,
241 original_transfer_key: TransferKey,
242 stream_writer: StreamWriter<Hdl::Message>,
243 mut stream_reader: StreamReader<Hdl::Message>,
244 drain_stream_id: u64,
245 stream_ref_sender: StreamRefSender,
246) -> Result<(), Error> {
247 let endpoint = stream_reader.conn().endpoint();
248 loop {
249 let r = stream_reader.next().await;
250 match r {
251 Ok(Frame::Hello) => {
252 bail!("Hello frame received after stream established");
253 }
254 Ok(Frame::Data(mut message)) => {
255 proxy.write_to_handle(&mut message).await?;
258 }
259 Ok(Frame::SignalUpdate(signal_update)) => proxy.apply_signal_update(signal_update)?,
260 Ok(Frame::BeginTransfer(new_destination_node, new_transfer_key)) => {
261 match endpoint {
266 Endpoint::Client => {
267 stream_ref_sender.draining_initiate(
268 drain_stream_id,
269 new_destination_node,
270 new_transfer_key,
271 )?;
272 stream_writer.send_ack_transfer().await?;
273 return stream_reader.expect_ack_transfer().await;
274 }
275 Endpoint::Server => {
276 stream_ref_sender.draining_await(drain_stream_id, original_transfer_key)?;
277 stream_writer.send_ack_transfer().await?;
278 return stream_reader.expect_ack_transfer().await;
279 }
280 }
281 }
282 Ok(Frame::AckTransfer) => {
283 stream_writer.send_shutdown(Ok(())).await?;
284 return stream_ref_sender.draining_await(drain_stream_id, original_transfer_key);
285 }
286 Ok(Frame::EndTransfer) => bail!("EndTransfer received on a regular stream"),
287 Ok(Frame::Shutdown(r)) => bail!("Stream shutdown during transfer: {:?}", r),
288 Err(e) => return Err(e),
289 }
290 }
291}