overnet_core/proxy/handle/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod channel;
6mod event_pair;
7mod signals;
8mod socket;
9
10use super::stream::{Frame, StreamReaderBinder, StreamWriter};
11use crate::peer::{FramedStreamReader, PeerConnRef};
12use crate::router::Router;
13use anyhow::{bail, format_err, Error};
14use fidl::Signals;
15use fidl_fuchsia_overnet_protocol::SignalUpdate;
16use futures::future::poll_fn;
17use futures::prelude::*;
18use futures::task::noop_waker_ref;
19use std::sync::{Arc, Weak};
20use std::task::{Context, Poll};
21use zx_status;
22
23#[cfg(not(target_os = "fuchsia"))]
24use fuchsia_async::emulated_handle::ChannelProxyProtocol;
25
26/// Holds a reference to a router.
27/// We start out `Unused` with a weak reference to the router, but various methods
28/// need said router, and so we can transition to `Used` with a reference when the router
29/// is needed.
30/// Saves some repeated upgrading of weak to arc.
31#[derive(Clone)]
32pub(crate) enum RouterHolder<'a> {
33    Unused(&'a Weak<Router>),
34    Used(Arc<Router>),
35}
36
37impl<'a> std::fmt::Debug for RouterHolder<'a> {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            RouterHolder::Unused(_) => f.write_str("Unused"),
41            RouterHolder::Used(r) => write!(f, "Used({:?})", r.node_id()),
42        }
43    }
44}
45
46impl<'a> RouterHolder<'a> {
47    pub(crate) fn get(&mut self) -> Result<&Arc<Router>, Error> {
48        match self {
49            RouterHolder::Used(ref r) => Ok(r),
50            RouterHolder::Unused(r) => {
51                *self = RouterHolder::Used(
52                    Weak::upgrade(r).ok_or_else(|| format_err!("Router is closed"))?,
53                );
54                self.get()
55            }
56        }
57    }
58}
59
60/// Perform some IO operation on a handle.
61pub(crate) trait IO<'a>: Send {
62    type Proxyable: Proxyable;
63    type Output;
64    fn new() -> Self;
65    fn poll_io(
66        &mut self,
67        msg: &mut <Self::Proxyable as Proxyable>::Message,
68        proxyable: &'a Self::Proxyable,
69        fut_ctx: &mut Context<'_>,
70    ) -> Poll<Result<Self::Output, zx_status::Status>>;
71}
72
73/// Serializer defines how to read or write a message to a QUIC stream.
74/// They are usually defined in pairs (one reader, one writer).
75/// In some cases those implementations end up being the same and we leverage that to improve
76/// footprint.
77pub(crate) trait Serializer: Send {
78    type Message;
79    fn new() -> Self;
80    fn poll_ser(
81        &mut self,
82        msg: &mut Self::Message,
83        bytes: &mut Vec<u8>,
84        conn: PeerConnRef<'_>,
85        router: &mut RouterHolder<'_>,
86        fut_ctx: &mut Context<'_>,
87    ) -> Poll<Result<(), Error>>;
88}
89
90/// A proxyable message - defines how to parse/serialize itself, and gets pulled
91/// in by Proxyable to also define how to send/receive itself on the right kind of handle.
92pub(crate) trait Message: Send + Sized + Default + PartialEq + std::fmt::Debug {
93    /// How to parse this message type from bytes.
94    type Parser: Serializer<Message = Self> + std::fmt::Debug;
95    /// How to turn this message into wire bytes.
96    type Serializer: Serializer<Message = Self>;
97}
98
99/// The result of an IO read - either a message was received, or a signal.
100pub(crate) enum ReadValue {
101    Message,
102    SignalUpdate(SignalUpdate),
103}
104
105/// An object that can be proxied.
106pub(crate) trait Proxyable: Send + Sync + Sized + std::fmt::Debug {
107    /// The type of message exchanged by this handle.
108    /// This transitively also brings in types encoding how to parse/serialize messages to the
109    /// wire.
110    type Message: Message;
111
112    /// Convert a FIDL handle into a proxyable instance (or fail).
113    fn from_fidl_handle(hdl: fidl::Handle) -> Result<Self, Error>;
114    /// Collapse this Proxyable instance back to the underlying FIDL handle (or fail).
115    fn into_fidl_handle(self) -> Result<fidl::Handle, Error>;
116    /// Clear/set signals on this handle's peer.
117    fn signal_peer(&self, clear: Signals, set: Signals) -> Result<(), Error>;
118    /// Let the channel (if this is one) know what protocol we're using to proxy it.
119    #[cfg(not(target_os = "fuchsia"))]
120    fn set_channel_proxy_protocol(&self, _proto: ChannelProxyProtocol) {}
121    /// Set a reason for this handle to close.
122    fn close_with_reason(self, _msg: String) {}
123}
124
125pub(crate) trait ProxyableRW<'a>: Proxyable {
126    /// A type that can be used for communicating messages from the handle to the proxy code.
127    type Reader: 'a + IO<'a, Proxyable = Self, Output = ReadValue>;
128    /// A type that can be used for communicating messages from the proxy code to the handle.
129    type Writer: 'a + IO<'a, Proxyable = Self, Output = ()>;
130}
131
132pub(crate) trait IntoProxied {
133    type Proxied: Proxyable;
134    fn into_proxied(self) -> Result<Self::Proxied, Error>;
135}
136
137/// Wraps a Proxyable, adds some convenience values, and provides a nicer API.
138pub(crate) struct ProxyableHandle<Hdl: Proxyable> {
139    hdl: Hdl,
140    router: Weak<Router>,
141}
142
143impl<Hdl: Proxyable> std::fmt::Debug for ProxyableHandle<Hdl> {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        write!(f, "{:?}#{:?}", self.hdl, Weak::upgrade(&self.router).map(|r| r.node_id()))
146    }
147}
148
149impl<Hdl: Proxyable> ProxyableHandle<Hdl> {
150    pub(crate) fn new(hdl: Hdl, router: Weak<Router>) -> Self {
151        Self { hdl, router }
152    }
153
154    #[cfg(not(target_os = "fuchsia"))]
155    pub(crate) fn set_channel_proxy_protocol(&self, proto: ChannelProxyProtocol) {
156        self.hdl.set_channel_proxy_protocol(proto)
157    }
158
159    pub(crate) fn into_fidl_handle(self) -> Result<fidl::Handle, Error> {
160        self.hdl.into_fidl_handle()
161    }
162
163    pub(crate) fn close_with_reason(self, msg: String) {
164        self.hdl.close_with_reason(msg);
165    }
166
167    /// Write `msg` to the handle.
168    pub(crate) fn write<'a>(
169        &'a self,
170        msg: &'a mut Hdl::Message,
171    ) -> impl 'a + Future<Output = Result<(), zx_status::Status>> + Unpin
172    where
173        Hdl: ProxyableRW<'a>,
174    {
175        self.handle_io(msg, Hdl::Writer::new())
176    }
177
178    /// Attempt to read one `msg` from the handle.
179    /// Return Ok(Message) if a message was read.
180    /// Return Ok(SignalUpdate) if a signal was instead read.
181    /// Return Err(_) if an error occurred.
182    pub(crate) fn read<'a>(
183        &'a self,
184        msg: &'a mut Hdl::Message,
185    ) -> impl 'a + Future<Output = Result<ReadValue, zx_status::Status>> + Unpin
186    where
187        Hdl: ProxyableRW<'a>,
188    {
189        self.handle_io(msg, Hdl::Reader::new())
190    }
191
192    pub(crate) fn router(&self) -> &Weak<Router> {
193        &self.router
194    }
195
196    /// Given a signal update from the wire, apply it to the underlying handle (signalling
197    /// the peer and completing the loop).
198    pub(crate) fn apply_signal_update(&self, signal_update: SignalUpdate) -> Result<(), Error> {
199        if let Some(assert_signals) = signal_update.assert_signals {
200            self.hdl
201                .signal_peer(Signals::empty(), self::signals::from_wire_signals(assert_signals))?
202        }
203        Ok(())
204    }
205
206    fn handle_io<'a, I: 'a + IO<'a, Proxyable = Hdl>>(
207        &'a self,
208        msg: &'a mut Hdl::Message,
209        mut io: I,
210    ) -> impl 'a + Future<Output = Result<<I as IO<'a>>::Output, zx_status::Status>> + Unpin {
211        poll_fn(move |fut_ctx| io.poll_io(msg, &self.hdl, fut_ctx))
212    }
213
214    /// Drain all remaining messages from this handle and write them to `stream_writer`.
215    /// Assumes that nothing else is writing to the handle, so that getting Poll::Pending on read
216    /// is a signal that all messages have been read.
217    pub(crate) async fn drain_to_stream(
218        &self,
219        stream_writer: &mut StreamWriter<Hdl::Message>,
220    ) -> Result<(), Error>
221    where
222        Hdl: for<'a> ProxyableRW<'a>,
223    {
224        let mut message = Default::default();
225        loop {
226            let pr = self.read(&mut message).poll_unpin(&mut Context::from_waker(noop_waker_ref()));
227            match pr {
228                Poll::Pending => return Ok(()),
229                Poll::Ready(Err(e)) => return Err(e.into()),
230                Poll::Ready(Ok(ReadValue::Message)) => {
231                    stream_writer.send_data(&mut message).await?
232                }
233                Poll::Ready(Ok(ReadValue::SignalUpdate(signal_update))) => {
234                    stream_writer.send_signal(signal_update).await?
235                }
236            }
237        }
238    }
239
240    /// Drain all messages on a stream into this handle.
241    pub(crate) async fn drain_stream_to_handle(
242        self,
243        drain_stream: FramedStreamReader,
244    ) -> Result<fidl::Handle, Error>
245    where
246        Hdl: for<'a> ProxyableRW<'a>,
247    {
248        let mut drain_stream = drain_stream.bind(&self);
249        loop {
250            match drain_stream.next().await? {
251                Frame::Data(message) => self.write(message).await?,
252                Frame::SignalUpdate(signal_update) => self.apply_signal_update(signal_update)?,
253                Frame::EndTransfer => return Ok(self.hdl.into_fidl_handle()?),
254                Frame::Hello => bail!("Hello frame disallowed on drain streams"),
255                Frame::BeginTransfer(_, _) => bail!("BeginTransfer on drain stream"),
256                Frame::AckTransfer => bail!("AckTransfer on drain stream"),
257                Frame::Shutdown(r) => bail!("Stream shutdown during drain: {:?}", r),
258            }
259        }
260    }
261}