overnet_core/proxy/handle/
channel.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{
6    IO, IntoProxied, Message, Proxyable, ProxyableRW, ReadValue, RouterHolder, Serializer,
7};
8use crate::coding::{decode_fidl, encode_fidl};
9use crate::peer::PeerConnRef;
10use anyhow::{Context as _, Error};
11use fidl::{AsHandleRef, AsyncChannel, HandleBased, Peered, Signals};
12use fidl_fuchsia_overnet_protocol::{ZirconChannelMessage, ZirconHandle};
13use futures::prelude::*;
14use futures::ready;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use zx_status;
18
19pub(crate) struct Channel {
20    chan: AsyncChannel,
21}
22
23impl std::fmt::Debug for Channel {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        self.chan.fmt(f)
26    }
27}
28
29impl Proxyable for Channel {
30    type Message = ChannelMessage;
31
32    fn from_fidl_handle(hdl: fidl::NullableHandle) -> Result<Self, Error> {
33        Ok(fidl::Channel::from_handle(hdl).into_proxied()?)
34    }
35
36    fn into_fidl_handle(self) -> Result<fidl::NullableHandle, Error> {
37        Ok(self.chan.into_zx_channel().into_handle())
38    }
39
40    fn signal_peer(&self, clear: Signals, set: Signals) -> Result<(), Error> {
41        let chan: &fidl::Channel = self.chan.as_ref();
42        chan.signal_peer(clear, set)?;
43        Ok(())
44    }
45
46    #[cfg(not(target_os = "fuchsia"))]
47    fn close_with_reason(self, msg: String) {
48        self.chan.close_with_reason(msg);
49    }
50}
51
52impl<'a> ProxyableRW<'a> for Channel {
53    type Reader = ChannelReader<'a>;
54    type Writer = ChannelWriter;
55}
56
57impl IntoProxied for fidl::Channel {
58    type Proxied = Channel;
59    fn into_proxied(self) -> Result<Channel, Error> {
60        Ok(Channel { chan: AsyncChannel::from_channel(self) })
61    }
62}
63
64pub(crate) struct ChannelReader<'a> {
65    collector: super::signals::Collector<'a>,
66}
67
68impl<'a> IO<'a> for ChannelReader<'a> {
69    type Proxyable = Channel;
70    type Output = ReadValue;
71    fn new() -> ChannelReader<'a> {
72        ChannelReader { collector: Default::default() }
73    }
74    fn poll_io(
75        &mut self,
76        msg: &mut ChannelMessage,
77        channel: &'a Channel,
78        fut_ctx: &mut Context<'_>,
79    ) -> Poll<Result<ReadValue, zx_status::Status>> {
80        let read_result = channel.chan.read(fut_ctx, &mut msg.bytes, &mut msg.handles);
81        self.collector.after_read(fut_ctx, channel.chan.as_handle_ref(), read_result, false)
82    }
83}
84
/// Write-side I/O driver for a [`Channel`]; it carries no state.
pub(crate) struct ChannelWriter;
86
87impl IO<'_> for ChannelWriter {
88    type Proxyable = Channel;
89    type Output = ();
90    fn new() -> ChannelWriter {
91        ChannelWriter
92    }
93    fn poll_io(
94        &mut self,
95        msg: &mut ChannelMessage,
96        channel: &Channel,
97        _: &mut Context<'_>,
98    ) -> Poll<Result<(), zx_status::Status>> {
99        Poll::Ready(Ok(channel.chan.write(&msg.bytes, &mut msg.handles)?))
100    }
101}
102
103#[derive(Default, Debug)]
104pub(crate) struct ChannelMessage {
105    bytes: Vec<u8>,
106    handles: Vec<fidl::NullableHandle>,
107}
108
109impl Message for ChannelMessage {
110    type Parser = ChannelMessageParser;
111    type Serializer = ChannelMessageSerializer;
112}
113
114impl PartialEq for ChannelMessage {
115    fn eq(&self, rhs: &Self) -> bool {
116        if !self.handles.is_empty() {
117            return false;
118        }
119        if !rhs.handles.is_empty() {
120            return false;
121        }
122        return self.bytes == rhs.bytes;
123    }
124}
125
126pub(crate) enum ChannelMessageParser {
127    New,
128    Pending {
129        bytes: Vec<u8>,
130        handles: Pin<
131            Box<dyn 'static + Send + Future<Output = Result<Vec<fidl::NullableHandle>, Error>>>,
132        >,
133    },
134    Done,
135}
136
137impl std::fmt::Debug for ChannelMessageParser {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        match self {
140            ChannelMessageParser::New => "New",
141            ChannelMessageParser::Pending { .. } => "Pending",
142            ChannelMessageParser::Done => "Done",
143        }
144        .fmt(f)
145    }
146}
147
148impl Serializer for ChannelMessageParser {
149    type Message = ChannelMessage;
150    fn new() -> Self {
151        Self::New
152    }
153    fn poll_ser(
154        &mut self,
155        msg: &mut Self::Message,
156        serialized: &mut Vec<u8>,
157        conn: PeerConnRef<'_>,
158        router: &mut RouterHolder<'_>,
159        fut_ctx: &mut Context<'_>,
160    ) -> Poll<Result<(), Error>> {
161        log::trace!(msg:?, serialized:?, self:?; "ChannelMessageParser::poll_ser",);
162        match self {
163            ChannelMessageParser::New => {
164                let ZirconChannelMessage { mut bytes, handles: unbound_handles } =
165                    decode_fidl(serialized)?;
166                // Special case no handles case to avoid allocation dance
167                if unbound_handles.is_empty() {
168                    msg.handles.clear();
169                    std::mem::swap(&mut msg.bytes, &mut bytes);
170                    *self = ChannelMessageParser::Done;
171                    return Poll::Ready(Ok(()));
172                }
173                let closure_conn = conn.into_peer_conn();
174                let closure_router = router.get()?.clone();
175                *self = ChannelMessageParser::Pending {
176                    bytes,
177                    handles: async move {
178                        let mut handles = Vec::new();
179                        for hdl in unbound_handles.into_iter() {
180                            handles.push(
181                                closure_router
182                                    .clone()
183                                    .recv_proxied(hdl, closure_conn.as_ref())
184                                    .await?,
185                            );
186                        }
187                        Ok(handles)
188                    }
189                    .boxed(),
190                };
191                self.poll_ser(msg, serialized, conn, router, fut_ctx)
192            }
193            ChannelMessageParser::Pending { bytes, handles } => {
194                let mut handles = ready!(handles.as_mut().poll(fut_ctx))?;
195                std::mem::swap(&mut msg.handles, &mut handles);
196                std::mem::swap(&mut msg.bytes, bytes);
197                *self = ChannelMessageParser::Done;
198                Poll::Ready(Ok(()))
199            }
200            ChannelMessageParser::Done => unreachable!(),
201        }
202    }
203}
204
205pub(crate) enum ChannelMessageSerializer {
206    New,
207    Pending(Pin<Box<dyn 'static + Send + Future<Output = Result<Vec<ZirconHandle>, Error>>>>),
208    Done,
209}
210
211impl Serializer for ChannelMessageSerializer {
212    type Message = ChannelMessage;
213    fn new() -> Self {
214        Self::New
215    }
216    fn poll_ser(
217        &mut self,
218        msg: &mut Self::Message,
219        serialized: &mut Vec<u8>,
220        conn: PeerConnRef<'_>,
221        router: &mut RouterHolder<'_>,
222        fut_ctx: &mut Context<'_>,
223    ) -> Poll<Result<(), Error>> {
224        let self_val = match self {
225            ChannelMessageSerializer::New => "New",
226            ChannelMessageSerializer::Pending { .. } => "Pending",
227            ChannelMessageSerializer::Done => "Done",
228        };
229        log::trace!(msg:?, serialized:?, self = self_val; "ChannelMessageSerializer::poll_ser");
230        match self {
231            ChannelMessageSerializer::New => {
232                let handles = std::mem::replace(&mut msg.handles, Vec::new());
233                // Special case no handles case to avoid allocation dance
234                if handles.is_empty() {
235                    *serialized = encode_fidl(&mut ZirconChannelMessage {
236                        bytes: std::mem::replace(&mut msg.bytes, Vec::new()),
237                        handles: Vec::new(),
238                    })?;
239                    *self = ChannelMessageSerializer::Done;
240                    return Poll::Ready(Ok(()));
241                }
242                let closure_conn = conn.into_peer_conn();
243                let closure_router = router.get()?.clone();
244                *self = ChannelMessageSerializer::Pending(
245                    async move {
246                        let mut send_handles = Vec::new();
247                        for handle in handles {
248                            // save for debugging
249                            let raw_handle = handle.raw_handle();
250                            send_handles.push(
251                                closure_router
252                                    .send_proxied(handle, closure_conn.as_ref())
253                                    .await
254                                    .with_context(|| format!("Sending handle {:?}", raw_handle))?,
255                            );
256                        }
257                        Ok(send_handles)
258                    }
259                    .boxed(),
260                );
261                self.poll_ser(msg, serialized, conn, router, fut_ctx)
262            }
263            ChannelMessageSerializer::Pending(handles) => {
264                let handles = ready!(handles.as_mut().poll(fut_ctx))?;
265                *serialized = encode_fidl(&mut ZirconChannelMessage {
266                    bytes: std::mem::replace(&mut msg.bytes, Vec::new()),
267                    handles,
268                })?;
269                *self = ChannelMessageSerializer::Done;
270                Poll::Ready(Ok(()))
271            }
272            ChannelMessageSerializer::Done => unreachable!(),
273        }
274    }
275}