overnet_core/proxy/run/
xfer.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::handle::{Message, ProxyableHandle, ProxyableRW, ReadValue};
6use super::super::stream::{Frame, StreamReader, StreamWriter, StreamWriterBinder};
7use super::super::{Proxy, ProxyTransferInitiationReceiver, StreamRefSender};
8use crate::labels::{Endpoint, NodeId, TransferKey, generate_transfer_key};
9use crate::peer::{FramedStreamReader, FramedStreamWriter};
10use crate::router::OpenedTransfer;
11use anyhow::{Error, bail, format_err};
12use futures::future::Either;
13use futures::prelude::*;
14use futures::task::{Context, Poll};
15use std::sync::{Arc, Weak};
16use std::task::Waker;
17use zx_status;
18
19// Follow a transfer that was initated elsewhere to the destination.
20pub(crate) async fn follow<Hdl: 'static + for<'a> ProxyableRW<'a>>(
21    mut proxy: Proxy<Hdl>,
22    initiate_transfer: ProxyTransferInitiationReceiver,
23    stream_writer: StreamWriter<Hdl::Message>,
24    new_destination_node: NodeId,
25    transfer_key: TransferKey,
26    stream_reader: StreamReader<Hdl::Message>,
27) -> Result<(), Error> {
28    futures::future::try_join(stream_reader.expect_shutdown(Ok(())), async move {
29        stream_writer.send_ack_transfer().await?;
30        let hdl = proxy.hdl.take().ok_or_else(|| format_err!("Handle already taken"))?;
31        let router = Weak::upgrade(&hdl.router()).ok_or_else(|| format_err!("Router gone"))?;
32        let hdl = hdl.into_fidl_handle()?;
33        drop(proxy);
34        let r = router.open_transfer(new_destination_node.into(), transfer_key, hdl).await?;
35        match r {
36            OpenedTransfer::Fused => {
37                assert!(initiate_transfer.await.unwrap().is_dropped());
38                Ok(())
39            }
40            OpenedTransfer::Remote(new_writer, new_reader, handle) => {
41                let handle = Hdl::from_fidl_handle(handle)?;
42                make_boxed_main_loop(
43                    Proxy::new(handle, Arc::downgrade(&router)),
44                    initiate_transfer,
45                    new_writer.into(),
46                    None,
47                    new_reader.into(),
48                )
49                .await?;
50                Ok(())
51            }
52        }
53    })
54    .await?;
55    Ok(())
56}
57
58// This needs to be split out to avoid the compiler infinite looping.
59fn make_boxed_main_loop<Hdl: 'static + for<'a> ProxyableRW<'a>>(
60    proxy: Arc<Proxy<Hdl>>,
61    initiate_transfer: ProxyTransferInitiationReceiver,
62    stream_writer: FramedStreamWriter,
63    initial_stream_reader: Option<FramedStreamReader>,
64    stream_reader: FramedStreamReader,
65) -> std::pin::Pin<Box<dyn Send + Future<Output = Result<(), Error>>>> {
66    super::main::run_main_loop(
67        proxy,
68        initiate_transfer,
69        stream_writer,
70        initial_stream_reader,
71        stream_reader,
72    )
73    .boxed()
74}
75
76// Initiate a transfer. The other end of a handle we're already proxying (`pair`) has been sent
77// to a new endpoint. A drain stream has been established to flush already received messages to the
78// new endpoint.
79pub(crate) async fn initiate<Hdl: 'static + for<'a> ProxyableRW<'a>>(
80    proxy: Proxy<Hdl>,
81    pair: fidl::NullableHandle,
82    mut stream_writer: StreamWriter<Hdl::Message>,
83    mut stream_reader: StreamReader<Hdl::Message>,
84    drain_stream: FramedStreamWriter,
85    stream_ref_sender: StreamRefSender,
86) -> Result<(), Error> {
87    let transfer_key = generate_transfer_key();
88
89    let drain_stream = drain_stream.bind(&proxy.hdl());
90    let drain_stream_id = drain_stream.id();
91    let peer_node_id = drain_stream.conn().peer_node_id();
92
93    futures::future::try_join(
94        drain_handle_to_stream(
95            ProxyableHandle::new(Hdl::from_fidl_handle(pair)?, proxy.hdl().router().clone()),
96            drain_stream,
97        ),
98        async move {
99            // Before we can send a BeginTransfer message we need to flush out any messages we intended to
100            // send.
101            let stream_ref_sender = flush_outgoing_messages(
102                &proxy,
103                transfer_key,
104                &mut stream_writer,
105                &mut stream_reader,
106                drain_stream_id,
107                stream_ref_sender,
108            )
109            .await?;
110
111            // Send the BeginTransfer.
112            stream_writer.send_begin_transfer(peer_node_id, transfer_key).await?;
113
114            if let Some(stream_ref_sender) = stream_ref_sender {
115                // Now we need to read any incoming messages from the original stream and prepare to send
116                // them to the drain stream.
117                drain_original_stream(
118                    &proxy,
119                    transfer_key,
120                    stream_writer,
121                    stream_reader,
122                    drain_stream_id,
123                    stream_ref_sender,
124                )
125                .await?;
126            } else {
127                // This implies we got a BeginTransfer during the channel drain
128                // and consequently need to ack it (after our BeginTransfer was sent)
129                stream_writer.send_ack_transfer().await?;
130                stream_reader.expect_ack_transfer().await?;
131            }
132            Ok(())
133        },
134    )
135    .await?;
136    Ok(())
137}
138
139async fn drain_handle_to_stream<Hdl: 'static + for<'a> ProxyableRW<'a>>(
140    hdl: ProxyableHandle<Hdl>,
141    mut stream_writer: StreamWriter<Hdl::Message>,
142) -> Result<(), Error> {
143    let mut message = Default::default();
144    loop {
145        match hdl.read(&mut message).await {
146            Ok(ReadValue::Message) => stream_writer.send_data(&mut message).await?,
147            Ok(ReadValue::SignalUpdate(signal_update)) => {
148                stream_writer.send_signal(signal_update).await?
149            }
150            Err(zx_status::Status::PEER_CLOSED) => break,
151            Err(x) => return Err(x.into()),
152        }
153    }
154    stream_writer.send_end_transfer().await
155}
156
157#[derive(Debug)]
158enum FlushOutgoingMsg<'a, Msg: Message> {
159    FromChannel,
160    FromStream(Frame<'a, Msg>),
161}
162
163async fn flush_outgoing_messages<Hdl: 'static + for<'a> ProxyableRW<'a>>(
164    proxy: &Proxy<Hdl>,
165    original_transfer_key: TransferKey,
166    stream_writer: &mut StreamWriter<Hdl::Message>,
167    stream_reader: &mut StreamReader<Hdl::Message>,
168    drain_stream_id: u64,
169    stream_ref_sender: StreamRefSender,
170) -> Result<Option<StreamRefSender>, Error> {
171    let mut message = Default::default();
172    let endpoint = stream_reader.conn().endpoint();
173    loop {
174        let msg = match futures::future::select(
175            proxy.read_from_handle(&mut message),
176            stream_reader.next(),
177        )
178        .poll_unpin(&mut Context::from_waker(Waker::noop()))
179        {
180            Poll::Pending => return Ok(Some(stream_ref_sender)),
181            Poll::Ready(Either::Left((x, _))) => {
182                x?;
183                FlushOutgoingMsg::FromChannel
184            }
185            Poll::Ready(Either::Right((msg, _))) => FlushOutgoingMsg::FromStream(msg?),
186        };
187        match msg {
188            FlushOutgoingMsg::FromChannel => {
189                // Message was read from the channel (it's not empty yet)... so we send it out on
190                // the stream.
191                stream_writer.send_data(&mut message).await?;
192            }
193            FlushOutgoingMsg::FromStream(Frame::Data(msg)) => {
194                // We received an incoming message - place it on the handle pair for a moment until
195                // we can send it to the drain stream.
196                proxy.write_to_handle(msg).await?;
197            }
198            FlushOutgoingMsg::FromStream(Frame::SignalUpdate(signal_update)) => {
199                proxy.apply_signal_update(signal_update)?;
200            }
201            FlushOutgoingMsg::FromStream(Frame::BeginTransfer(
202                new_destination_node,
203                new_transfer_key,
204            )) => {
205                // Uh oh! The other end has independently decided to transfer ownership of this
206                // handle. We use the quic endpoint to determine behavior (such that each end makes
207                // a consistent decision) - clients start a new stream to the target, servers await
208                // that stream, and then we just need to drain messages.
209                match endpoint {
210                    Endpoint::Client => {
211                        stream_ref_sender.draining_initiate(
212                            drain_stream_id,
213                            new_destination_node,
214                            new_transfer_key,
215                        )?;
216                    }
217                    Endpoint::Server => {
218                        stream_ref_sender.draining_await(drain_stream_id, original_transfer_key)?;
219                    }
220                }
221                proxy.drain_handle_to_stream(stream_writer).await?;
222                return Ok(None);
223            }
224            FlushOutgoingMsg::FromStream(Frame::Hello) => {
225                bail!("Hello frame received after stream established")
226            }
227            FlushOutgoingMsg::FromStream(Frame::AckTransfer) => {
228                bail!("AckTransfer received before BeginTransfer sent")
229            }
230            FlushOutgoingMsg::FromStream(Frame::EndTransfer) => {
231                bail!("EndTransfer received on a regular stream")
232            }
233            FlushOutgoingMsg::FromStream(Frame::Shutdown(r)) => {
234                bail!("Stream shutdown during transfer: {:?}", r)
235            }
236        }
237    }
238}
239
240async fn drain_original_stream<Hdl: 'static + for<'a> ProxyableRW<'a>>(
241    proxy: &Proxy<Hdl>,
242    original_transfer_key: TransferKey,
243    stream_writer: StreamWriter<Hdl::Message>,
244    mut stream_reader: StreamReader<Hdl::Message>,
245    drain_stream_id: u64,
246    stream_ref_sender: StreamRefSender,
247) -> Result<(), Error> {
248    let endpoint = stream_reader.conn().endpoint();
249    loop {
250        let r = stream_reader.next().await;
251        match r {
252            Ok(Frame::Hello) => {
253                bail!("Hello frame received after stream established");
254            }
255            Ok(Frame::Data(mut message)) => {
256                // We received an incoming message - place it on the handle pair for a moment until
257                // we can send it to the drain stream.
258                proxy.write_to_handle(&mut message).await?;
259            }
260            Ok(Frame::SignalUpdate(signal_update)) => proxy.apply_signal_update(signal_update)?,
261            Ok(Frame::BeginTransfer(new_destination_node, new_transfer_key)) => {
262                // Uh oh! The other end has independently decided to transfer ownership of this
263                // handle. We use the quic endpoint to determine behavior (such that each end makes
264                // a consistent decision) - clients start a new stream to the target, servers await
265                // that stream. We've flushed messages, so we need to do an ack dance too.
266                match endpoint {
267                    Endpoint::Client => {
268                        stream_ref_sender.draining_initiate(
269                            drain_stream_id,
270                            new_destination_node,
271                            new_transfer_key,
272                        )?;
273                        stream_writer.send_ack_transfer().await?;
274                        return stream_reader.expect_ack_transfer().await;
275                    }
276                    Endpoint::Server => {
277                        stream_ref_sender.draining_await(drain_stream_id, original_transfer_key)?;
278                        stream_writer.send_ack_transfer().await?;
279                        return stream_reader.expect_ack_transfer().await;
280                    }
281                }
282            }
283            Ok(Frame::AckTransfer) => {
284                stream_writer.send_shutdown(Ok(())).await?;
285                return stream_ref_sender.draining_await(drain_stream_id, original_transfer_key);
286            }
287            Ok(Frame::EndTransfer) => bail!("EndTransfer received on a regular stream"),
288            Ok(Frame::Shutdown(r)) => bail!("Stream shutdown during transfer: {:?}", r),
289            Err(e) => return Err(e),
290        }
291    }
292}