overnet_core/proxy/run/
xfer.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::handle::{Message, ProxyableHandle, ProxyableRW, ReadValue};
6use super::super::stream::{Frame, StreamReader, StreamWriter, StreamWriterBinder};
7use super::super::{Proxy, ProxyTransferInitiationReceiver, StreamRefSender};
8use crate::labels::{generate_transfer_key, Endpoint, NodeId, TransferKey};
9use crate::peer::{FramedStreamReader, FramedStreamWriter};
10use crate::router::OpenedTransfer;
11use anyhow::{bail, format_err, Error};
12use futures::future::Either;
13use futures::prelude::*;
14use futures::task::{noop_waker_ref, Context, Poll};
15use std::sync::{Arc, Weak};
16use zx_status;
17
18// Follow a transfer that was initated elsewhere to the destination.
19pub(crate) async fn follow<Hdl: 'static + for<'a> ProxyableRW<'a>>(
20    mut proxy: Proxy<Hdl>,
21    initiate_transfer: ProxyTransferInitiationReceiver,
22    stream_writer: StreamWriter<Hdl::Message>,
23    new_destination_node: NodeId,
24    transfer_key: TransferKey,
25    stream_reader: StreamReader<Hdl::Message>,
26) -> Result<(), Error> {
27    futures::future::try_join(stream_reader.expect_shutdown(Ok(())), async move {
28        stream_writer.send_ack_transfer().await?;
29        let hdl = proxy.hdl.take().ok_or_else(|| format_err!("Handle already taken"))?;
30        let router = Weak::upgrade(&hdl.router()).ok_or_else(|| format_err!("Router gone"))?;
31        let hdl = hdl.into_fidl_handle()?;
32        drop(proxy);
33        let r = router.open_transfer(new_destination_node.into(), transfer_key, hdl).await?;
34        match r {
35            OpenedTransfer::Fused => {
36                assert!(initiate_transfer.await.unwrap().is_dropped());
37                Ok(())
38            }
39            OpenedTransfer::Remote(new_writer, new_reader, handle) => {
40                let handle = Hdl::from_fidl_handle(handle)?;
41                make_boxed_main_loop(
42                    Proxy::new(handle, Arc::downgrade(&router)),
43                    initiate_transfer,
44                    new_writer.into(),
45                    None,
46                    new_reader.into(),
47                )
48                .await?;
49                Ok(())
50            }
51        }
52    })
53    .await?;
54    Ok(())
55}
56
57// This needs to be split out to avoid the compiler infinite looping.
58fn make_boxed_main_loop<Hdl: 'static + for<'a> ProxyableRW<'a>>(
59    proxy: Arc<Proxy<Hdl>>,
60    initiate_transfer: ProxyTransferInitiationReceiver,
61    stream_writer: FramedStreamWriter,
62    initial_stream_reader: Option<FramedStreamReader>,
63    stream_reader: FramedStreamReader,
64) -> std::pin::Pin<Box<dyn Send + Future<Output = Result<(), Error>>>> {
65    super::main::run_main_loop(
66        proxy,
67        initiate_transfer,
68        stream_writer,
69        initial_stream_reader,
70        stream_reader,
71    )
72    .boxed()
73}
74
75// Initiate a transfer. The other end of a handle we're already proxying (`pair`) has been sent
76// to a new endpoint. A drain stream has been established to flush already received messages to the
77// new endpoint.
78pub(crate) async fn initiate<Hdl: 'static + for<'a> ProxyableRW<'a>>(
79    proxy: Proxy<Hdl>,
80    pair: fidl::Handle,
81    mut stream_writer: StreamWriter<Hdl::Message>,
82    mut stream_reader: StreamReader<Hdl::Message>,
83    drain_stream: FramedStreamWriter,
84    stream_ref_sender: StreamRefSender,
85) -> Result<(), Error> {
86    let transfer_key = generate_transfer_key();
87
88    let drain_stream = drain_stream.bind(&proxy.hdl());
89    let drain_stream_id = drain_stream.id();
90    let peer_node_id = drain_stream.conn().peer_node_id();
91
92    futures::future::try_join(
93        drain_handle_to_stream(
94            ProxyableHandle::new(Hdl::from_fidl_handle(pair)?, proxy.hdl().router().clone()),
95            drain_stream,
96        ),
97        async move {
98            // Before we can send a BeginTransfer message we need to flush out any messages we intended to
99            // send.
100            let stream_ref_sender = flush_outgoing_messages(
101                &proxy,
102                transfer_key,
103                &mut stream_writer,
104                &mut stream_reader,
105                drain_stream_id,
106                stream_ref_sender,
107            )
108            .await?;
109
110            // Send the BeginTransfer.
111            stream_writer.send_begin_transfer(peer_node_id, transfer_key).await?;
112
113            if let Some(stream_ref_sender) = stream_ref_sender {
114                // Now we need to read any incoming messages from the original stream and prepare to send
115                // them to the drain stream.
116                drain_original_stream(
117                    &proxy,
118                    transfer_key,
119                    stream_writer,
120                    stream_reader,
121                    drain_stream_id,
122                    stream_ref_sender,
123                )
124                .await?;
125            } else {
126                // This implies we got a BeginTransfer during the channel drain
127                // and consequently need to ack it (after our BeginTransfer was sent)
128                stream_writer.send_ack_transfer().await?;
129                stream_reader.expect_ack_transfer().await?;
130            }
131            Ok(())
132        },
133    )
134    .await?;
135    Ok(())
136}
137
138async fn drain_handle_to_stream<Hdl: 'static + for<'a> ProxyableRW<'a>>(
139    hdl: ProxyableHandle<Hdl>,
140    mut stream_writer: StreamWriter<Hdl::Message>,
141) -> Result<(), Error> {
142    let mut message = Default::default();
143    loop {
144        match hdl.read(&mut message).await {
145            Ok(ReadValue::Message) => stream_writer.send_data(&mut message).await?,
146            Ok(ReadValue::SignalUpdate(signal_update)) => {
147                stream_writer.send_signal(signal_update).await?
148            }
149            Err(zx_status::Status::PEER_CLOSED) => break,
150            Err(x) => return Err(x.into()),
151        }
152    }
153    stream_writer.send_end_transfer().await
154}
155
156#[derive(Debug)]
157enum FlushOutgoingMsg<'a, Msg: Message> {
158    FromChannel,
159    FromStream(Frame<'a, Msg>),
160}
161
162async fn flush_outgoing_messages<Hdl: 'static + for<'a> ProxyableRW<'a>>(
163    proxy: &Proxy<Hdl>,
164    original_transfer_key: TransferKey,
165    stream_writer: &mut StreamWriter<Hdl::Message>,
166    stream_reader: &mut StreamReader<Hdl::Message>,
167    drain_stream_id: u64,
168    stream_ref_sender: StreamRefSender,
169) -> Result<Option<StreamRefSender>, Error> {
170    let mut message = Default::default();
171    let endpoint = stream_reader.conn().endpoint();
172    loop {
173        let msg = match futures::future::select(
174            proxy.read_from_handle(&mut message),
175            stream_reader.next(),
176        )
177        .poll_unpin(&mut Context::from_waker(noop_waker_ref()))
178        {
179            Poll::Pending => return Ok(Some(stream_ref_sender)),
180            Poll::Ready(Either::Left((x, _))) => {
181                x?;
182                FlushOutgoingMsg::FromChannel
183            }
184            Poll::Ready(Either::Right((msg, _))) => FlushOutgoingMsg::FromStream(msg?),
185        };
186        match msg {
187            FlushOutgoingMsg::FromChannel => {
188                // Message was read from the channel (it's not empty yet)... so we send it out on
189                // the stream.
190                stream_writer.send_data(&mut message).await?;
191            }
192            FlushOutgoingMsg::FromStream(Frame::Data(msg)) => {
193                // We received an incoming message - place it on the handle pair for a moment until
194                // we can send it to the drain stream.
195                proxy.write_to_handle(msg).await?;
196            }
197            FlushOutgoingMsg::FromStream(Frame::SignalUpdate(signal_update)) => {
198                proxy.apply_signal_update(signal_update)?;
199            }
200            FlushOutgoingMsg::FromStream(Frame::BeginTransfer(
201                new_destination_node,
202                new_transfer_key,
203            )) => {
204                // Uh oh! The other end has independently decided to transfer ownership of this
205                // handle. We use the quic endpoint to determine behavior (such that each end makes
206                // a consistent decision) - clients start a new stream to the target, servers await
207                // that stream, and then we just need to drain messages.
208                match endpoint {
209                    Endpoint::Client => {
210                        stream_ref_sender.draining_initiate(
211                            drain_stream_id,
212                            new_destination_node,
213                            new_transfer_key,
214                        )?;
215                    }
216                    Endpoint::Server => {
217                        stream_ref_sender.draining_await(drain_stream_id, original_transfer_key)?;
218                    }
219                }
220                proxy.drain_handle_to_stream(stream_writer).await?;
221                return Ok(None);
222            }
223            FlushOutgoingMsg::FromStream(Frame::Hello) => {
224                bail!("Hello frame received after stream established")
225            }
226            FlushOutgoingMsg::FromStream(Frame::AckTransfer) => {
227                bail!("AckTransfer received before BeginTransfer sent")
228            }
229            FlushOutgoingMsg::FromStream(Frame::EndTransfer) => {
230                bail!("EndTransfer received on a regular stream")
231            }
232            FlushOutgoingMsg::FromStream(Frame::Shutdown(r)) => {
233                bail!("Stream shutdown during transfer: {:?}", r)
234            }
235        }
236    }
237}
238
239async fn drain_original_stream<Hdl: 'static + for<'a> ProxyableRW<'a>>(
240    proxy: &Proxy<Hdl>,
241    original_transfer_key: TransferKey,
242    stream_writer: StreamWriter<Hdl::Message>,
243    mut stream_reader: StreamReader<Hdl::Message>,
244    drain_stream_id: u64,
245    stream_ref_sender: StreamRefSender,
246) -> Result<(), Error> {
247    let endpoint = stream_reader.conn().endpoint();
248    loop {
249        let r = stream_reader.next().await;
250        match r {
251            Ok(Frame::Hello) => {
252                bail!("Hello frame received after stream established");
253            }
254            Ok(Frame::Data(mut message)) => {
255                // We received an incoming message - place it on the handle pair for a moment until
256                // we can send it to the drain stream.
257                proxy.write_to_handle(&mut message).await?;
258            }
259            Ok(Frame::SignalUpdate(signal_update)) => proxy.apply_signal_update(signal_update)?,
260            Ok(Frame::BeginTransfer(new_destination_node, new_transfer_key)) => {
261                // Uh oh! The other end has independently decided to transfer ownership of this
262                // handle. We use the quic endpoint to determine behavior (such that each end makes
263                // a consistent decision) - clients start a new stream to the target, servers await
264                // that stream. We've flushed messages, so we need to do an ack dance too.
265                match endpoint {
266                    Endpoint::Client => {
267                        stream_ref_sender.draining_initiate(
268                            drain_stream_id,
269                            new_destination_node,
270                            new_transfer_key,
271                        )?;
272                        stream_writer.send_ack_transfer().await?;
273                        return stream_reader.expect_ack_transfer().await;
274                    }
275                    Endpoint::Server => {
276                        stream_ref_sender.draining_await(drain_stream_id, original_transfer_key)?;
277                        stream_writer.send_ack_transfer().await?;
278                        return stream_reader.expect_ack_transfer().await;
279                    }
280                }
281            }
282            Ok(Frame::AckTransfer) => {
283                stream_writer.send_shutdown(Ok(())).await?;
284                return stream_ref_sender.draining_await(drain_stream_id, original_transfer_key);
285            }
286            Ok(Frame::EndTransfer) => bail!("EndTransfer received on a regular stream"),
287            Ok(Frame::Shutdown(r)) => bail!("Stream shutdown during transfer: {:?}", r),
288            Err(e) => return Err(e),
289        }
290    }
291}