fuchsia_async/handle/zircon/
channel.rs1use std::fmt;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll, ready};
9
10use zx::{self as zx, AsHandleRef, MessageBuf, MessageBufEtc};
11
12use crate::{OnSignalsRef, RWHandle, ReadableHandle as _};
13
14pub struct Channel(RWHandle<zx::Channel>);
16
17impl AsRef<zx::Channel> for Channel {
18 fn as_ref(&self) -> &zx::Channel {
19 self.0.get_ref()
20 }
21}
22
23impl AsHandleRef for Channel {
24 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
25 self.0.get_ref().as_handle_ref()
26 }
27}
28
29impl From<Channel> for zx::Channel {
30 fn from(channel: Channel) -> zx::Channel {
31 channel.0.into_inner()
32 }
33}
34
35impl Channel {
36 pub fn from_channel(channel: zx::Channel) -> Self {
42 Channel(RWHandle::new(channel))
43 }
44
45 pub fn into_zx_channel(self) -> zx::Channel {
47 self.0.into_inner()
48 }
49
50 pub fn is_closed(&self) -> bool {
52 self.0.is_closed()
53 }
54
55 pub fn on_closed(&self) -> OnSignalsRef<'_> {
57 self.0.on_closed()
58 }
59
60 pub fn read(
66 &self,
67 cx: &mut Context<'_>,
68 bytes: &mut Vec<u8>,
69 handles: &mut Vec<zx::NullableHandle>,
70 ) -> Poll<Result<(), zx::Status>> {
71 loop {
72 let res = self.0.get_ref().read_split(bytes, handles);
73 if res == Err(zx::Status::SHOULD_WAIT) {
74 ready!(self.0.need_readable(cx)?);
75 } else {
76 return Poll::Ready(res);
77 }
78 }
79 }
80
81 pub fn read_etc(
87 &self,
88 cx: &mut Context<'_>,
89 bytes: &mut Vec<u8>,
90 handles: &mut Vec<zx::HandleInfo>,
91 ) -> Poll<Result<(), zx::Status>> {
92 loop {
93 let res = self.0.get_ref().read_etc_split(bytes, handles);
94 if res == Err(zx::Status::SHOULD_WAIT) {
95 ready!(self.0.need_readable(cx)?);
96 } else {
97 return Poll::Ready(res);
98 }
99 }
100 }
101
102 pub fn recv_from(
105 &self,
106 cx: &mut Context<'_>,
107 buf: &mut MessageBuf,
108 ) -> Poll<Result<(), zx::Status>> {
109 let (bytes, handles) = buf.split_mut();
110 self.read(cx, bytes, handles)
111 }
112
113 pub fn recv_etc_from(
116 &self,
117 cx: &mut Context<'_>,
118 buf: &mut MessageBufEtc,
119 ) -> Poll<Result<(), zx::Status>> {
120 let (bytes, handles) = buf.split_mut();
121 self.read_etc(cx, bytes, handles)
122 }
123
124 pub fn recv_msg<'a>(&'a self, buf: &'a mut MessageBuf) -> RecvMsg<'a> {
130 RecvMsg { channel: self, buf }
131 }
132
133 pub fn recv_etc_msg<'a>(&'a self, buf: &'a mut MessageBufEtc) -> RecvEtcMsg<'a> {
139 RecvEtcMsg { channel: self, buf }
140 }
141
142 pub fn write(
144 &self,
145 bytes: &[u8],
146 handles: &mut [zx::NullableHandle],
147 ) -> Result<(), zx::Status> {
148 self.0.get_ref().write(bytes, handles)
149 }
150
151 pub fn write_etc(
153 &self,
154 bytes: &[u8],
155 handles: &mut [zx::HandleDisposition<'_>],
156 ) -> Result<(), zx::Status> {
157 self.0.get_ref().write_etc(bytes, handles)
158 }
159}
160
161impl fmt::Debug for Channel {
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 self.0.get_ref().fmt(f)
164 }
165}
166
167#[must_use = "futures do nothing unless polled"]
171pub struct RecvMsg<'a> {
172 channel: &'a Channel,
173 buf: &'a mut MessageBuf,
174}
175
176impl<'a> Future for RecvMsg<'a> {
177 type Output = Result<(), zx::Status>;
178
179 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
180 let this = &mut *self;
181 this.channel.recv_from(cx, this.buf)
182 }
183}
184#[must_use = "futures do nothing unless polled"]
188pub struct RecvEtcMsg<'a> {
189 channel: &'a Channel,
190 buf: &'a mut MessageBufEtc,
191}
192
193impl<'a> Future for RecvEtcMsg<'a> {
194 type Output = Result<(), zx::Status>;
195
196 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
197 let this = &mut *self;
198 this.channel.recv_etc_from(cx, this.buf)
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TestExecutor;

    use futures::task::{ArcWake, waker};
    use std::future::poll_fn;
    use std::mem;
    use std::pin::pin;
    use std::sync::Arc;

    /// A `recv_msg` future stalls on an empty channel and completes with the
    /// written bytes once the peer writes.
    #[test]
    fn can_receive() {
        let mut executor = TestExecutor::new();
        let payload = &[0, 1, 2, 3];

        let (sender, raw_rx) = zx::Channel::create();
        let async_rx = Channel::from_channel(raw_rx);

        let mut recv_task = pin!(async move {
            let mut msg = MessageBuf::new();
            async_rx.recv_msg(&mut msg).await.expect("failed to receive message");
            assert_eq!(payload, msg.bytes());
        });

        // Nothing has been written yet, so the first run must stall.
        let before_write = executor.run_until_stalled(&mut recv_task);
        assert!(before_write.is_pending());

        let mut no_handles = Vec::new();
        sender.write(payload, &mut no_handles).expect("failed to write message");

        let after_write = executor.run_until_stalled(&mut recv_task);
        assert!(after_write.is_ready());
    }

    /// Same as `can_receive`, but through the `recv_etc_msg` / `write_etc`
    /// variants.
    #[test]
    fn can_receive_etc() {
        let mut executor = TestExecutor::new();
        let payload = &[0, 1, 2, 3];

        let (sender, raw_rx) = zx::Channel::create();
        let async_rx = Channel::from_channel(raw_rx);

        let mut recv_task = pin!(async move {
            let mut msg = MessageBufEtc::new();
            async_rx.recv_etc_msg(&mut msg).await.expect("failed to receive message");
            assert_eq!(payload, msg.bytes());
        });

        // Nothing has been written yet, so the first run must stall.
        let before_write = executor.run_until_stalled(&mut recv_task);
        assert!(before_write.is_pending());

        let mut no_handles = Vec::new();
        sender.write_etc(payload, &mut no_handles).expect("failed to write message");

        let after_write = executor.run_until_stalled(&mut recv_task);
        assert!(after_write.is_ready());
    }

    /// Wraps a channel, closes its peer and drops the wrapper, then wraps a
    /// second channel and checks that receiving on it is still pending.
    // NOTE(review): per the test name this presumably guards against stale
    // state from the dropped registration leaking into a reused key; confirm.
    #[test]
    fn key_reuse() {
        let mut executor = TestExecutor::new();
        let (first_tx, first_rx) = zx::Channel::create();
        let (_second_tx, second_rx) = zx::Channel::create();

        // Register the first channel, close its peer, then drop the wrapper.
        let first_async = Channel::from_channel(first_rx);
        mem::drop(first_tx);
        mem::drop(first_async);

        // The second channel is registered only after the first is gone.
        let second_async = Channel::from_channel(second_rx);
        let mut recv_task = pin!(async move {
            let mut msg = MessageBuf::new();
            second_async.recv_msg(&mut msg).await.expect("failed to receive message");
        });

        let outcome = executor.run_until_stalled(&mut recv_task);
        assert!(outcome.is_pending());
    }

    /// Same as `key_reuse`, but receiving through `recv_etc_msg`.
    #[test]
    fn key_reuse_etc() {
        let mut executor = TestExecutor::new();
        let (first_tx, first_rx) = zx::Channel::create();
        let (_second_tx, second_rx) = zx::Channel::create();

        // Register the first channel, close its peer, then drop the wrapper.
        let first_async = Channel::from_channel(first_rx);
        mem::drop(first_tx);
        mem::drop(first_async);

        // The second channel is registered only after the first is gone.
        let second_async = Channel::from_channel(second_rx);
        let mut recv_task = pin!(async move {
            let mut msg = MessageBufEtc::new();
            second_async.recv_etc_msg(&mut msg).await.expect("failed to receive message");
        });

        let outcome = executor.run_until_stalled(&mut recv_task);
        assert!(outcome.is_pending());
    }

    /// After a write, polling `read` with a waker unrelated to the executor
    /// must still return the message, i.e. `read` attempts the channel read on
    /// every poll.
    #[test]
    fn test_always_polls_channel() {
        let mut executor = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();
        let rx_channel = Channel::from_channel(rx);

        let mut read_fut = pin!(poll_fn(|cx| {
            let mut byte_buf = Vec::with_capacity(64);
            let mut handle_buf = Vec::new();
            rx_channel.read(cx, &mut byte_buf, &mut handle_buf)
        }));

        assert_eq!(executor.run_until_stalled(&mut read_fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        // Waker whose wake-up does nothing.
        struct NoopWaker;
        impl ArcWake for NoopWaker {
            fn wake_by_ref(_arc_self: &Arc<Self>) {}
        }

        // Poll directly, outside the executor, with the do-nothing waker.
        let idle_waker = waker(Arc::new(NoopWaker));
        let mut manual_cx = Context::from_waker(&idle_waker);
        assert_eq!(read_fut.poll(&mut manual_cx), Poll::Ready(Ok(())));
    }
}