1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45mod channel;
6mod event_pair;
7mod signals;
8mod socket;
910use super::stream::{Frame, StreamReaderBinder, StreamWriter};
11use crate::peer::{FramedStreamReader, PeerConnRef};
12use crate::router::Router;
13use anyhow::{bail, format_err, Error};
14use fidl::Signals;
15use fidl_fuchsia_overnet_protocol::SignalUpdate;
16use futures::future::poll_fn;
17use futures::prelude::*;
18use futures::task::noop_waker_ref;
19use std::sync::{Arc, Weak};
20use std::task::{Context, Poll};
21use zx_status;
2223/// Holds a reference to a router.
24/// We start out `Unused` with a weak reference to the router, but various methods
25/// need said router, and so we can transition to `Used` with a reference when the router
26/// is needed.
27/// Saves some repeated upgrading of weak to arc.
28#[derive(Clone)]
29pub(crate) enum RouterHolder<'a> {
30 Unused(&'a Weak<Router>),
31 Used(Arc<Router>),
32}
3334impl<'a> std::fmt::Debug for RouterHolder<'a> {
35fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36match self {
37 RouterHolder::Unused(_) => f.write_str("Unused"),
38 RouterHolder::Used(r) => write!(f, "Used({:?})", r.node_id()),
39 }
40 }
41}
4243impl<'a> RouterHolder<'a> {
44pub(crate) fn get(&mut self) -> Result<&Arc<Router>, Error> {
45match self {
46 RouterHolder::Used(ref r) => Ok(r),
47 RouterHolder::Unused(r) => {
48*self = RouterHolder::Used(
49 Weak::upgrade(r).ok_or_else(|| format_err!("Router is closed"))?,
50 );
51self.get()
52 }
53 }
54 }
55}
5657/// Perform some IO operation on a handle.
58pub(crate) trait IO<'a>: Send {
59type Proxyable: Proxyable;
60type Output;
61fn new() -> Self;
62fn poll_io(
63&mut self,
64 msg: &mut <Self::Proxyable as Proxyable>::Message,
65 proxyable: &'a Self::Proxyable,
66 fut_ctx: &mut Context<'_>,
67 ) -> Poll<Result<Self::Output, zx_status::Status>>;
68}
6970/// Serializer defines how to read or write a message to a QUIC stream.
71/// They are usually defined in pairs (one reader, one writer).
72/// In some cases those implementations end up being the same and we leverage that to improve
73/// footprint.
74pub(crate) trait Serializer: Send {
75type Message;
76fn new() -> Self;
77fn poll_ser(
78&mut self,
79 msg: &mut Self::Message,
80 bytes: &mut Vec<u8>,
81 conn: PeerConnRef<'_>,
82 router: &mut RouterHolder<'_>,
83 fut_ctx: &mut Context<'_>,
84 ) -> Poll<Result<(), Error>>;
85}
8687/// A proxyable message - defines how to parse/serialize itself, and gets pulled
88/// in by Proxyable to also define how to send/receive itself on the right kind of handle.
89pub(crate) trait Message: Send + Sized + Default + PartialEq + std::fmt::Debug {
90/// How to parse this message type from bytes.
91type Parser: Serializer<Message = Self> + std::fmt::Debug;
92/// How to turn this message into wire bytes.
93type Serializer: Serializer<Message = Self>;
94}
9596/// The result of an IO read - either a message was received, or a signal.
97pub(crate) enum ReadValue {
98 Message,
99 SignalUpdate(SignalUpdate),
100}
101102/// An object that can be proxied.
103pub(crate) trait Proxyable: Send + Sync + Sized + std::fmt::Debug {
104/// The type of message exchanged by this handle.
105 /// This transitively also brings in types encoding how to parse/serialize messages to the
106 /// wire.
107type Message: Message;
108109/// Convert a FIDL handle into a proxyable instance (or fail).
110fn from_fidl_handle(hdl: fidl::Handle) -> Result<Self, Error>;
111/// Collapse this Proxyable instance back to the underlying FIDL handle (or fail).
112fn into_fidl_handle(self) -> Result<fidl::Handle, Error>;
113/// Clear/set signals on this handle's peer.
114fn signal_peer(&self, clear: Signals, set: Signals) -> Result<(), Error>;
115/// Set a reason for this handle to close.
116fn close_with_reason(self, _msg: String) {}
117}
118119pub(crate) trait ProxyableRW<'a>: Proxyable {
120/// A type that can be used for communicating messages from the handle to the proxy code.
121type Reader: 'a + IO<'a, Proxyable = Self, Output = ReadValue>;
122/// A type that can be used for communicating messages from the proxy code to the handle.
123type Writer: 'a + IO<'a, Proxyable = Self, Output = ()>;
124}
125126pub(crate) trait IntoProxied {
127type Proxied: Proxyable;
128fn into_proxied(self) -> Result<Self::Proxied, Error>;
129}
130131/// Wraps a Proxyable, adds some convenience values, and provides a nicer API.
132pub(crate) struct ProxyableHandle<Hdl: Proxyable> {
133 hdl: Hdl,
134 router: Weak<Router>,
135}
136137impl<Hdl: Proxyable> std::fmt::Debug for ProxyableHandle<Hdl> {
138fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139write!(f, "{:?}#{:?}", self.hdl, Weak::upgrade(&self.router).map(|r| r.node_id()))
140 }
141}
142143impl<Hdl: Proxyable> ProxyableHandle<Hdl> {
144pub(crate) fn new(hdl: Hdl, router: Weak<Router>) -> Self {
145Self { hdl, router }
146 }
147148pub(crate) fn into_fidl_handle(self) -> Result<fidl::Handle, Error> {
149self.hdl.into_fidl_handle()
150 }
151152pub(crate) fn close_with_reason(self, msg: String) {
153self.hdl.close_with_reason(msg);
154 }
155156/// Write `msg` to the handle.
157pub(crate) fn write<'a>(
158&'a self,
159 msg: &'a mut Hdl::Message,
160 ) -> impl 'a + Future<Output = Result<(), zx_status::Status>> + Unpin
161where
162Hdl: ProxyableRW<'a>,
163 {
164self.handle_io(msg, Hdl::Writer::new())
165 }
166167/// Attempt to read one `msg` from the handle.
168 /// Return Ok(Message) if a message was read.
169 /// Return Ok(SignalUpdate) if a signal was instead read.
170 /// Return Err(_) if an error occurred.
171pub(crate) fn read<'a>(
172&'a self,
173 msg: &'a mut Hdl::Message,
174 ) -> impl 'a + Future<Output = Result<ReadValue, zx_status::Status>> + Unpin
175where
176Hdl: ProxyableRW<'a>,
177 {
178self.handle_io(msg, Hdl::Reader::new())
179 }
180181pub(crate) fn router(&self) -> &Weak<Router> {
182&self.router
183 }
184185/// Given a signal update from the wire, apply it to the underlying handle (signalling
186 /// the peer and completing the loop).
187pub(crate) fn apply_signal_update(&self, signal_update: SignalUpdate) -> Result<(), Error> {
188if let Some(assert_signals) = signal_update.assert_signals {
189self.hdl
190 .signal_peer(Signals::empty(), self::signals::from_wire_signals(assert_signals))?
191}
192Ok(())
193 }
194195fn handle_io<'a, I: 'a + IO<'a, Proxyable = Hdl>>(
196&'a self,
197 msg: &'a mut Hdl::Message,
198mut io: I,
199 ) -> impl 'a + Future<Output = Result<<I as IO<'a>>::Output, zx_status::Status>> + Unpin {
200 poll_fn(move |fut_ctx| io.poll_io(msg, &self.hdl, fut_ctx))
201 }
202203/// Drain all remaining messages from this handle and write them to `stream_writer`.
204 /// Assumes that nothing else is writing to the handle, so that getting Poll::Pending on read
205 /// is a signal that all messages have been read.
206pub(crate) async fn drain_to_stream(
207&self,
208 stream_writer: &mut StreamWriter<Hdl::Message>,
209 ) -> Result<(), Error>
210where
211Hdl: for<'a> ProxyableRW<'a>,
212 {
213let mut message = Default::default();
214loop {
215let pr = self.read(&mut message).poll_unpin(&mut Context::from_waker(noop_waker_ref()));
216match pr {
217 Poll::Pending => return Ok(()),
218 Poll::Ready(Err(e)) => return Err(e.into()),
219 Poll::Ready(Ok(ReadValue::Message)) => {
220 stream_writer.send_data(&mut message).await?
221}
222 Poll::Ready(Ok(ReadValue::SignalUpdate(signal_update))) => {
223 stream_writer.send_signal(signal_update).await?
224}
225 }
226 }
227 }
228229/// Drain all messages on a stream into this handle.
230pub(crate) async fn drain_stream_to_handle(
231self,
232 drain_stream: FramedStreamReader,
233 ) -> Result<fidl::Handle, Error>
234where
235Hdl: for<'a> ProxyableRW<'a>,
236 {
237let mut drain_stream = drain_stream.bind(&self);
238loop {
239match drain_stream.next().await? {
240 Frame::Data(message) => self.write(message).await?,
241 Frame::SignalUpdate(signal_update) => self.apply_signal_update(signal_update)?,
242 Frame::EndTransfer => return Ok(self.hdl.into_fidl_handle()?),
243 Frame::Hello => bail!("Hello frame disallowed on drain streams"),
244 Frame::BeginTransfer(_, _) => bail!("BeginTransfer on drain stream"),
245 Frame::AckTransfer => bail!("AckTransfer on drain stream"),
246 Frame::Shutdown(r) => bail!("Stream shutdown during drain: {:?}", r),
247 }
248 }
249 }
250}