fuchsia_async/handle/zircon/
channel.rs
1use std::fmt;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{ready, Context, Poll};
9
10use zx::{self as zx, AsHandleRef, MessageBuf, MessageBufEtc};
11
12use crate::{OnSignalsRef, RWHandle, ReadableHandle as _};
13
14pub struct Channel(RWHandle<zx::Channel>);
16
17impl AsRef<zx::Channel> for Channel {
18 fn as_ref(&self) -> &zx::Channel {
19 self.0.get_ref()
20 }
21}
22
23impl AsHandleRef for Channel {
24 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
25 self.0.get_ref().as_handle_ref()
26 }
27}
28
29impl From<Channel> for zx::Channel {
30 fn from(channel: Channel) -> zx::Channel {
31 channel.0.into_inner()
32 }
33}
34
35impl Channel {
36 pub fn from_channel(channel: zx::Channel) -> Self {
42 Channel(RWHandle::new(channel))
43 }
44
45 pub fn into_zx_channel(self) -> zx::Channel {
47 self.0.into_inner()
48 }
49
50 pub fn is_closed(&self) -> bool {
52 self.0.is_closed()
53 }
54
55 pub fn on_closed(&self) -> OnSignalsRef<'_> {
57 self.0.on_closed()
58 }
59
60 pub fn read(
66 &self,
67 cx: &mut Context<'_>,
68 bytes: &mut Vec<u8>,
69 handles: &mut Vec<zx::Handle>,
70 ) -> Poll<Result<(), zx::Status>> {
71 loop {
72 let res = self.0.get_ref().read_split(bytes, handles);
73 if res == Err(zx::Status::SHOULD_WAIT) {
74 ready!(self.0.need_readable(cx)?);
75 } else {
76 return Poll::Ready(res);
77 }
78 }
79 }
80
81 pub fn read_etc(
87 &self,
88 cx: &mut Context<'_>,
89 bytes: &mut Vec<u8>,
90 handles: &mut Vec<zx::HandleInfo>,
91 ) -> Poll<Result<(), zx::Status>> {
92 loop {
93 let res = self.0.get_ref().read_etc_split(bytes, handles);
94 if res == Err(zx::Status::SHOULD_WAIT) {
95 ready!(self.0.need_readable(cx)?);
96 } else {
97 return Poll::Ready(res);
98 }
99 }
100 }
101
102 pub fn recv_from(
105 &self,
106 cx: &mut Context<'_>,
107 buf: &mut MessageBuf,
108 ) -> Poll<Result<(), zx::Status>> {
109 let (bytes, handles) = buf.split_mut();
110 self.read(cx, bytes, handles)
111 }
112
113 pub fn recv_etc_from(
116 &self,
117 cx: &mut Context<'_>,
118 buf: &mut MessageBufEtc,
119 ) -> Poll<Result<(), zx::Status>> {
120 let (bytes, handles) = buf.split_mut();
121 self.read_etc(cx, bytes, handles)
122 }
123
124 pub fn recv_msg<'a>(&'a self, buf: &'a mut MessageBuf) -> RecvMsg<'a> {
130 RecvMsg { channel: self, buf }
131 }
132
133 pub fn recv_etc_msg<'a>(&'a self, buf: &'a mut MessageBufEtc) -> RecvEtcMsg<'a> {
139 RecvEtcMsg { channel: self, buf }
140 }
141
142 pub fn write(&self, bytes: &[u8], handles: &mut [zx::Handle]) -> Result<(), zx::Status> {
144 self.0.get_ref().write(bytes, handles)
145 }
146
147 pub fn write_etc(
149 &self,
150 bytes: &[u8],
151 handles: &mut [zx::HandleDisposition<'_>],
152 ) -> Result<(), zx::Status> {
153 self.0.get_ref().write_etc(bytes, handles)
154 }
155}
156
157impl fmt::Debug for Channel {
158 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159 self.0.get_ref().fmt(f)
160 }
161}
162
163#[must_use = "futures do nothing unless polled"]
167pub struct RecvMsg<'a> {
168 channel: &'a Channel,
169 buf: &'a mut MessageBuf,
170}
171
172impl<'a> Future for RecvMsg<'a> {
173 type Output = Result<(), zx::Status>;
174
175 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176 let this = &mut *self;
177 this.channel.recv_from(cx, this.buf)
178 }
179}
180#[must_use = "futures do nothing unless polled"]
184pub struct RecvEtcMsg<'a> {
185 channel: &'a Channel,
186 buf: &'a mut MessageBufEtc,
187}
188
189impl<'a> Future for RecvEtcMsg<'a> {
190 type Output = Result<(), zx::Status>;
191
192 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193 let this = &mut *self;
194 this.channel.recv_etc_from(cx, this.buf)
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use super::*;
201 use crate::TestExecutor;
202
203 use futures::task::{waker, ArcWake};
204 use std::future::poll_fn;
205 use std::mem;
206 use std::pin::pin;
207 use std::sync::Arc;
208
209 #[test]
210 fn can_receive() {
211 let mut exec = TestExecutor::new();
212 let bytes = &[0, 1, 2, 3];
213
214 let (tx, rx) = zx::Channel::create();
215 let f_rx = Channel::from_channel(rx);
216
217 let mut receiver = pin!(async move {
218 let mut buffer = MessageBuf::new();
219 f_rx.recv_msg(&mut buffer).await.expect("failed to receive message");
220 assert_eq!(bytes, buffer.bytes());
221 });
222
223 assert!(exec.run_until_stalled(&mut receiver).is_pending());
224
225 let mut handles = Vec::new();
226 tx.write(bytes, &mut handles).expect("failed to write message");
227
228 assert!(exec.run_until_stalled(&mut receiver).is_ready());
229 }
230
231 #[test]
232 fn can_receive_etc() {
233 let mut exec = TestExecutor::new();
234 let bytes = &[0, 1, 2, 3];
235
236 let (tx, rx) = zx::Channel::create();
237 let f_rx = Channel::from_channel(rx);
238
239 let mut receiver = pin!(async move {
240 let mut buffer = MessageBufEtc::new();
241 f_rx.recv_etc_msg(&mut buffer).await.expect("failed to receive message");
242 assert_eq!(bytes, buffer.bytes());
243 });
244
245 assert!(exec.run_until_stalled(&mut receiver).is_pending());
246
247 let mut handles = Vec::new();
248 tx.write_etc(bytes, &mut handles).expect("failed to write message");
249
250 assert!(exec.run_until_stalled(&mut receiver).is_ready());
251 }
252
253 #[test]
254 fn key_reuse() {
255 let mut exec = TestExecutor::new();
256 let (tx0, rx0) = zx::Channel::create();
257 let (_tx1, rx1) = zx::Channel::create();
258 let f_rx0 = Channel::from_channel(rx0);
259 mem::drop(tx0);
260 mem::drop(f_rx0);
261 let f_rx1 = Channel::from_channel(rx1);
262 let mut receiver = pin!(async move {
264 let mut buffer = MessageBuf::new();
265 f_rx1.recv_msg(&mut buffer).await.expect("failed to receive message");
266 });
267
268 assert!(exec.run_until_stalled(&mut receiver).is_pending());
269 }
270
271 #[test]
272 fn key_reuse_etc() {
273 let mut exec = TestExecutor::new();
274 let (tx0, rx0) = zx::Channel::create();
275 let (_tx1, rx1) = zx::Channel::create();
276 let f_rx0 = Channel::from_channel(rx0);
277 mem::drop(tx0);
278 mem::drop(f_rx0);
279 let f_rx1 = Channel::from_channel(rx1);
280 let mut receiver = pin!(async move {
282 let mut buffer = MessageBufEtc::new();
283 f_rx1.recv_etc_msg(&mut buffer).await.expect("failed to receive message");
284 });
285
286 assert!(exec.run_until_stalled(&mut receiver).is_pending());
287 }
288
289 #[test]
290 fn test_always_polls_channel() {
291 let mut exec = TestExecutor::new();
292
293 let (rx, tx) = zx::Channel::create();
294 let rx_channel = Channel::from_channel(rx);
295
296 let mut fut = pin!(poll_fn(|cx| {
297 let mut bytes = Vec::with_capacity(64);
298 let mut handles = Vec::new();
299 rx_channel.read(cx, &mut bytes, &mut handles)
300 }));
301
302 assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
303
304 tx.write(b"hello", &mut []).expect("write failed");
305
306 struct Waker;
307 impl ArcWake for Waker {
308 fn wake_by_ref(_arc_self: &Arc<Self>) {}
309 }
310
311 assert_eq!(
314 fut.poll(&mut Context::from_waker(&waker(Arc::new(Waker)))),
315 Poll::Ready(Ok(()))
316 );
317 }
318}