overnet_core/proxy/handle/
mod.rs1mod channel;
6mod event_pair;
7mod signals;
8mod socket;
9
10use super::stream::{Frame, StreamReaderBinder, StreamWriter};
11use crate::peer::{FramedStreamReader, PeerConnRef};
12use crate::router::Router;
13use anyhow::{Error, bail, format_err};
14use fidl::Signals;
15use fidl_fuchsia_overnet_protocol::SignalUpdate;
16use futures::future::poll_fn;
17use futures::prelude::*;
18use std::sync::{Arc, Weak};
19use std::task::{Context, Poll, Waker};
20use zx_status;
21
/// Holds a router reference that is upgraded from weak to strong at most once.
///
/// A holder starts out as `Unused`, borrowing a `Weak<Router>`. The first successful call to
/// `RouterHolder::get` replaces it with `Used`, which caches the upgraded `Arc<Router>`.
#[derive(Clone)]
pub(crate) enum RouterHolder<'a> {
    /// No strong reference has been taken yet; only the borrowed weak pointer is held.
    Unused(&'a Weak<Router>),
    /// The weak pointer has been upgraded and the strong reference is cached here.
    Used(Arc<Router>),
}
32
33impl<'a> std::fmt::Debug for RouterHolder<'a> {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 RouterHolder::Unused(_) => f.write_str("Unused"),
37 RouterHolder::Used(r) => write!(f, "Used({:?})", r.node_id()),
38 }
39 }
40}
41
42impl<'a> RouterHolder<'a> {
43 pub(crate) fn get(&mut self) -> Result<&Arc<Router>, Error> {
44 match self {
45 RouterHolder::Used(r) => Ok(r),
46 RouterHolder::Unused(r) => {
47 *self = RouterHolder::Used(
48 Weak::upgrade(r).ok_or_else(|| format_err!("Router is closed"))?,
49 );
50 self.get()
51 }
52 }
53 }
54}
55
/// State for one I/O operation against a [`Proxyable`] handle.
///
/// `ProxyableHandle::read` and `ProxyableHandle::write` create an implementation with
/// [`IO::new`] and then drive it by repeatedly calling [`IO::poll_io`].
pub(crate) trait IO<'a>: Send {
    /// The handle type this operation runs against.
    type Proxyable: Proxyable;
    /// The value produced when the operation completes successfully.
    type Output;
    /// Creates the operation in its initial state.
    fn new() -> Self;
    /// Attempts to make progress on the operation.
    ///
    /// * `msg` - message buffer shared with the caller. NOTE(review): presumably filled by
    ///   readers and consumed by writers; confirm against the implementations.
    /// * `proxyable` - the handle being operated on.
    /// * `fut_ctx` - the task context of the polling future.
    ///
    /// Returns `Poll::Pending` while the operation is incomplete, otherwise the output or a
    /// `zx_status::Status` error.
    fn poll_io(
        &mut self,
        msg: &mut <Self::Proxyable as Proxyable>::Message,
        proxyable: &'a Self::Proxyable,
        fut_ctx: &mut Context<'_>,
    ) -> Poll<Result<Self::Output, zx_status::Status>>;
}
68
/// Pollable conversion step between a message and a byte buffer.
///
/// This one trait bounds both `Message::Parser` and `Message::Serializer`, so implementations
/// exist for both directions of the conversion.
pub(crate) trait Serializer: Send {
    /// The message type this converter works with.
    type Message;
    /// Creates the converter in its initial state.
    fn new() -> Self;
    /// Attempts to make progress on the conversion.
    ///
    /// * `msg` / `bytes` - the message and the byte buffer taking part in the conversion.
    ///   NOTE(review): presumably parsers read `bytes` into `msg` and serializers do the
    ///   reverse; confirm against the implementations.
    /// * `conn` - reference to the peer connection involved in the conversion.
    /// * `router` - lazily upgraded router reference, available via `RouterHolder::get`.
    /// * `fut_ctx` - the task context of the polling future.
    ///
    /// Returns `Poll::Pending` while the conversion is incomplete, otherwise `Ok(())` or an
    /// error.
    fn poll_ser(
        &mut self,
        msg: &mut Self::Message,
        bytes: &mut Vec<u8>,
        conn: PeerConnRef<'_>,
        router: &mut RouterHolder<'_>,
        fut_ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Error>>;
}
85
/// A message type that can be read from or written to a [`Proxyable`] handle.
///
/// Messages must be default-constructible, comparable and debuggable, and name the
/// [`Serializer`] implementations used to convert them.
pub(crate) trait Message: Send + Sized + Default + PartialEq + std::fmt::Debug {
    /// The [`Serializer`] implementation that acts as this message's parser.
    type Parser: Serializer<Message = Self> + std::fmt::Debug;
    /// The [`Serializer`] implementation that acts as this message's serializer.
    type Serializer: Serializer<Message = Self>;
}
94
/// Outcome of a completed read on a proxied handle.
pub(crate) enum ReadValue {
    /// A message was read. The variant carries no payload; `drain_to_stream` forwards the
    /// message buffer that was passed to the read.
    Message,
    /// A signal update was read instead of a message.
    SignalUpdate(SignalUpdate),
}
100
/// A handle type that can be wrapped in a `ProxyableHandle`.
pub(crate) trait Proxyable: Send + Sync + Sized + std::fmt::Debug {
    /// The message type carried by this kind of handle.
    type Message: Message;

