1use super::super::handle::{Message, ProxyableHandle, ProxyableRW, ReadValue};
6use super::super::stream::{Frame, StreamReader, StreamWriter, StreamWriterBinder};
7use super::super::{Proxy, ProxyTransferInitiationReceiver, StreamRefSender};
8use crate::labels::{Endpoint, NodeId, TransferKey, generate_transfer_key};
9use crate::peer::{FramedStreamReader, FramedStreamWriter};
10use crate::router::OpenedTransfer;
11use anyhow::{Error, bail, format_err};
12use futures::future::Either;
13use futures::prelude::*;
14use futures::task::{Context, Poll};
15use std::sync::{Arc, Weak};
16use std::task::Waker;
17use zx_status;
18
19pub(crate) async fn follow<Hdl: 'static + for<'a> ProxyableRW<'a>>(
21 mut proxy: Proxy<Hdl>,
22 initiate_transfer: ProxyTransferInitiationReceiver,
23 stream_writer: StreamWriter<Hdl::Message>,
24 new_destination_node: NodeId,
25 transfer_key: TransferKey,
26 stream_reader: StreamReader<Hdl::Message>,
27) -> Result<(), Error> {
28 futures::future::try_join(stream_reader.expect_shutdown(Ok(())), async move {
29 stream_writer.send_ack_transfer().await?;
30 let hdl = proxy.hdl.take().ok_or_else(|| format_err!("Handle already taken"))?;
31 let router = Weak::upgrade(&hdl.router()).ok_or_else(|| format_err!("Router gone"))?;
32 let hdl = hdl.into_fidl_handle()?;
33 drop(proxy);
34 let r = router.open_transfer(new_destination_node.into(), transfer_key, hdl).await?;
35 match r {
36 OpenedTransfer::Fused => {
37 assert!(initiate_transfer.await.unwrap().is_dropped());
38 Ok(())
39 }
40 OpenedTransfer::Remote(new_writer, new_reader, handle) => {
41 let handle = Hdl::from_fidl_handle(handle)?;
42 make_boxed_main_loop(
43 Proxy::new(handle, Arc::downgrade(&router)),
44 initiate_transfer,
45 new_writer.into(),
46 None,
47 new_reader.into(),
48 )
49 .await?;
50 Ok(())
51 }
52 }
53 })
54 .await?;
55 Ok(())
56}
57
58fn make_boxed_main_loop<Hdl: 'static + for<'a> ProxyableRW<'a>>(
60 proxy: Arc<Proxy<Hdl>>,
61 initiate_transfer: ProxyTransferInitiationReceiver,
62 stream_writer: FramedStreamWriter,
63 initial_stream_reader: Option<FramedStreamReader>,
64 stream_reader: FramedStreamReader,
65) -> std::pin::Pin<Box<dyn Send + Future<Output = Result<(), Error>>>> {
66 super::main::run_main_loop(
67 proxy,
68 initiate_transfer,
69 stream_writer,
70 initial_stream_reader,
71 stream_reader,
72 )
73 .boxed()
74}
75
76pub(crate) async fn initiate<Hdl: 'static + for<'a> ProxyableRW<'a>>(
80 proxy: Proxy<Hdl>,
81 pair: fidl::NullableHandle,
82 mut stream_writer: StreamWriter<Hdl::Message>,
83 mut stream_reader: StreamReader<Hdl::Message>,
84 drain_stream: FramedStreamWriter,
85 stream_ref_sender: StreamRefSender,
86) -> Result<(), Error> {
87 let transfer_key = generate_transfer_key();
88
89 let drain_stream = drain_stream.bind(&proxy.hdl());
90 let drain_stream_id = drain_stream.id();
91 let peer_node_id = drain_stream.conn().peer_node_id();
92
93 futures::future::try_join(
94 drain_handle_to_stream(
95 ProxyableHandle::new(Hdl::from_fidl_handle(pair)?, proxy.hdl().router().clone()),
96 drain_stream,
97 ),
98 async move {
99 let stream_ref_sender = flush_outgoing_messages(
102 &proxy,
103 transfer_key,
104 &mut stream_writer,
105 &mut stream_reader,
106 drain_stream_id,
107 stream_ref_sender,
108 )
109 .await?;
110
111 stream_writer.send_begin_transfer(peer_node_id, transfer_key).await?;
113
114 if let Some(stream_ref_sender) = stream_ref_sender {
115 drain_original_stream(
118 &proxy,
119 transfer_key,
120 stream_writer,
121 stream_reader,
122 drain_stream_id,
123 stream_ref_sender,
124 )
125 .await?;
126 } else {
127 stream_writer.send_ack_transfer().await?;
130 stream_reader.expect_ack_transfer().await?;
131 }
132 Ok(())
133 },
134 )
135 .await?;
136 Ok(())
137}
138
139async fn drain_handle_to_stream<Hdl: 'static + for<'a> ProxyableRW<'a>>(
140 hdl: ProxyableHandle<Hdl>,
141 mut stream_writer: StreamWriter<Hdl::Message>,
142) -> Result<(), Error> {
143 let mut message = Default::default();
144 loop {
145 match hdl.read(&mut message).await {
146 Ok(ReadValue::Message) => stream_writer.send_data(&mut message).await?,
147 Ok(ReadValue::SignalUpdate(signal_update)) => {
148 stream_writer.send_signal(signal_update).await?
149 }
150 Err(zx_status::Status::PEER_CLOSED) => break,
151 Err(x) => return Err(x.into()),
152 }
153 }
154 stream_writer.send_end_transfer().await
155}
156
157#[derive(Debug)]
158enum FlushOutgoingMsg<'a, Msg: Message> {
159 FromChannel,
160 FromStream(Frame<'a, Msg>),
161}
162
163async fn flush_outgoing_messages<Hdl: 'static + for<'a> ProxyableRW<'a>>(
164 proxy: &Proxy<Hdl>,
165 original_transfer_key: TransferKey,
166 stream_writer: &mut StreamWriter<Hdl::Message>,
167 stream_reader: &mut StreamReader<Hdl::Message>,
168 drain_stream_id: u64,
169 stream_ref_sender: StreamRefSender,
170) -> Result<Option<StreamRefSender>, Error> {
171 let mut message = Default::default();
172 let endpoint = stream_reader.conn().endpoint();
173 loop {
174 let msg = match futures::future::select(
175 proxy.read_from_handle(&mut message),
176 stream_reader.next(),
177 )
178 .poll_unpin(&mut Context::from_waker(Waker::noop()))
179 {
180 Poll::Pending => return Ok(Some(stream_ref_sender)),
181 Poll::Ready(Either::Left((x, _))) => {
182 x?;
183 FlushOutgoingMsg::FromChannel
184 }
185 Poll::Ready(Either::Right((msg, _))) => FlushOutgoingMsg::FromStream(msg?),
186 };
187 match msg {
188 FlushOutgoingMsg::FromChannel => {
189 stream_writer.send_data(&mut message).await?;
192 }
193 FlushOutgoingMsg::FromStream(Frame::Data(msg)) => {
194 proxy.write_to_handle(msg).await?;
197 }
198 FlushOutgoingMsg::FromStream(Frame::SignalUpdate(signal_update)) => {
199 proxy.apply_signal_update(signal_update)?;
200 }
201 FlushOutgoingMsg::FromStream(Frame::BeginTransfer(
202 new_destination_node,
203 new_transfer_key,
204 )) => {
205 match endpoint {
210 Endpoint::Client => {
211 stream_ref_sender.draining_initiate(
212 drain_stream_id,
213 new_destination_node,
214 new_transfer_key,
215 )?;
216 }
217 Endpoint::Server => {
218 stream_ref_sender.draining_await(drain_stream_id, original_transfer_key)?;
219 }
220 }
221 proxy.drain_handle_to_stream(stream_writer).await?;
222 return Ok(None);
223 }
224 FlushOutgoingMsg::FromStream(Frame::Hello) => {
225 bail!("Hello frame received after stream established")
226 }
227 FlushOutgoingMsg::FromStream(Frame::AckTransfer) => {
228 bail!("AckTransfer received before BeginTransfer sent")
229 }
230 FlushOutgoingMsg::FromStream(Frame::EndTransfer) => {
231 bail!("EndTransfer received on a regular stream")
232 }
233 FlushOutgoingMsg::FromStream(Frame::Shutdown(r)) => {
234 bail!("Stream shutdown during transfer: {:?}", r)
235 }
236 }
237 }
238}
239
240async fn drain_original_stream<Hdl: 'static + for<'a> ProxyableRW<'a>>(
241 proxy: &Proxy<Hdl>,
242 original_transfer_key: TransferKey,
243 stream_writer: StreamWriter<Hdl::Message>,
244 mut stream_reader: StreamReader<Hdl::Message>,
245 drain_stream_id: u64,
246 stream_ref_sender: StreamRefSender,
247) -> Result<(), Error> {
248 let endpoint = stream_reader.conn().endpoint();
249 loop {
250 let r = stream_reader.next().await;
251 match r {
252 Ok(Frame::Hello) => {
253 bail!("Hello frame received after stream established");
254 }
255 Ok(Frame::Data(mut message)) => {
256 proxy.write_to_handle(&mut message).await?;
259 }
260 Ok(Frame::SignalUpdate(signal_update)) => proxy.apply_signal_update(signal_update)?,
261 Ok(Frame::BeginTransfer(new_destination_node, new_transfer_key)) => {
262 match endpoint {
267 Endpoint::Client => {
268 stream_ref_sender.draining_initiate(
269 drain_stream_id,
270 new_destination_node,
271 new_transfer_key,
272 )?;
273 stream_writer.send_ack_transfer().await?;
274 return stream_reader.expect_ack_transfer().await;
275 }
276 Endpoint::Server => {
277 stream_ref_sender.draining_await(drain_stream_id, original_transfer_key)?;
278 stream_writer.send_ack_transfer().await?;
279 return stream_reader.expect_ack_transfer().await;
280 }
281 }
282 }
283 Ok(Frame::AckTransfer) => {
284 stream_writer.send_shutdown(Ok(())).await?;
285 return stream_ref_sender.draining_await(drain_stream_id, original_transfer_key);
286 }
287 Ok(Frame::EndTransfer) => bail!("EndTransfer received on a regular stream"),
288 Ok(Frame::Shutdown(r)) => bail!("Stream shutdown during transfer: {:?}", r),
289 Err(e) => return Err(e),
290 }
291 }
292}