fuchsia_async/handle/zircon/
channel.rs1use std::fmt;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll, ready};
9
10use zx::{self as zx, AsHandleRef, MessageBuf, MessageBufEtc};
11
12use crate::{OnSignalsRef, RWHandle, ReadableHandle as _};
13
14pub struct Channel(RWHandle<zx::Channel>);
16
17impl AsRef<zx::Channel> for Channel {
18 fn as_ref(&self) -> &zx::Channel {
19 self.0.get_ref()
20 }
21}
22
23impl AsHandleRef for Channel {
24 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
25 self.0.get_ref().as_handle_ref()
26 }
27}
28
29impl From<Channel> for zx::Channel {
30 fn from(channel: Channel) -> zx::Channel {
31 channel.0.into_inner()
32 }
33}
34
35impl Channel {
36 pub fn from_channel(channel: zx::Channel) -> Self {
42 Channel(RWHandle::new(channel))
43 }
44
45 pub fn into_zx_channel(self) -> zx::Channel {
47 self.0.into_inner()
48 }
49
50 pub fn is_closed(&self) -> bool {
52 self.0.is_closed()
53 }
54
55 pub fn on_closed(&self) -> OnSignalsRef<'_> {
57 self.0.on_closed()
58 }
59
60 pub fn read(
66 &self,
67 cx: &mut Context<'_>,
68 bytes: &mut Vec<u8>,
69 handles: &mut Vec<zx::NullableHandle>,
70 ) -> Poll<Result<(), zx::Status>> {
71 loop {
72 let res = self.0.get_ref().read_split(bytes, handles);
73 if res == Err(zx::Status::SHOULD_WAIT) {
74 ready!(self.0.need_readable(cx)?);
75 } else {
76 return Poll::Ready(res);
77 }
78 }
79 }
80
81 pub fn read_etc(
87 &self,
88 cx: &mut Context<'_>,
89 bytes: &mut Vec<u8>,
90 handles: &mut Vec<zx::HandleInfo>,
91 ) -> Poll<Result<(), zx::Status>> {
92 loop {
93 let res = self.0.get_ref().read_etc_split(bytes, handles);
94 if res == Err(zx::Status::SHOULD_WAIT) {
95 ready!(self.0.need_readable(cx)?);
96 } else {
97 return Poll::Ready(res);
98 }
99 }
100 }
101
102 pub fn recv_from(
105 &self,
106 cx: &mut Context<'_>,
107 buf: &mut MessageBuf,
108 ) -> Poll<Result<(), zx::Status>> {
109 let (bytes, handles) = buf.split_mut();
110 self.read(cx, bytes, handles)
111 }
112
113 pub fn recv_etc_from(
116 &self,
117 cx: &mut Context<'_>,
118 buf: &mut MessageBufEtc,
119 ) -> Poll<Result<(), zx::Status>> {
120 let (bytes, handles) = buf.split_mut();
121 self.read_etc(cx, bytes, handles)
122 }
123
124 pub fn recv_msg<'a>(&'a self, buf: &'a mut MessageBuf) -> RecvMsg<'a> {
130 RecvMsg { channel: self, buf }
131 }
132
133 pub fn recv_etc_msg<'a>(&'a self, buf: &'a mut MessageBufEtc) -> RecvEtcMsg<'a> {
139 RecvEtcMsg { channel: self, buf }
140 }
141
142 pub fn write(
144 &self,
145 bytes: &[u8],
146 handles: &mut [zx::NullableHandle],
147 ) -> Result<(), zx::Status> {
148 self.0.get_ref().write(bytes, handles)
149 }
150
151 pub fn write_etc(
153 &self,
154 bytes: &[u8],
155 handles: &mut [zx::HandleDisposition<'_>],
156 ) -> Result<(), zx::Status> {
157 self.0.get_ref().write_etc(bytes, handles)
158 }
159}
160
161impl fmt::Debug for Channel {
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 self.0.get_ref().fmt(f)
164 }
165}
166
167#[must_use = "futures do nothing unless polled"]
171pub struct RecvMsg<'a> {
172 channel: &'a Channel,
173 buf: &'a mut MessageBuf,
174}
175
176impl<'a> Future for RecvMsg<'a> {
177 type Output = Result<(), zx::Status>;
178
179 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
180 let this = &mut *self;
181 this.channel.recv_from(cx, this.buf)
182 }
183}
184#[must_use = "futures do nothing unless polled"]
188pub struct RecvEtcMsg<'a> {
189 channel: &'a Channel,
190 buf: &'a mut MessageBufEtc,
191}
192
193impl<'a> Future for RecvEtcMsg<'a> {
194 type Output = Result<(), zx::Status>;
195
196 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
197 let this = &mut *self;
198 this.channel.recv_etc_from(cx, this.buf)
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use super::*;
205 use crate::TestExecutor;
206
207 use std::future::poll_fn;
208 use std::mem;
209 use std::pin::pin;
210 use std::task::Waker;
211
212 #[test]
213 fn can_receive() {
214 let mut exec = TestExecutor::new();
215 let bytes = &[0, 1, 2, 3];
216
217 let (tx, rx) = zx::Channel::create();
218 let f_rx = Channel::from_channel(rx);
219
220 let mut receiver = pin!(async move {
221 let mut buffer = MessageBuf::new();
222 f_rx.recv_msg(&mut buffer).await.expect("failed to receive message");
223 assert_eq!(bytes, buffer.bytes());
224 });
225
226 assert!(exec.run_until_stalled(&mut receiver).is_pending());
227
228 let mut handles = Vec::new();
229 tx.write(bytes, &mut handles).expect("failed to write message");
230
231 assert!(exec.run_until_stalled(&mut receiver).is_ready());
232 }
233
234 #[test]
235 fn can_receive_etc() {
236 let mut exec = TestExecutor::new();
237 let bytes = &[0, 1, 2, 3];
238
239 let (tx, rx) = zx::Channel::create();
240 let f_rx = Channel::from_channel(rx);
241
242 let mut receiver = pin!(async move {
243 let mut buffer = MessageBufEtc::new();
244 f_rx.recv_etc_msg(&mut buffer).await.expect("failed to receive message");
245 assert_eq!(bytes, buffer.bytes());
246 });
247
248 assert!(exec.run_until_stalled(&mut receiver).is_pending());
249
250 let mut handles = Vec::new();
251 tx.write_etc(bytes, &mut handles).expect("failed to write message");
252
253 assert!(exec.run_until_stalled(&mut receiver).is_ready());
254 }
255
256 #[test]
257 fn key_reuse() {
258 let mut exec = TestExecutor::new();
259 let (tx0, rx0) = zx::Channel::create();
260 let (_tx1, rx1) = zx::Channel::create();
261 let f_rx0 = Channel::from_channel(rx0);
262 mem::drop(tx0);
263 mem::drop(f_rx0);
264 let f_rx1 = Channel::from_channel(rx1);
265 let mut receiver = pin!(async move {
267 let mut buffer = MessageBuf::new();
268 f_rx1.recv_msg(&mut buffer).await.expect("failed to receive message");
269 });
270
271 assert!(exec.run_until_stalled(&mut receiver).is_pending());
272 }
273
274 #[test]
275 fn key_reuse_etc() {
276 let mut exec = TestExecutor::new();
277 let (tx0, rx0) = zx::Channel::create();
278 let (_tx1, rx1) = zx::Channel::create();
279 let f_rx0 = Channel::from_channel(rx0);
280 mem::drop(tx0);
281 mem::drop(f_rx0);
282 let f_rx1 = Channel::from_channel(rx1);
283 let mut receiver = pin!(async move {
285 let mut buffer = MessageBufEtc::new();
286 f_rx1.recv_etc_msg(&mut buffer).await.expect("failed to receive message");
287 });
288
289 assert!(exec.run_until_stalled(&mut receiver).is_pending());
290 }
291
292 #[test]
293 fn test_always_polls_channel() {
294 let mut exec = TestExecutor::new();
295
296 let (rx, tx) = zx::Channel::create();
297 let rx_channel = Channel::from_channel(rx);
298
299 let mut fut = pin!(poll_fn(|cx| {
300 let mut bytes = Vec::with_capacity(64);
301 let mut handles = Vec::new();
302 rx_channel.read(cx, &mut bytes, &mut handles)
303 }));
304
305 assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
306
307 tx.write(b"hello", &mut []).expect("write failed");
308
309 assert_eq!(fut.poll(&mut Context::from_waker(Waker::noop())), Poll::Ready(Ok(())));
312 }
313}