overnet_core/proxy/handle/
channel.rs
1use super::{
6 IntoProxied, Message, Proxyable, ProxyableRW, ReadValue, RouterHolder, Serializer, IO,
7};
8use crate::coding::{decode_fidl, encode_fidl};
9use crate::peer::PeerConnRef;
10use anyhow::{Context as _, Error};
11use fidl::{AsHandleRef, AsyncChannel, HandleBased, Peered, Signals};
12use fidl_fuchsia_overnet_protocol::{ZirconChannelMessage, ZirconHandle};
13use futures::prelude::*;
14use futures::ready;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use zx_status;
18
19#[cfg(not(target_os = "fuchsia"))]
20use fuchsia_async::emulated_handle::ChannelProxyProtocol;
21
22pub(crate) struct Channel {
23 chan: AsyncChannel,
24}
25
26impl std::fmt::Debug for Channel {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 self.chan.fmt(f)
29 }
30}
31
32impl Proxyable for Channel {
33 type Message = ChannelMessage;
34
35 fn from_fidl_handle(hdl: fidl::Handle) -> Result<Self, Error> {
36 Ok(fidl::Channel::from_handle(hdl).into_proxied()?)
37 }
38
39 fn into_fidl_handle(self) -> Result<fidl::Handle, Error> {
40 Ok(self.chan.into_zx_channel().into_handle())
41 }
42
43 fn signal_peer(&self, clear: Signals, set: Signals) -> Result<(), Error> {
44 let chan: &fidl::Channel = self.chan.as_ref();
45 chan.signal_peer(clear, set)?;
46 Ok(())
47 }
48
49 #[cfg(not(target_os = "fuchsia"))]
50 fn set_channel_proxy_protocol(&self, proto: ChannelProxyProtocol) {
51 self.chan.set_channel_proxy_protocol(proto);
52 }
53
54 #[cfg(not(target_os = "fuchsia"))]
55 fn close_with_reason(self, msg: String) {
56 self.chan.close_with_reason(msg);
57 }
58}
59
60impl<'a> ProxyableRW<'a> for Channel {
61 type Reader = ChannelReader<'a>;
62 type Writer = ChannelWriter;
63}
64
65impl IntoProxied for fidl::Channel {
66 type Proxied = Channel;
67 fn into_proxied(self) -> Result<Channel, Error> {
68 Ok(Channel { chan: AsyncChannel::from_channel(self) })
69 }
70}
71
72pub(crate) struct ChannelReader<'a> {
73 collector: super::signals::Collector<'a>,
74}
75
76impl<'a> IO<'a> for ChannelReader<'a> {
77 type Proxyable = Channel;
78 type Output = ReadValue;
79 fn new() -> ChannelReader<'a> {
80 ChannelReader { collector: Default::default() }
81 }
82 fn poll_io(
83 &mut self,
84 msg: &mut ChannelMessage,
85 channel: &'a Channel,
86 fut_ctx: &mut Context<'_>,
87 ) -> Poll<Result<ReadValue, zx_status::Status>> {
88 let read_result = channel.chan.read(fut_ctx, &mut msg.bytes, &mut msg.handles);
89 self.collector.after_read(fut_ctx, channel.chan.as_handle_ref(), read_result, false)
90 }
91}
92
93pub(crate) struct ChannelWriter;
94
95impl IO<'_> for ChannelWriter {
96 type Proxyable = Channel;
97 type Output = ();
98 fn new() -> ChannelWriter {
99 ChannelWriter
100 }
101 fn poll_io(
102 &mut self,
103 msg: &mut ChannelMessage,
104 channel: &Channel,
105 _: &mut Context<'_>,
106 ) -> Poll<Result<(), zx_status::Status>> {
107 Poll::Ready(Ok(channel.chan.write(&msg.bytes, &mut msg.handles)?))
108 }
109}
110
111#[derive(Default, Debug)]
112pub(crate) struct ChannelMessage {
113 bytes: Vec<u8>,
114 handles: Vec<fidl::Handle>,
115}
116
117impl Message for ChannelMessage {
118 type Parser = ChannelMessageParser;
119 type Serializer = ChannelMessageSerializer;
120}
121
122impl PartialEq for ChannelMessage {
123 fn eq(&self, rhs: &Self) -> bool {
124 if !self.handles.is_empty() {
125 return false;
126 }
127 if !rhs.handles.is_empty() {
128 return false;
129 }
130 return self.bytes == rhs.bytes;
131 }
132}
133
134pub(crate) enum ChannelMessageParser {
135 New,
136 Pending {
137 bytes: Vec<u8>,
138 handles: Pin<Box<dyn 'static + Send + Future<Output = Result<Vec<fidl::Handle>, Error>>>>,
139 },
140 Done,
141}
142
143impl std::fmt::Debug for ChannelMessageParser {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 match self {
146 ChannelMessageParser::New => "New",
147 ChannelMessageParser::Pending { .. } => "Pending",
148 ChannelMessageParser::Done => "Done",
149 }
150 .fmt(f)
151 }
152}
153
154impl Serializer for ChannelMessageParser {
155 type Message = ChannelMessage;
156 fn new() -> Self {
157 Self::New
158 }
159 fn poll_ser(
160 &mut self,
161 msg: &mut Self::Message,
162 serialized: &mut Vec<u8>,
163 conn: PeerConnRef<'_>,
164 router: &mut RouterHolder<'_>,
165 fut_ctx: &mut Context<'_>,
166 ) -> Poll<Result<(), Error>> {
167 log::trace!(msg:?, serialized:?, self:?; "ChannelMessageParser::poll_ser",);
168 match self {
169 ChannelMessageParser::New => {
170 let ZirconChannelMessage { mut bytes, handles: unbound_handles } =
171 decode_fidl(serialized)?;
172 if unbound_handles.is_empty() {
174 msg.handles.clear();
175 std::mem::swap(&mut msg.bytes, &mut bytes);
176 *self = ChannelMessageParser::Done;
177 return Poll::Ready(Ok(()));
178 }
179 let closure_conn = conn.into_peer_conn();
180 let closure_router = router.get()?.clone();
181 *self = ChannelMessageParser::Pending {
182 bytes,
183 handles: async move {
184 let mut handles = Vec::new();
185 for hdl in unbound_handles.into_iter() {
186 handles.push(
187 closure_router
188 .clone()
189 .recv_proxied(hdl, closure_conn.as_ref())
190 .await?,
191 );
192 }
193 Ok(handles)
194 }
195 .boxed(),
196 };
197 self.poll_ser(msg, serialized, conn, router, fut_ctx)
198 }
199 ChannelMessageParser::Pending { ref mut bytes, handles } => {
200 let mut handles = ready!(handles.as_mut().poll(fut_ctx))?;
201 std::mem::swap(&mut msg.handles, &mut handles);
202 std::mem::swap(&mut msg.bytes, bytes);
203 *self = ChannelMessageParser::Done;
204 Poll::Ready(Ok(()))
205 }
206 ChannelMessageParser::Done => unreachable!(),
207 }
208 }
209}
210
211pub(crate) enum ChannelMessageSerializer {
212 New,
213 Pending(Pin<Box<dyn 'static + Send + Future<Output = Result<Vec<ZirconHandle>, Error>>>>),
214 Done,
215}
216
217impl Serializer for ChannelMessageSerializer {
218 type Message = ChannelMessage;
219 fn new() -> Self {
220 Self::New
221 }
222 fn poll_ser(
223 &mut self,
224 msg: &mut Self::Message,
225 serialized: &mut Vec<u8>,
226 conn: PeerConnRef<'_>,
227 router: &mut RouterHolder<'_>,
228 fut_ctx: &mut Context<'_>,
229 ) -> Poll<Result<(), Error>> {
230 let self_val = match self {
231 ChannelMessageSerializer::New => "New",
232 ChannelMessageSerializer::Pending { .. } => "Pending",
233 ChannelMessageSerializer::Done => "Done",
234 };
235 log::trace!(msg:?, serialized:?, self = self_val; "ChannelMessageSerializer::poll_ser");
236 match self {
237 ChannelMessageSerializer::New => {
238 let handles = std::mem::replace(&mut msg.handles, Vec::new());
239 if handles.is_empty() {
241 *serialized = encode_fidl(&mut ZirconChannelMessage {
242 bytes: std::mem::replace(&mut msg.bytes, Vec::new()),
243 handles: Vec::new(),
244 })?;
245 *self = ChannelMessageSerializer::Done;
246 return Poll::Ready(Ok(()));
247 }
248 let closure_conn = conn.into_peer_conn();
249 let closure_router = router.get()?.clone();
250 *self = ChannelMessageSerializer::Pending(
251 async move {
252 let mut send_handles = Vec::new();
253 for handle in handles {
254 let raw_handle = handle.raw_handle();
256 send_handles.push(
257 closure_router
258 .send_proxied(handle, closure_conn.as_ref())
259 .await
260 .with_context(|| format!("Sending handle {:?}", raw_handle))?,
261 );
262 }
263 Ok(send_handles)
264 }
265 .boxed(),
266 );
267 self.poll_ser(msg, serialized, conn, router, fut_ctx)
268 }
269 ChannelMessageSerializer::Pending(handles) => {
270 let handles = ready!(handles.as_mut().poll(fut_ctx))?;
271 *serialized = encode_fidl(&mut ZirconChannelMessage {
272 bytes: std::mem::replace(&mut msg.bytes, Vec::new()),
273 handles,
274 })?;
275 *self = ChannelMessageSerializer::Done;
276 Poll::Ready(Ok(()))
277 }
278 ChannelMessageSerializer::Done => unreachable!(),
279 }
280 }
281}