    /// Builds this handle type from a `fidl::NullableHandle`.
    fn from_fidl_handle(hdl: fidl::NullableHandle) -> Result<Self, Error>;
    /// Consumes this handle and converts it back into a `fidl::NullableHandle`.
    fn into_fidl_handle(self) -> Result<fidl::NullableHandle, Error>;
    /// Signals the peer of this handle with the `clear` and `set` signal masks.
    /// NOTE(review): semantics inferred from the method and parameter names; confirm
    /// against the implementations.
    fn signal_peer(&self, clear: Signals, set: Signals) -> Result<(), Error>;
    /// Consumes the handle, with `_msg` describing why it is being closed.
    ///
    /// The default implementation ignores the message and simply drops the handle.
    fn close_with_reason(self, _msg: String) {}
}
117
/// Associates a [`Proxyable`] handle with the [`IO`] types used to read and write it.
pub(crate) trait ProxyableRW<'a>: Proxyable {
    /// Read operation for this handle; completes with a [`ReadValue`].
    type Reader: 'a + IO<'a, Proxyable = Self, Output = ReadValue>;
    /// Write operation for this handle; completes with `()`.
    type Writer: 'a + IO<'a, Proxyable = Self, Output = ()>;
}
124
/// Fallible conversion of a value into its [`Proxyable`] counterpart.
pub(crate) trait IntoProxied {
    /// The proxyable type produced by the conversion.
    type Proxied: Proxyable;
    /// Consumes `self` and converts it into `Self::Proxied`, or returns an error.
    fn into_proxied(self) -> Result<Self::Proxied, Error>;
}
129
/// A [`Proxyable`] handle paired with a weak reference to a `Router`.
pub(crate) struct ProxyableHandle<Hdl: Proxyable> {
    /// The wrapped handle that reads, writes and signals are applied to.
    hdl: Hdl,
    /// Weak reference to the router; it may fail to upgrade once the router is gone.
    router: Weak<Router>,
}
135
136impl<Hdl: Proxyable> std::fmt::Debug for ProxyableHandle<Hdl> {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 write!(f, "{:?}#{:?}", self.hdl, Weak::upgrade(&self.router).map(|r| r.node_id()))
139 }
140}
141
142impl<Hdl: Proxyable> ProxyableHandle<Hdl> {
143 pub(crate) fn new(hdl: Hdl, router: Weak<Router>) -> Self {
144 Self { hdl, router }
145 }
146
147 pub(crate) fn into_fidl_handle(self) -> Result<fidl::NullableHandle, Error> {
148 self.hdl.into_fidl_handle()
149 }
150
151 pub(crate) fn close_with_reason(self, msg: String) {
152 self.hdl.close_with_reason(msg);
153 }
154
155 pub(crate) fn write<'a>(
157 &'a self,
158 msg: &'a mut Hdl::Message,
159 ) -> impl 'a + Future<Output = Result<(), zx_status::Status>> + Unpin
160 where
161 Hdl: ProxyableRW<'a>,
162 {
163 self.handle_io(msg, Hdl::Writer::new())
164 }
165
166 pub(crate) fn read<'a>(
171 &'a self,
172 msg: &'a mut Hdl::Message,
173 ) -> impl 'a + Future<Output = Result<ReadValue, zx_status::Status>> + Unpin
174 where
175 Hdl: ProxyableRW<'a>,
176 {
177 self.handle_io(msg, Hdl::Reader::new())
178 }
179
180 pub(crate) fn router(&self) -> &Weak<Router> {
181 &self.router
182 }
183
184 pub(crate) fn apply_signal_update(&self, signal_update: SignalUpdate) -> Result<(), Error> {
187 if let Some(assert_signals) = signal_update.assert_signals {
188 self.hdl
189 .signal_peer(Signals::empty(), self::signals::from_wire_signals(assert_signals))?
190 }
191 Ok(())
192 }
193
194 fn handle_io<'a, I: 'a + IO<'a, Proxyable = Hdl>>(
195 &'a self,
196 msg: &'a mut Hdl::Message,
197 mut io: I,
198 ) -> impl 'a + Future<Output = Result<<I as IO<'a>>::Output, zx_status::Status>> + Unpin {
199 poll_fn(move |fut_ctx| io.poll_io(msg, &self.hdl, fut_ctx))
200 }
201
202 pub(crate) async fn drain_to_stream(
206 &self,
207 stream_writer: &mut StreamWriter<Hdl::Message>,
208 ) -> Result<(), Error>
209 where
210 Hdl: for<'a> ProxyableRW<'a>,
211 {
212 let mut message = Default::default();
213 loop {
214 let pr = self.read(&mut message).poll_unpin(&mut Context::from_waker(Waker::noop()));
215 match pr {
216 Poll::Pending => return Ok(()),
217 Poll::Ready(Err(e)) => return Err(e.into()),
218 Poll::Ready(Ok(ReadValue::Message)) => {
219 stream_writer.send_data(&mut message).await?
220 }
221 Poll::Ready(Ok(ReadValue::SignalUpdate(signal_update))) => {
222 stream_writer.send_signal(signal_update).await?
223 }
224 }
225 }
226 }
227
228 pub(crate) async fn drain_stream_to_handle(
230 self,
231 drain_stream: FramedStreamReader,
232 ) -> Result<fidl::NullableHandle, Error>
233 where
234 Hdl: for<'a> ProxyableRW<'a>,
235 {
236 let mut drain_stream = drain_stream.bind(&self);
237 loop {
238 match drain_stream.next().await? {
239 Frame::Data(message) => self.write(message).await?,
240 Frame::SignalUpdate(signal_update) => self.apply_signal_update(signal_update)?,
241 Frame::EndTransfer => return Ok(self.hdl.into_fidl_handle()?),
242 Frame::Hello => bail!("Hello frame disallowed on drain streams"),
243 Frame::BeginTransfer(_, _) => bail!("BeginTransfer on drain stream"),
244 Frame::AckTransfer => bail!("AckTransfer on drain stream"),
245 Frame::Shutdown(r) => bail!("Stream shutdown during drain: {:?}", r),
246 }
247 }
248 }
249}