overnet_core/proxy/handle/
channel.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{
6    IntoProxied, Message, Proxyable, ProxyableRW, ReadValue, RouterHolder, Serializer, IO,
7};
8use crate::coding::{decode_fidl, encode_fidl};
9use crate::peer::PeerConnRef;
10use anyhow::{Context as _, Error};
11use fidl::{AsHandleRef, AsyncChannel, HandleBased, Peered, Signals};
12use fidl_fuchsia_overnet_protocol::{ZirconChannelMessage, ZirconHandle};
13use futures::prelude::*;
14use futures::ready;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use zx_status;
18
19#[cfg(not(target_os = "fuchsia"))]
20use fuchsia_async::emulated_handle::ChannelProxyProtocol;
21
22pub(crate) struct Channel {
23    chan: AsyncChannel,
24}
25
26impl std::fmt::Debug for Channel {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        self.chan.fmt(f)
29    }
30}
31
32impl Proxyable for Channel {
33    type Message = ChannelMessage;
34
35    fn from_fidl_handle(hdl: fidl::Handle) -> Result<Self, Error> {
36        Ok(fidl::Channel::from_handle(hdl).into_proxied()?)
37    }
38
39    fn into_fidl_handle(self) -> Result<fidl::Handle, Error> {
40        Ok(self.chan.into_zx_channel().into_handle())
41    }
42
43    fn signal_peer(&self, clear: Signals, set: Signals) -> Result<(), Error> {
44        let chan: &fidl::Channel = self.chan.as_ref();
45        chan.signal_peer(clear, set)?;
46        Ok(())
47    }
48
49    #[cfg(not(target_os = "fuchsia"))]
50    fn set_channel_proxy_protocol(&self, proto: ChannelProxyProtocol) {
51        self.chan.set_channel_proxy_protocol(proto);
52    }
53
54    #[cfg(not(target_os = "fuchsia"))]
55    fn close_with_reason(self, msg: String) {
56        self.chan.close_with_reason(msg);
57    }
58}
59
60impl<'a> ProxyableRW<'a> for Channel {
61    type Reader = ChannelReader<'a>;
62    type Writer = ChannelWriter;
63}
64
65impl IntoProxied for fidl::Channel {
66    type Proxied = Channel;
67    fn into_proxied(self) -> Result<Channel, Error> {
68        Ok(Channel { chan: AsyncChannel::from_channel(self) })
69    }
70}
71
72pub(crate) struct ChannelReader<'a> {
73    collector: super::signals::Collector<'a>,
74}
75
76impl<'a> IO<'a> for ChannelReader<'a> {
77    type Proxyable = Channel;
78    type Output = ReadValue;
79    fn new() -> ChannelReader<'a> {
80        ChannelReader { collector: Default::default() }
81    }
82    fn poll_io(
83        &mut self,
84        msg: &mut ChannelMessage,
85        channel: &'a Channel,
86        fut_ctx: &mut Context<'_>,
87    ) -> Poll<Result<ReadValue, zx_status::Status>> {
88        let read_result = channel.chan.read(fut_ctx, &mut msg.bytes, &mut msg.handles);
89        self.collector.after_read(fut_ctx, channel.chan.as_handle_ref(), read_result, false)
90    }
91}
92
93pub(crate) struct ChannelWriter;
94
95impl IO<'_> for ChannelWriter {
96    type Proxyable = Channel;
97    type Output = ();
98    fn new() -> ChannelWriter {
99        ChannelWriter
100    }
101    fn poll_io(
102        &mut self,
103        msg: &mut ChannelMessage,
104        channel: &Channel,
105        _: &mut Context<'_>,
106    ) -> Poll<Result<(), zx_status::Status>> {
107        Poll::Ready(Ok(channel.chan.write(&msg.bytes, &mut msg.handles)?))
108    }
109}
110
111#[derive(Default, Debug)]
112pub(crate) struct ChannelMessage {
113    bytes: Vec<u8>,
114    handles: Vec<fidl::Handle>,
115}
116
117impl Message for ChannelMessage {
118    type Parser = ChannelMessageParser;
119    type Serializer = ChannelMessageSerializer;
120}
121
122impl PartialEq for ChannelMessage {
123    fn eq(&self, rhs: &Self) -> bool {
124        if !self.handles.is_empty() {
125            return false;
126        }
127        if !rhs.handles.is_empty() {
128            return false;
129        }
130        return self.bytes == rhs.bytes;
131    }
132}
133
134pub(crate) enum ChannelMessageParser {
135    New,
136    Pending {
137        bytes: Vec<u8>,
138        handles: Pin<Box<dyn 'static + Send + Future<Output = Result<Vec<fidl::Handle>, Error>>>>,
139    },
140    Done,
141}
142
143impl std::fmt::Debug for ChannelMessageParser {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        match self {
146            ChannelMessageParser::New => "New",
147            ChannelMessageParser::Pending { .. } => "Pending",
148            ChannelMessageParser::Done => "Done",
149        }
150        .fmt(f)
151    }
152}
153
154impl Serializer for ChannelMessageParser {
155    type Message = ChannelMessage;
156    fn new() -> Self {
157        Self::New
158    }
159    fn poll_ser(
160        &mut self,
161        msg: &mut Self::Message,
162        serialized: &mut Vec<u8>,
163        conn: PeerConnRef<'_>,
164        router: &mut RouterHolder<'_>,
165        fut_ctx: &mut Context<'_>,
166    ) -> Poll<Result<(), Error>> {
167        log::trace!(msg:?, serialized:?, self:?; "ChannelMessageParser::poll_ser",);
168        match self {
169            ChannelMessageParser::New => {
170                let ZirconChannelMessage { mut bytes, handles: unbound_handles } =
171                    decode_fidl(serialized)?;
172                // Special case no handles case to avoid allocation dance
173                if unbound_handles.is_empty() {
174                    msg.handles.clear();
175                    std::mem::swap(&mut msg.bytes, &mut bytes);
176                    *self = ChannelMessageParser::Done;
177                    return Poll::Ready(Ok(()));
178                }
179                let closure_conn = conn.into_peer_conn();
180                let closure_router = router.get()?.clone();
181                *self = ChannelMessageParser::Pending {
182                    bytes,
183                    handles: async move {
184                        let mut handles = Vec::new();
185                        for hdl in unbound_handles.into_iter() {
186                            handles.push(
187                                closure_router
188                                    .clone()
189                                    .recv_proxied(hdl, closure_conn.as_ref())
190                                    .await?,
191                            );
192                        }
193                        Ok(handles)
194                    }
195                    .boxed(),
196                };
197                self.poll_ser(msg, serialized, conn, router, fut_ctx)
198            }
199            ChannelMessageParser::Pending { ref mut bytes, handles } => {
200                let mut handles = ready!(handles.as_mut().poll(fut_ctx))?;
201                std::mem::swap(&mut msg.handles, &mut handles);
202                std::mem::swap(&mut msg.bytes, bytes);
203                *self = ChannelMessageParser::Done;
204                Poll::Ready(Ok(()))
205            }
206            ChannelMessageParser::Done => unreachable!(),
207        }
208    }
209}
210
211pub(crate) enum ChannelMessageSerializer {
212    New,
213    Pending(Pin<Box<dyn 'static + Send + Future<Output = Result<Vec<ZirconHandle>, Error>>>>),
214    Done,
215}
216
217impl Serializer for ChannelMessageSerializer {
218    type Message = ChannelMessage;
219    fn new() -> Self {
220        Self::New
221    }
222    fn poll_ser(
223        &mut self,
224        msg: &mut Self::Message,
225        serialized: &mut Vec<u8>,
226        conn: PeerConnRef<'_>,
227        router: &mut RouterHolder<'_>,
228        fut_ctx: &mut Context<'_>,
229    ) -> Poll<Result<(), Error>> {
230        let self_val = match self {
231            ChannelMessageSerializer::New => "New",
232            ChannelMessageSerializer::Pending { .. } => "Pending",
233            ChannelMessageSerializer::Done => "Done",
234        };
235        log::trace!(msg:?, serialized:?, self = self_val; "ChannelMessageSerializer::poll_ser");
236        match self {
237            ChannelMessageSerializer::New => {
238                let handles = std::mem::replace(&mut msg.handles, Vec::new());
239                // Special case no handles case to avoid allocation dance
240                if handles.is_empty() {
241                    *serialized = encode_fidl(&mut ZirconChannelMessage {
242                        bytes: std::mem::replace(&mut msg.bytes, Vec::new()),
243                        handles: Vec::new(),
244                    })?;
245                    *self = ChannelMessageSerializer::Done;
246                    return Poll::Ready(Ok(()));
247                }
248                let closure_conn = conn.into_peer_conn();
249                let closure_router = router.get()?.clone();
250                *self = ChannelMessageSerializer::Pending(
251                    async move {
252                        let mut send_handles = Vec::new();
253                        for handle in handles {
254                            // save for debugging
255                            let raw_handle = handle.raw_handle();
256                            send_handles.push(
257                                closure_router
258                                    .send_proxied(handle, closure_conn.as_ref())
259                                    .await
260                                    .with_context(|| format!("Sending handle {:?}", raw_handle))?,
261                            );
262                        }
263                        Ok(send_handles)
264                    }
265                    .boxed(),
266                );
267                self.poll_ser(msg, serialized, conn, router, fut_ctx)
268            }
269            ChannelMessageSerializer::Pending(handles) => {
270                let handles = ready!(handles.as_mut().poll(fut_ctx))?;
271                *serialized = encode_fidl(&mut ZirconChannelMessage {
272                    bytes: std::mem::replace(&mut msg.bytes, Vec::new()),
273                    handles,
274                })?;
275                *self = ChannelMessageSerializer::Done;
276                Poll::Ready(Ok(()))
277            }
278            ChannelMessageSerializer::Done => unreachable!(),
279        }
280    }
281}