overnet_core/proxy/handle/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod channel;
6mod event_pair;
7mod signals;
8mod socket;
9
10use super::stream::{Frame, StreamReaderBinder, StreamWriter};
11use crate::peer::{FramedStreamReader, PeerConnRef};
12use crate::router::Router;
13use anyhow::{Error, bail, format_err};
14use fidl::Signals;
15use fidl_fuchsia_overnet_protocol::SignalUpdate;
16use futures::future::poll_fn;
17use futures::prelude::*;
18use std::sync::{Arc, Weak};
19use std::task::{Context, Poll, Waker};
20use zx_status;
21
22/// Holds a reference to a router.
23/// We start out `Unused` with a weak reference to the router, but various methods
24/// need said router, and so we can transition to `Used` with a reference when the router
25/// is needed.
26/// Saves some repeated upgrading of weak to arc.
27#[derive(Clone)]
28pub(crate) enum RouterHolder<'a> {
29    Unused(&'a Weak<Router>),
30    Used(Arc<Router>),
31}
32
33impl<'a> std::fmt::Debug for RouterHolder<'a> {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            RouterHolder::Unused(_) => f.write_str("Unused"),
37            RouterHolder::Used(r) => write!(f, "Used({:?})", r.node_id()),
38        }
39    }
40}
41
42impl<'a> RouterHolder<'a> {
43    pub(crate) fn get(&mut self) -> Result<&Arc<Router>, Error> {
44        match self {
45            RouterHolder::Used(r) => Ok(r),
46            RouterHolder::Unused(r) => {
47                *self = RouterHolder::Used(
48                    Weak::upgrade(r).ok_or_else(|| format_err!("Router is closed"))?,
49                );
50                self.get()
51            }
52        }
53    }
54}
55
56/// Perform some IO operation on a handle.
57pub(crate) trait IO<'a>: Send {
58    type Proxyable: Proxyable;
59    type Output;
60    fn new() -> Self;
61    fn poll_io(
62        &mut self,
63        msg: &mut <Self::Proxyable as Proxyable>::Message,
64        proxyable: &'a Self::Proxyable,
65        fut_ctx: &mut Context<'_>,
66    ) -> Poll<Result<Self::Output, zx_status::Status>>;
67}
68
69/// Serializer defines how to read or write a message to a QUIC stream.
70/// They are usually defined in pairs (one reader, one writer).
71/// In some cases those implementations end up being the same and we leverage that to improve
72/// footprint.
73pub(crate) trait Serializer: Send {
74    type Message;
75    fn new() -> Self;
76    fn poll_ser(
77        &mut self,
78        msg: &mut Self::Message,
79        bytes: &mut Vec<u8>,
80        conn: PeerConnRef<'_>,
81        router: &mut RouterHolder<'_>,
82        fut_ctx: &mut Context<'_>,
83    ) -> Poll<Result<(), Error>>;
84}
85
86/// A proxyable message - defines how to parse/serialize itself, and gets pulled
87/// in by Proxyable to also define how to send/receive itself on the right kind of handle.
88pub(crate) trait Message: Send + Sized + Default + PartialEq + std::fmt::Debug {
89    /// How to parse this message type from bytes.
90    type Parser: Serializer<Message = Self> + std::fmt::Debug;
91    /// How to turn this message into wire bytes.
92    type Serializer: Serializer<Message = Self>;
93}
94
95/// The result of an IO read - either a message was received, or a signal.
96pub(crate) enum ReadValue {
97    Message,
98    SignalUpdate(SignalUpdate),
99}
100
101/// An object that can be proxied.
102pub(crate) trait Proxyable: Send + Sync + Sized + std::fmt::Debug {
103    /// The type of message exchanged by this handle.
104    /// This transitively also brings in types encoding how to parse/serialize messages to the
105    /// wire.
106    type Message: Message;
107
108    /// Convert a FIDL handle into a proxyable instance (or fail).
109    fn from_fidl_handle(hdl: fidl::NullableHandle) -> Result<Self, Error>;
110    /// Collapse this Proxyable instance back to the underlying FIDL handle (or fail).
111    fn into_fidl_handle(self) -> Result<fidl::NullableHandle, Error>;
112    /// Clear/set signals on this handle's peer.
113    fn signal_peer(&self, clear: Signals, set: Signals) -> Result<(), Error>;
114    /// Set a reason for this handle to close.
115    fn close_with_reason(self, _msg: String) {}
116}
117
118pub(crate) trait ProxyableRW<'a>: Proxyable {
119    /// A type that can be used for communicating messages from the handle to the proxy code.
120    type Reader: 'a + IO<'a, Proxyable = Self, Output = ReadValue>;
121    /// A type that can be used for communicating messages from the proxy code to the handle.
122    type Writer: 'a + IO<'a, Proxyable = Self, Output = ()>;
123}
124
125pub(crate) trait IntoProxied {
126    type Proxied: Proxyable;
127    fn into_proxied(self) -> Result<Self::Proxied, Error>;
128}
129
130/// Wraps a Proxyable, adds some convenience values, and provides a nicer API.
131pub(crate) struct ProxyableHandle<Hdl: Proxyable> {
132    hdl: Hdl,
133    router: Weak<Router>,
134}
135
136impl<Hdl: Proxyable> std::fmt::Debug for ProxyableHandle<Hdl> {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        write!(f, "{:?}#{:?}", self.hdl, Weak::upgrade(&self.router).map(|r| r.node_id()))
139    }
140}
141
142impl<Hdl: Proxyable> ProxyableHandle<Hdl> {
143    pub(crate) fn new(hdl: Hdl, router: Weak<Router>) -> Self {
144        Self { hdl, router }
145    }
146
147    pub(crate) fn into_fidl_handle(self) -> Result<fidl::NullableHandle, Error> {
148        self.hdl.into_fidl_handle()
149    }
150
151    pub(crate) fn close_with_reason(self, msg: String) {
152        self.hdl.close_with_reason(msg);
153    }
154
155    /// Write `msg` to the handle.
156    pub(crate) fn write<'a>(
157        &'a self,
158        msg: &'a mut Hdl::Message,
159    ) -> impl 'a + Future<Output = Result<(), zx_status::Status>> + Unpin
160    where
161        Hdl: ProxyableRW<'a>,
162    {
163        self.handle_io(msg, Hdl::Writer::new())
164    }
165
166    /// Attempt to read one `msg` from the handle.
167    /// Return Ok(Message) if a message was read.
168    /// Return Ok(SignalUpdate) if a signal was instead read.
169    /// Return Err(_) if an error occurred.
170    pub(crate) fn read<'a>(
171        &'a self,
172        msg: &'a mut Hdl::Message,
173    ) -> impl 'a + Future<Output = Result<ReadValue, zx_status::Status>> + Unpin
174    where
175        Hdl: ProxyableRW<'a>,
176    {
177        self.handle_io(msg, Hdl::Reader::new())
178    }
179
180    pub(crate) fn router(&self) -> &Weak<Router> {
181        &self.router
182    }
183
184    /// Given a signal update from the wire, apply it to the underlying handle (signalling
185    /// the peer and completing the loop).
186    pub(crate) fn apply_signal_update(&self, signal_update: SignalUpdate) -> Result<(), Error> {
187        if let Some(assert_signals) = signal_update.assert_signals {
188            self.hdl
189                .signal_peer(Signals::empty(), self::signals::from_wire_signals(assert_signals))?
190        }
191        Ok(())
192    }
193
194    fn handle_io<'a, I: 'a + IO<'a, Proxyable = Hdl>>(
195        &'a self,
196        msg: &'a mut Hdl::Message,
197        mut io: I,
198    ) -> impl 'a + Future<Output = Result<<I as IO<'a>>::Output, zx_status::Status>> + Unpin {
199        poll_fn(move |fut_ctx| io.poll_io(msg, &self.hdl, fut_ctx))
200    }
201
202    /// Drain all remaining messages from this handle and write them to `stream_writer`.
203    /// Assumes that nothing else is writing to the handle, so that getting Poll::Pending on read
204    /// is a signal that all messages have been read.
205    pub(crate) async fn drain_to_stream(
206        &self,
207        stream_writer: &mut StreamWriter<Hdl::Message>,
208    ) -> Result<(), Error>
209    where
210        Hdl: for<'a> ProxyableRW<'a>,
211    {
212        let mut message = Default::default();
213        loop {
214            let pr = self.read(&mut message).poll_unpin(&mut Context::from_waker(Waker::noop()));
215            match pr {
216                Poll::Pending => return Ok(()),
217                Poll::Ready(Err(e)) => return Err(e.into()),
218                Poll::Ready(Ok(ReadValue::Message)) => {
219                    stream_writer.send_data(&mut message).await?
220                }
221                Poll::Ready(Ok(ReadValue::SignalUpdate(signal_update))) => {
222                    stream_writer.send_signal(signal_update).await?
223                }
224            }
225        }
226    }
227
228    /// Drain all messages on a stream into this handle.
229    pub(crate) async fn drain_stream_to_handle(
230        self,
231        drain_stream: FramedStreamReader,
232    ) -> Result<fidl::NullableHandle, Error>
233    where
234        Hdl: for<'a> ProxyableRW<'a>,
235    {
236        let mut drain_stream = drain_stream.bind(&self);
237        loop {
238            match drain_stream.next().await? {
239                Frame::Data(message) => self.write(message).await?,
240                Frame::SignalUpdate(signal_update) => self.apply_signal_update(signal_update)?,
241                Frame::EndTransfer => return Ok(self.hdl.into_fidl_handle()?),
242                Frame::Hello => bail!("Hello frame disallowed on drain streams"),
243                Frame::BeginTransfer(_, _) => bail!("BeginTransfer on drain stream"),
244                Frame::AckTransfer => bail!("AckTransfer on drain stream"),
245                Frame::Shutdown(r) => bail!("Stream shutdown during drain: {:?}", r),
246            }
247        }
248    }
249}