1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45mod channel;
6mod event_pair;
7mod signals;
8mod socket;
910use super::stream::{Frame, StreamReaderBinder, StreamWriter};
11use crate::peer::{FramedStreamReader, PeerConnRef};
12use crate::router::Router;
13use anyhow::{bail, format_err, Error};
14use fidl::Signals;
15use fidl_fuchsia_overnet_protocol::SignalUpdate;
16use futures::future::poll_fn;
17use futures::prelude::*;
18use futures::task::noop_waker_ref;
19use std::sync::{Arc, Weak};
20use std::task::{Context, Poll};
21use zx_status;
2223#[cfg(not(target_os = "fuchsia"))]
24use fuchsia_async::emulated_handle::ChannelProxyProtocol;
2526/// Holds a reference to a router.
27/// We start out `Unused` with a weak reference to the router, but various methods
28/// need said router, and so we can transition to `Used` with a reference when the router
29/// is needed.
30/// Saves some repeated upgrading of weak to arc.
31#[derive(Clone)]
32pub(crate) enum RouterHolder<'a> {
33 Unused(&'a Weak<Router>),
34 Used(Arc<Router>),
35}
3637impl<'a> std::fmt::Debug for RouterHolder<'a> {
38fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39match self {
40 RouterHolder::Unused(_) => f.write_str("Unused"),
41 RouterHolder::Used(r) => write!(f, "Used({:?})", r.node_id()),
42 }
43 }
44}
4546impl<'a> RouterHolder<'a> {
47pub(crate) fn get(&mut self) -> Result<&Arc<Router>, Error> {
48match self {
49 RouterHolder::Used(ref r) => Ok(r),
50 RouterHolder::Unused(r) => {
51*self = RouterHolder::Used(
52 Weak::upgrade(r).ok_or_else(|| format_err!("Router is closed"))?,
53 );
54self.get()
55 }
56 }
57 }
58}
5960/// Perform some IO operation on a handle.
61pub(crate) trait IO<'a>: Send {
62type Proxyable: Proxyable;
63type Output;
64fn new() -> Self;
65fn poll_io(
66&mut self,
67 msg: &mut <Self::Proxyable as Proxyable>::Message,
68 proxyable: &'a Self::Proxyable,
69 fut_ctx: &mut Context<'_>,
70 ) -> Poll<Result<Self::Output, zx_status::Status>>;
71}
7273/// Serializer defines how to read or write a message to a QUIC stream.
74/// They are usually defined in pairs (one reader, one writer).
75/// In some cases those implementations end up being the same and we leverage that to improve
76/// footprint.
77pub(crate) trait Serializer: Send {
78type Message;
79fn new() -> Self;
80fn poll_ser(
81&mut self,
82 msg: &mut Self::Message,
83 bytes: &mut Vec<u8>,
84 conn: PeerConnRef<'_>,
85 router: &mut RouterHolder<'_>,
86 fut_ctx: &mut Context<'_>,
87 ) -> Poll<Result<(), Error>>;
88}
8990/// A proxyable message - defines how to parse/serialize itself, and gets pulled
91/// in by Proxyable to also define how to send/receive itself on the right kind of handle.
92pub(crate) trait Message: Send + Sized + Default + PartialEq + std::fmt::Debug {
93/// How to parse this message type from bytes.
94type Parser: Serializer<Message = Self> + std::fmt::Debug;
95/// How to turn this message into wire bytes.
96type Serializer: Serializer<Message = Self>;
97}
9899/// The result of an IO read - either a message was received, or a signal.
100pub(crate) enum ReadValue {
101 Message,
102 SignalUpdate(SignalUpdate),
103}
104105/// An object that can be proxied.
106pub(crate) trait Proxyable: Send + Sync + Sized + std::fmt::Debug {
107/// The type of message exchanged by this handle.
108 /// This transitively also brings in types encoding how to parse/serialize messages to the
109 /// wire.
110type Message: Message;
111112/// Convert a FIDL handle into a proxyable instance (or fail).
113fn from_fidl_handle(hdl: fidl::Handle) -> Result<Self, Error>;
114/// Collapse this Proxyable instance back to the underlying FIDL handle (or fail).
115fn into_fidl_handle(self) -> Result<fidl::Handle, Error>;
116/// Clear/set signals on this handle's peer.
117fn signal_peer(&self, clear: Signals, set: Signals) -> Result<(), Error>;
118/// Let the channel (if this is one) know what protocol we're using to proxy it.
119#[cfg(not(target_os = "fuchsia"))]
120fn set_channel_proxy_protocol(&self, _proto: ChannelProxyProtocol) {}
121/// Set a reason for this handle to close.
122fn close_with_reason(self, _msg: String) {}
123}
124125pub(crate) trait ProxyableRW<'a>: Proxyable {
126/// A type that can be used for communicating messages from the handle to the proxy code.
127type Reader: 'a + IO<'a, Proxyable = Self, Output = ReadValue>;
128/// A type that can be used for communicating messages from the proxy code to the handle.
129type Writer: 'a + IO<'a, Proxyable = Self, Output = ()>;
130}
131132pub(crate) trait IntoProxied {
133type Proxied: Proxyable;
134fn into_proxied(self) -> Result<Self::Proxied, Error>;
135}
136137/// Wraps a Proxyable, adds some convenience values, and provides a nicer API.
138pub(crate) struct ProxyableHandle<Hdl: Proxyable> {
139 hdl: Hdl,
140 router: Weak<Router>,
141}
142143impl<Hdl: Proxyable> std::fmt::Debug for ProxyableHandle<Hdl> {
144fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145write!(f, "{:?}#{:?}", self.hdl, Weak::upgrade(&self.router).map(|r| r.node_id()))
146 }
147}
148149impl<Hdl: Proxyable> ProxyableHandle<Hdl> {
150pub(crate) fn new(hdl: Hdl, router: Weak<Router>) -> Self {
151Self { hdl, router }
152 }
153154#[cfg(not(target_os = "fuchsia"))]
155pub(crate) fn set_channel_proxy_protocol(&self, proto: ChannelProxyProtocol) {
156self.hdl.set_channel_proxy_protocol(proto)
157 }
158159pub(crate) fn into_fidl_handle(self) -> Result<fidl::Handle, Error> {
160self.hdl.into_fidl_handle()
161 }
162163pub(crate) fn close_with_reason(self, msg: String) {
164self.hdl.close_with_reason(msg);
165 }
166167/// Write `msg` to the handle.
168pub(crate) fn write<'a>(
169&'a self,
170 msg: &'a mut Hdl::Message,
171 ) -> impl 'a + Future<Output = Result<(), zx_status::Status>> + Unpin
172where
173Hdl: ProxyableRW<'a>,
174 {
175self.handle_io(msg, Hdl::Writer::new())
176 }
177178/// Attempt to read one `msg` from the handle.
179 /// Return Ok(Message) if a message was read.
180 /// Return Ok(SignalUpdate) if a signal was instead read.
181 /// Return Err(_) if an error occurred.
182pub(crate) fn read<'a>(
183&'a self,
184 msg: &'a mut Hdl::Message,
185 ) -> impl 'a + Future<Output = Result<ReadValue, zx_status::Status>> + Unpin
186where
187Hdl: ProxyableRW<'a>,
188 {
189self.handle_io(msg, Hdl::Reader::new())
190 }
191192pub(crate) fn router(&self) -> &Weak<Router> {
193&self.router
194 }
195196/// Given a signal update from the wire, apply it to the underlying handle (signalling
197 /// the peer and completing the loop).
198pub(crate) fn apply_signal_update(&self, signal_update: SignalUpdate) -> Result<(), Error> {
199if let Some(assert_signals) = signal_update.assert_signals {
200self.hdl
201 .signal_peer(Signals::empty(), self::signals::from_wire_signals(assert_signals))?
202}
203Ok(())
204 }
205206fn handle_io<'a, I: 'a + IO<'a, Proxyable = Hdl>>(
207&'a self,
208 msg: &'a mut Hdl::Message,
209mut io: I,
210 ) -> impl 'a + Future<Output = Result<<I as IO<'a>>::Output, zx_status::Status>> + Unpin {
211 poll_fn(move |fut_ctx| io.poll_io(msg, &self.hdl, fut_ctx))
212 }
213214/// Drain all remaining messages from this handle and write them to `stream_writer`.
215 /// Assumes that nothing else is writing to the handle, so that getting Poll::Pending on read
216 /// is a signal that all messages have been read.
217pub(crate) async fn drain_to_stream(
218&self,
219 stream_writer: &mut StreamWriter<Hdl::Message>,
220 ) -> Result<(), Error>
221where
222Hdl: for<'a> ProxyableRW<'a>,
223 {
224let mut message = Default::default();
225loop {
226let pr = self.read(&mut message).poll_unpin(&mut Context::from_waker(noop_waker_ref()));
227match pr {
228 Poll::Pending => return Ok(()),
229 Poll::Ready(Err(e)) => return Err(e.into()),
230 Poll::Ready(Ok(ReadValue::Message)) => {
231 stream_writer.send_data(&mut message).await?
232}
233 Poll::Ready(Ok(ReadValue::SignalUpdate(signal_update))) => {
234 stream_writer.send_signal(signal_update).await?
235}
236 }
237 }
238 }
239240/// Drain all messages on a stream into this handle.
241pub(crate) async fn drain_stream_to_handle(
242self,
243 drain_stream: FramedStreamReader,
244 ) -> Result<fidl::Handle, Error>
245where
246Hdl: for<'a> ProxyableRW<'a>,
247 {
248let mut drain_stream = drain_stream.bind(&self);
249loop {
250match drain_stream.next().await? {
251 Frame::Data(message) => self.write(message).await?,
252 Frame::SignalUpdate(signal_update) => self.apply_signal_update(signal_update)?,
253 Frame::EndTransfer => return Ok(self.hdl.into_fidl_handle()?),
254 Frame::Hello => bail!("Hello frame disallowed on drain streams"),
255 Frame::BeginTransfer(_, _) => bail!("BeginTransfer on drain stream"),
256 Frame::AckTransfer => bail!("AckTransfer on drain stream"),
257 Frame::Shutdown(r) => bail!("Stream shutdown during drain: {:?}", r),
258 }
259 }
260 }
261}