overnet_core/proxy/handle/
channel.rs1use super::{
6 IO, IntoProxied, Message, Proxyable, ProxyableRW, ReadValue, RouterHolder, Serializer,
7};
8use crate::coding::{decode_fidl, encode_fidl};
9use crate::peer::PeerConnRef;
10use anyhow::{Context as _, Error};
11use fidl::{AsHandleRef, AsyncChannel, HandleBased, Peered, Signals};
12use fidl_fuchsia_overnet_protocol::{ZirconChannelMessage, ZirconHandle};
13use futures::prelude::*;
14use futures::ready;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use zx_status;
18
19pub(crate) struct Channel {
20 chan: AsyncChannel,
21}
22
23impl std::fmt::Debug for Channel {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 self.chan.fmt(f)
26 }
27}
28
29impl Proxyable for Channel {
30 type Message = ChannelMessage;
31
32 fn from_fidl_handle(hdl: fidl::NullableHandle) -> Result<Self, Error> {
33 Ok(fidl::Channel::from_handle(hdl).into_proxied()?)
34 }
35
36 fn into_fidl_handle(self) -> Result<fidl::NullableHandle, Error> {
37 Ok(self.chan.into_zx_channel().into_handle())
38 }
39
40 fn signal_peer(&self, clear: Signals, set: Signals) -> Result<(), Error> {
41 let chan: &fidl::Channel = self.chan.as_ref();
42 chan.signal_peer(clear, set)?;
43 Ok(())
44 }
45
46 #[cfg(not(target_os = "fuchsia"))]
47 fn close_with_reason(self, msg: String) {
48 self.chan.close_with_reason(msg);
49 }
50}
51
52impl<'a> ProxyableRW<'a> for Channel {
53 type Reader = ChannelReader<'a>;
54 type Writer = ChannelWriter;
55}
56
57impl IntoProxied for fidl::Channel {
58 type Proxied = Channel;
59 fn into_proxied(self) -> Result<Channel, Error> {
60 Ok(Channel { chan: AsyncChannel::from_channel(self) })
61 }
62}
63
64pub(crate) struct ChannelReader<'a> {
65 collector: super::signals::Collector<'a>,
66}
67
68impl<'a> IO<'a> for ChannelReader<'a> {
69 type Proxyable = Channel;
70 type Output = ReadValue;
71 fn new() -> ChannelReader<'a> {
72 ChannelReader { collector: Default::default() }
73 }
74 fn poll_io(
75 &mut self,
76 msg: &mut ChannelMessage,
77 channel: &'a Channel,
78 fut_ctx: &mut Context<'_>,
79 ) -> Poll<Result<ReadValue, zx_status::Status>> {
80 let read_result = channel.chan.read(fut_ctx, &mut msg.bytes, &mut msg.handles);
81 self.collector.after_read(fut_ctx, channel.chan.as_handle_ref(), read_result, false)
82 }
83}
84
/// Write half of the channel proxy IO; it carries no state of its own.
pub(crate) struct ChannelWriter;
86
87impl IO<'_> for ChannelWriter {
88 type Proxyable = Channel;
89 type Output = ();
90 fn new() -> ChannelWriter {
91 ChannelWriter
92 }
93 fn poll_io(
94 &mut self,
95 msg: &mut ChannelMessage,
96 channel: &Channel,
97 _: &mut Context<'_>,
98 ) -> Poll<Result<(), zx_status::Status>> {
99 Poll::Ready(Ok(channel.chan.write(&msg.bytes, &mut msg.handles)?))
100 }
101}
102
103#[derive(Default, Debug)]
104pub(crate) struct ChannelMessage {
105 bytes: Vec<u8>,
106 handles: Vec<fidl::NullableHandle>,
107}
108
109impl Message for ChannelMessage {
110 type Parser = ChannelMessageParser;
111 type Serializer = ChannelMessageSerializer;
112}
113
114impl PartialEq for ChannelMessage {
115 fn eq(&self, rhs: &Self) -> bool {
116 if !self.handles.is_empty() {
117 return false;
118 }
119 if !rhs.handles.is_empty() {
120 return false;
121 }
122 return self.bytes == rhs.bytes;
123 }
124}
125
126pub(crate) enum ChannelMessageParser {
127 New,
128 Pending {
129 bytes: Vec<u8>,
130 handles: Pin<
131 Box<dyn 'static + Send + Future<Output = Result<Vec<fidl::NullableHandle>, Error>>>,
132 >,
133 },
134 Done,
135}
136
137impl std::fmt::Debug for ChannelMessageParser {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 match self {
140 ChannelMessageParser::New => "New",
141 ChannelMessageParser::Pending { .. } => "Pending",
142 ChannelMessageParser::Done => "Done",
143 }
144 .fmt(f)
145 }
146}
147
148impl Serializer for ChannelMessageParser {
149 type Message = ChannelMessage;
150 fn new() -> Self {
151 Self::New
152 }
153 fn poll_ser(
154 &mut self,
155 msg: &mut Self::Message,
156 serialized: &mut Vec<u8>,
157 conn: PeerConnRef<'_>,
158 router: &mut RouterHolder<'_>,
159 fut_ctx: &mut Context<'_>,
160 ) -> Poll<Result<(), Error>> {
161 log::trace!(msg:?, serialized:?, self:?; "ChannelMessageParser::poll_ser",);
162 match self {
163 ChannelMessageParser::New => {
164 let ZirconChannelMessage { mut bytes, handles: unbound_handles } =
165 decode_fidl(serialized)?;
166 if unbound_handles.is_empty() {
168 msg.handles.clear();
169 std::mem::swap(&mut msg.bytes, &mut bytes);
170 *self = ChannelMessageParser::Done;
171 return Poll::Ready(Ok(()));
172 }
173 let closure_conn = conn.into_peer_conn();
174 let closure_router = router.get()?.clone();
175 *self = ChannelMessageParser::Pending {
176 bytes,
177 handles: async move {
178 let mut handles = Vec::new();
179 for hdl in unbound_handles.into_iter() {
180 handles.push(
181 closure_router
182 .clone()
183 .recv_proxied(hdl, closure_conn.as_ref())
184 .await?,
185 );
186 }
187 Ok(handles)
188 }
189 .boxed(),
190 };
191 self.poll_ser(msg, serialized, conn, router, fut_ctx)
192 }
193 ChannelMessageParser::Pending { bytes, handles } => {
194 let mut handles = ready!(handles.as_mut().poll(fut_ctx))?;
195 std::mem::swap(&mut msg.handles, &mut handles);
196 std::mem::swap(&mut msg.bytes, bytes);
197 *self = ChannelMessageParser::Done;
198 Poll::Ready(Ok(()))
199 }
200 ChannelMessageParser::Done => unreachable!(),
201 }
202 }
203}
204
205pub(crate) enum ChannelMessageSerializer {
206 New,
207 Pending(Pin<Box<dyn 'static + Send + Future<Output = Result<Vec<ZirconHandle>, Error>>>>),
208 Done,
209}
210
211impl Serializer for ChannelMessageSerializer {
212 type Message = ChannelMessage;
213 fn new() -> Self {
214 Self::New
215 }
216 fn poll_ser(
217 &mut self,
218 msg: &mut Self::Message,
219 serialized: &mut Vec<u8>,
220 conn: PeerConnRef<'_>,
221 router: &mut RouterHolder<'_>,
222 fut_ctx: &mut Context<'_>,
223 ) -> Poll<Result<(), Error>> {
224 let self_val = match self {
225 ChannelMessageSerializer::New => "New",
226 ChannelMessageSerializer::Pending { .. } => "Pending",
227 ChannelMessageSerializer::Done => "Done",
228 };
229 log::trace!(msg:?, serialized:?, self = self_val; "ChannelMessageSerializer::poll_ser");
230 match self {
231 ChannelMessageSerializer::New => {
232 let handles = std::mem::replace(&mut msg.handles, Vec::new());
233 if handles.is_empty() {
235 *serialized = encode_fidl(&mut ZirconChannelMessage {
236 bytes: std::mem::replace(&mut msg.bytes, Vec::new()),
237 handles: Vec::new(),
238 })?;
239 *self = ChannelMessageSerializer::Done;
240 return Poll::Ready(Ok(()));
241 }
242 let closure_conn = conn.into_peer_conn();
243 let closure_router = router.get()?.clone();
244 *self = ChannelMessageSerializer::Pending(
245 async move {
246 let mut send_handles = Vec::new();
247 for handle in handles {
248 let raw_handle = handle.raw_handle();
250 send_handles.push(
251 closure_router
252 .send_proxied(handle, closure_conn.as_ref())
253 .await
254 .with_context(|| format!("Sending handle {:?}", raw_handle))?,
255 );
256 }
257 Ok(send_handles)
258 }
259 .boxed(),
260 );
261 self.poll_ser(msg, serialized, conn, router, fut_ctx)
262 }
263 ChannelMessageSerializer::Pending(handles) => {
264 let handles = ready!(handles.as_mut().poll(fut_ctx))?;
265 *serialized = encode_fidl(&mut ZirconChannelMessage {
266 bytes: std::mem::replace(&mut msg.bytes, Vec::new()),
267 handles,
268 })?;
269 *self = ChannelMessageSerializer::Done;
270 Poll::Ready(Ok(()))
271 }
272 ChannelMessageSerializer::Done => unreachable!(),
273 }
274 }
275}