fuchsia_async/handle/zircon/
channel.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::fmt;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll, ready};
9
10use zx::{self as zx, AsHandleRef, MessageBuf, MessageBufEtc};
11
12use crate::{OnSignalsRef, RWHandle, ReadableHandle as _};
13
14/// An I/O object representing a `Channel`.
15pub struct Channel(RWHandle<zx::Channel>);
16
17impl AsRef<zx::Channel> for Channel {
18    fn as_ref(&self) -> &zx::Channel {
19        self.0.get_ref()
20    }
21}
22
23impl AsHandleRef for Channel {
24    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
25        self.0.get_ref().as_handle_ref()
26    }
27}
28
29impl From<Channel> for zx::Channel {
30    fn from(channel: Channel) -> zx::Channel {
31        channel.0.into_inner()
32    }
33}
34
35impl Channel {
36    /// Creates a new `Channel` from a previously-created `zx::Channel`.
37    ///
38    /// # Panics
39    ///
40    /// If called outside the context of an active async executor.
41    pub fn from_channel(channel: zx::Channel) -> Self {
42        Channel(RWHandle::new(channel))
43    }
44
45    /// Consumes `self` and returns the underlying `zx::Channel`.
46    pub fn into_zx_channel(self) -> zx::Channel {
47        self.0.into_inner()
48    }
49
50    /// Returns true if the channel received the `OBJECT_PEER_CLOSED` signal.
51    pub fn is_closed(&self) -> bool {
52        self.0.is_closed()
53    }
54
55    /// Returns a future that completes when `is_closed()` is true.
56    pub fn on_closed(&self) -> OnSignalsRef<'_> {
57        self.0.on_closed()
58    }
59
60    /// Receives a message on the channel and registers this `Channel` as
61    /// needing a read on receiving a `zx::Status::SHOULD_WAIT`.
62    ///
63    /// Identical to `recv_from` except takes separate bytes and handles buffers
64    /// rather than a single `MessageBuf`.
65    pub fn read(
66        &self,
67        cx: &mut Context<'_>,
68        bytes: &mut Vec<u8>,
69        handles: &mut Vec<zx::NullableHandle>,
70    ) -> Poll<Result<(), zx::Status>> {
71        loop {
72            let res = self.0.get_ref().read_split(bytes, handles);
73            if res == Err(zx::Status::SHOULD_WAIT) {
74                ready!(self.0.need_readable(cx)?);
75            } else {
76                return Poll::Ready(res);
77            }
78        }
79    }
80
81    /// Receives a message on the channel and registers this `Channel` as
82    /// needing a read on receiving a `zx::Status::SHOULD_WAIT`.
83    ///
84    /// Identical to `recv_etc_from` except takes separate bytes and handles
85    /// buffers rather than a single `MessageBufEtc`.
86    pub fn read_etc(
87        &self,
88        cx: &mut Context<'_>,
89        bytes: &mut Vec<u8>,
90        handles: &mut Vec<zx::HandleInfo>,
91    ) -> Poll<Result<(), zx::Status>> {
92        loop {
93            let res = self.0.get_ref().read_etc_split(bytes, handles);
94            if res == Err(zx::Status::SHOULD_WAIT) {
95                ready!(self.0.need_readable(cx)?);
96            } else {
97                return Poll::Ready(res);
98            }
99        }
100    }
101
102    /// Receives a message on the channel and registers this `Channel` as
103    /// needing a read on receiving a `zx::Status::SHOULD_WAIT`.
104    pub fn recv_from(
105        &self,
106        cx: &mut Context<'_>,
107        buf: &mut MessageBuf,
108    ) -> Poll<Result<(), zx::Status>> {
109        let (bytes, handles) = buf.split_mut();
110        self.read(cx, bytes, handles)
111    }
112
113    /// Receives a message on the channel and registers this `Channel` as
114    /// needing a read on receiving a `zx::Status::SHOULD_WAIT`.
115    pub fn recv_etc_from(
116        &self,
117        cx: &mut Context<'_>,
118        buf: &mut MessageBufEtc,
119    ) -> Poll<Result<(), zx::Status>> {
120        let (bytes, handles) = buf.split_mut();
121        self.read_etc(cx, bytes, handles)
122    }
123
124    /// Creates a future that receive a message to be written to the buffer
125    /// provided.
126    ///
127    /// The returned future will return after a message has been received on
128    /// this socket and been placed into the buffer.
129    pub fn recv_msg<'a>(&'a self, buf: &'a mut MessageBuf) -> RecvMsg<'a> {
130        RecvMsg { channel: self, buf }
131    }
132
133    /// Creates a future that receive a message to be written to the buffer
134    /// provided.
135    ///
136    /// The returned future will return after a message has been received on
137    /// this socket and been placed into the buffer.
138    pub fn recv_etc_msg<'a>(&'a self, buf: &'a mut MessageBufEtc) -> RecvEtcMsg<'a> {
139        RecvEtcMsg { channel: self, buf }
140    }
141
142    /// Writes a message into the channel.
143    pub fn write(
144        &self,
145        bytes: &[u8],
146        handles: &mut [zx::NullableHandle],
147    ) -> Result<(), zx::Status> {
148        self.0.get_ref().write(bytes, handles)
149    }
150
151    /// Writes a message into the channel.
152    pub fn write_etc(
153        &self,
154        bytes: &[u8],
155        handles: &mut [zx::HandleDisposition<'_>],
156    ) -> Result<(), zx::Status> {
157        self.0.get_ref().write_etc(bytes, handles)
158    }
159}
160
161impl fmt::Debug for Channel {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        self.0.get_ref().fmt(f)
164    }
165}
166
167/// A future used to receive a message from a channel.
168///
169/// This is created by the `Channel::recv_msg` method.
170#[must_use = "futures do nothing unless polled"]
171pub struct RecvMsg<'a> {
172    channel: &'a Channel,
173    buf: &'a mut MessageBuf,
174}
175
176impl<'a> Future for RecvMsg<'a> {
177    type Output = Result<(), zx::Status>;
178
179    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
180        let this = &mut *self;
181        this.channel.recv_from(cx, this.buf)
182    }
183}
184/// A future used to receive a message from a channel.
185///
186/// This is created by the `Channel::recv_etc_msg` method.
187#[must_use = "futures do nothing unless polled"]
188pub struct RecvEtcMsg<'a> {
189    channel: &'a Channel,
190    buf: &'a mut MessageBufEtc,
191}
192
193impl<'a> Future for RecvEtcMsg<'a> {
194    type Output = Result<(), zx::Status>;
195
196    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
197        let this = &mut *self;
198        this.channel.recv_etc_from(cx, this.buf)
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TestExecutor;

    use futures::task::{ArcWake, waker};
    use std::future::poll_fn;
    use std::mem;
    use std::pin::pin;
    use std::sync::Arc;

    /// A `recv_msg` future stalls until the peer writes, then yields the
    /// written bytes.
    #[test]
    fn can_receive() {
        let mut exec = TestExecutor::new();
        let payload = &[0, 1, 2, 3];

        let (sender, raw_receiver) = zx::Channel::create();
        let async_receiver = Channel::from_channel(raw_receiver);

        let mut recv_fut = pin!(async move {
            let mut msg = MessageBuf::new();
            async_receiver.recv_msg(&mut msg).await.expect("failed to receive message");
            assert_eq!(payload, msg.bytes());
        });

        // Nothing has been written yet, so the receiver cannot make progress.
        assert!(exec.run_until_stalled(&mut recv_fut).is_pending());

        sender.write(payload, &mut []).expect("failed to write message");

        // With a message queued, the receiver runs to completion.
        assert!(exec.run_until_stalled(&mut recv_fut).is_ready());
    }

    /// A `recv_etc_msg` future stalls until the peer writes, then yields the
    /// written bytes.
    #[test]
    fn can_receive_etc() {
        let mut exec = TestExecutor::new();
        let payload = &[0, 1, 2, 3];

        let (sender, raw_receiver) = zx::Channel::create();
        let async_receiver = Channel::from_channel(raw_receiver);

        let mut recv_fut = pin!(async move {
            let mut msg = MessageBufEtc::new();
            async_receiver.recv_etc_msg(&mut msg).await.expect("failed to receive message");
            assert_eq!(payload, msg.bytes());
        });

        // Nothing has been written yet, so the receiver cannot make progress.
        assert!(exec.run_until_stalled(&mut recv_fut).is_pending());

        sender.write_etc(payload, &mut []).expect("failed to write message");

        // With a message queued, the receiver runs to completion.
        assert!(exec.run_until_stalled(&mut recv_fut).is_ready());
    }

    /// Receiving on a channel created after an earlier, peer-closed channel
    /// was dropped must simply stay pending.
    #[test]
    fn key_reuse() {
        let mut exec = TestExecutor::new();
        let (first_sender, first_raw) = zx::Channel::create();
        // The second sender is kept alive so the second receiver's peer stays open.
        let (_second_sender, second_raw) = zx::Channel::create();
        let first_receiver = Channel::from_channel(first_raw);
        mem::drop(first_sender);
        mem::drop(first_receiver);
        let second_receiver = Channel::from_channel(second_raw);
        // first_receiver and second_receiver use the same key.
        let mut recv_fut = pin!(async move {
            let mut msg = MessageBuf::new();
            second_receiver.recv_msg(&mut msg).await.expect("failed to receive message");
        });

        assert!(exec.run_until_stalled(&mut recv_fut).is_pending());
    }

    /// Same as `key_reuse`, but exercising the `recv_etc_msg` path.
    #[test]
    fn key_reuse_etc() {
        let mut exec = TestExecutor::new();
        let (first_sender, first_raw) = zx::Channel::create();
        // The second sender is kept alive so the second receiver's peer stays open.
        let (_second_sender, second_raw) = zx::Channel::create();
        let first_receiver = Channel::from_channel(first_raw);
        mem::drop(first_sender);
        mem::drop(first_receiver);
        let second_receiver = Channel::from_channel(second_raw);
        // first_receiver and second_receiver use the same key.
        let mut recv_fut = pin!(async move {
            let mut msg = MessageBufEtc::new();
            second_receiver.recv_etc_msg(&mut msg).await.expect("failed to receive message");
        });

        assert!(exec.run_until_stalled(&mut recv_fut).is_pending());
    }

    /// `read` must attempt the channel read on every poll, even when the poll
    /// was not triggered by a port notification.
    #[test]
    fn test_always_polls_channel() {
        let mut exec = TestExecutor::new();

        let (raw_receiver, sender) = zx::Channel::create();
        let async_receiver = Channel::from_channel(raw_receiver);

        let mut read_fut = pin!(poll_fn(|cx| {
            let mut byte_buf = Vec::with_capacity(64);
            let mut handle_buf = Vec::new();
            async_receiver.read(cx, &mut byte_buf, &mut handle_buf)
        }));

        assert_eq!(exec.run_until_stalled(&mut read_fut), Poll::Pending);

        sender.write(b"hello", &mut []).expect("write failed");

        // A waker that does nothing when woken; it only exists so a `Context`
        // can be built for the manual poll below.
        struct NoopWaker;
        impl ArcWake for NoopWaker {
            fn wake_by_ref(_arc_self: &Arc<Self>) {}
        }

        // Poll the future directly which guarantees the port notification for the write hasn't
        // arrived.
        let noop = waker(Arc::new(NoopWaker));
        let mut manual_cx = Context::from_waker(&noop);
        assert_eq!(read_fut.poll(&mut manual_cx), Poll::Ready(Ok(())));
    }
}