fuchsia_async/handle/zircon/
socket.rs
1use super::on_signals::OnSignalsRef;
6use super::rwhandle::{RWHandle, ReadableHandle, ReadableState, WritableHandle, WritableState};
7use futures::future::poll_fn;
8use futures::io::{self, AsyncRead, AsyncWrite};
9use futures::ready;
10use futures::stream::Stream;
11use futures::task::Context;
12use std::fmt;
13use std::pin::Pin;
14use std::task::Poll;
15use zx::{self as zx, AsHandleRef};
16
17pub struct Socket(RWHandle<zx::Socket>);
19
20impl AsRef<zx::Socket> for Socket {
21 fn as_ref(&self) -> &zx::Socket {
22 &self.0.get_ref()
23 }
24}
25
26impl AsHandleRef for Socket {
27 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
28 self.0.get_ref().as_handle_ref()
29 }
30}
31
32impl Socket {
33 pub fn from_socket(socket: zx::Socket) -> Self {
39 Socket(RWHandle::new(socket))
40 }
41
42 pub fn into_zx_socket(self) -> zx::Socket {
44 self.0.into_inner()
45 }
46
47 pub fn is_closed(&self) -> bool {
49 self.0.is_closed()
50 }
51
52 pub fn on_closed(&self) -> OnSignalsRef<'_> {
54 self.0.on_closed()
55 }
56
57 pub fn poll_read_ref(
61 &self,
62 cx: &mut Context<'_>,
63 buf: &mut [u8],
64 ) -> Poll<Result<usize, zx::Status>> {
65 ready!(self.poll_readable(cx))?;
66 loop {
67 let res = self.0.get_ref().read(buf);
68 match res {
69 Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
70 Err(zx::Status::PEER_CLOSED) => return Poll::Ready(Ok(0)),
71 _ => return Poll::Ready(res),
72 }
73 }
74 }
75
76 pub fn poll_write_ref(
80 &self,
81 cx: &mut Context<'_>,
82 buf: &[u8],
83 ) -> Poll<Result<usize, zx::Status>> {
84 ready!(self.poll_writable(cx))?;
85 loop {
86 let res = self.0.get_ref().write(buf);
87 match res {
88 Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
89 _ => return Poll::Ready(res),
90 }
91 }
92 }
93
94 pub fn poll_datagram(
98 &self,
99 cx: &mut Context<'_>,
100 out: &mut Vec<u8>,
101 ) -> Poll<Result<usize, zx::Status>> {
102 ready!(self.poll_readable(cx))?;
103 let avail = self.0.get_ref().outstanding_read_bytes()?;
104 let len = out.len();
105 out.resize(len + avail, 0);
106 let (_, mut tail) = out.split_at_mut(len);
107 loop {
108 match self.0.get_ref().read(&mut tail) {
109 Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
110 Err(e) => return Poll::Ready(Err(e)),
111 Ok(bytes) => {
112 return if bytes == avail {
113 Poll::Ready(Ok(bytes))
114 } else {
115 Poll::Ready(Err(zx::Status::BAD_STATE))
116 }
117 }
118 }
119 }
120 }
121
122 pub async fn read_datagram<'a>(&'a self, out: &'a mut Vec<u8>) -> Result<usize, zx::Status> {
125 poll_fn(move |cx| self.poll_datagram(cx, out)).await
126 }
127
128 pub fn as_datagram_stream<'a>(&'a self) -> DatagramStream<&'a Self> {
132 DatagramStream(self)
133 }
134
135 pub fn into_datagram_stream(self) -> DatagramStream<Self> {
137 DatagramStream(self)
138 }
139}
140
141impl ReadableHandle for Socket {
142 fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
143 self.0.poll_readable(cx)
144 }
145
146 fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
147 self.0.need_readable(cx)
148 }
149}
150
151impl WritableHandle for Socket {
152 fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
153 self.0.poll_writable(cx)
154 }
155
156 fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
157 self.0.need_writable(cx)
158 }
159}
160
161impl fmt::Debug for Socket {
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 self.0.get_ref().fmt(f)
164 }
165}
166
167impl AsyncRead for Socket {
168 fn poll_read(
169 self: Pin<&mut Self>,
170 cx: &mut Context<'_>,
171 buf: &mut [u8],
172 ) -> Poll<io::Result<usize>> {
173 self.poll_read_ref(cx, buf).map_err(Into::into)
174 }
175}
176
177impl AsyncWrite for Socket {
178 fn poll_write(
179 self: Pin<&mut Self>,
180 cx: &mut Context<'_>,
181 buf: &[u8],
182 ) -> Poll<io::Result<usize>> {
183 self.poll_write_ref(cx, buf).map_err(Into::into)
184 }
185
186 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
187 Poll::Ready(Ok(()))
188 }
189
190 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
191 Poll::Ready(Ok(()))
192 }
193}
194
195impl<'a> AsyncRead for &'a Socket {
196 fn poll_read(
197 self: Pin<&mut Self>,
198 cx: &mut Context<'_>,
199 buf: &mut [u8],
200 ) -> Poll<io::Result<usize>> {
201 self.poll_read_ref(cx, buf).map_err(Into::into)
202 }
203}
204
205impl<'a> AsyncWrite for &'a Socket {
206 fn poll_write(
207 self: Pin<&mut Self>,
208 cx: &mut Context<'_>,
209 buf: &[u8],
210 ) -> Poll<io::Result<usize>> {
211 self.poll_write_ref(cx, buf).map_err(Into::into)
212 }
213
214 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
215 Poll::Ready(Ok(()))
216 }
217
218 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
219 Poll::Ready(Ok(()))
220 }
221}
222
/// Adapter exposing a socket (owned or borrowed) as a `Stream` whose items
/// are the byte vectors produced by `Socket::poll_datagram`.
///
/// The stream ends when polling the socket reports `PEER_CLOSED`.
#[derive(Debug)]
pub struct DatagramStream<S>(pub S);
226
227fn poll_datagram_as_stream(
228 socket: &Socket,
229 cx: &mut Context<'_>,
230) -> Poll<Option<Result<Vec<u8>, zx::Status>>> {
231 let mut res = Vec::<u8>::new();
232 Poll::Ready(match ready!(socket.poll_datagram(cx, &mut res)) {
233 Ok(_size) => Some(Ok(res)),
234 Err(zx::Status::PEER_CLOSED) => None,
235 Err(e) => Some(Err(e)),
236 })
237}
238
239impl Stream for DatagramStream<Socket> {
240 type Item = Result<Vec<u8>, zx::Status>;
241
242 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
243 poll_datagram_as_stream(&self.0, cx)
244 }
245}
246
247impl Stream for DatagramStream<&Socket> {
248 type Item = Result<Vec<u8>, zx::Status>;
249
250 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
251 poll_datagram_as_stream(self.0, cx)
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{MonotonicInstant, TestExecutor, TimeoutExt, Timer};

    use futures::future::{self, join};
    use futures::io::{AsyncReadExt as _, AsyncWriteExt as _};
    use futures::stream::TryStreamExt;
    use futures::task::noop_waker_ref;
    use futures::FutureExt;
    use std::pin::pin;

    /// Bytes written on one end of a stream socket are read back on the other
    /// end, and dropping the writer terminates `read_to_end`.
    #[test]
    fn can_read_write() {
        let mut exec = TestExecutor::new();
        let bytes = &[0, 1, 2, 3];

        let (tx, rx) = zx::Socket::create_stream();
        let mut tx = Socket::from_socket(tx);
        let mut rx = Socket::from_socket(rx);

        let receive_future = async {
            let mut received = vec![];
            rx.read_to_end(&mut received).await.expect("reading socket");
            assert_eq!(&*received, bytes);
        };

        // Bound the receiver with a timeout so a broken test fails instead of hanging.
        let deadline = MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10));
        let receiver = receive_future.on_timeout(deadline, || panic!("timeout"));

        let sender = async move {
            // Delay the write so the reader is polled before any data is available.
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_millis(100))).await;
            tx.write_all(bytes).await.expect("writing into socket");
            // Dropping the writer lets `read_to_end` on the other side complete.
            drop(tx);
        };

        exec.run_singlethreaded(join(receiver, sender));
    }

    /// `read_datagram` appends exactly one datagram per call to the output vector.
    #[test]
    fn can_read_datagram() {
        let mut exec = TestExecutor::new();

        let (one, two) = (&[0, 1], &[2, 3, 4, 5]);

        let (tx, rx) = zx::Socket::create_datagram();
        let rx = Socket::from_socket(rx);

        // Pre-existing content must be preserved in front of the appended datagrams.
        let mut out = vec![50];

        assert!(tx.write(one).is_ok());
        assert!(tx.write(two).is_ok());

        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
        assert_eq!(one.len(), size.expect("reading first datagram"));
        assert_eq!([50, 0, 1], out.as_slice());

        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
        assert_eq!(two.len(), size.expect("reading second datagram"));
        assert_eq!([50, 0, 1, 2, 3, 4, 5], out.as_slice());
    }

    /// A `DatagramStream` yields every queued datagram in order and then ends
    /// once the peer has been dropped.
    #[test]
    fn stream_datagram() {
        let mut exec = TestExecutor::new();

        let (tx, rx) = zx::Socket::create_datagram();
        let mut rx = Socket::from_socket(rx).into_datagram_stream();

        let packets = 20;

        // Datagram number `size` is `size` bytes long and filled with the value `size`.
        for size in 1..=packets {
            let datagram = vec![size as u8; size];
            assert!(tx.write(&datagram).is_ok());
        }

        // Close the writing end so the stream terminates after the queued datagrams.
        drop(tx);

        let stream_read_fut = async move {
            let mut count = 0;
            while let Some(packet) = rx.try_next().await.expect("received error from stream") {
                count += 1;
                assert_eq!(packet.len(), count);
                assert!(packet.iter().all(|&byte| byte == count as u8));
            }
            assert_eq!(packets, count);
        };

        exec.run_singlethreaded(stream_read_fut);
    }

    /// Dropping the peer is observable through `poll_readable`, `is_closed`
    /// and the `on_closed` future.
    #[test]
    fn peer_closed_signal_raised() {
        let mut executor = TestExecutor::new();

        let (s1, s2) = zx::Socket::create_stream();
        let mut async_s2 = Socket::from_socket(s2);

        // Attempt a read on the still-empty socket; the result is irrelevant here.
        let _ = executor.run_until_stalled(&mut pin!(async {
            let mut buf = [0; 16];
            let _ = async_s2.read(&mut buf).await;
        }));

        let on_closed_fut = async_s2.on_closed();

        drop(s1);

        // Run the executor until it stalls so it can process the peer-closed signal.
        let _ = executor.run_until_stalled(&mut future::pending::<()>());

        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));

        let Poll::Ready(Ok(state)) = executor.run_until_stalled(&mut rx_fut) else {
            panic!("Expected future to be ready and Ok");
        };
        assert_eq!(state, ReadableState::MaybeReadableAndClosed);
        assert!(async_s2.is_closed());
        // This is a socket, so compare against the socket-named peer-closed signal.
        assert_eq!(on_closed_fut.now_or_never(), Some(Ok(zx::Signals::SOCKET_PEER_CLOSED)));
    }

    /// After `need_readable`, `poll_readable` stays pending until data is
    /// actually written by the peer.
    #[test]
    fn need_read_ensures_freshness() {
        let mut executor = TestExecutor::new();

        let (s1, s2) = zx::Socket::create_stream();
        let async_s2 = Socket::from_socket(s2);

        // A freshly wrapped socket reports readable even though nothing was written.
        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());

        // Requesting a fresh signal makes subsequent readability polls pending.
        assert!(async_s2.need_readable(&mut Context::from_waker(noop_waker_ref())).is_pending());
        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
        assert!(executor.run_until_stalled(&mut rx_fut).is_pending());

        assert_eq!(s1.write(b"hello!").expect("failed to write 6 bytes"), 6);

        // Once the peer has written data the socket is readable again.
        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());
    }

    /// After `need_writable`, `poll_writable` stays pending until the peer
    /// drains some of the full transmit buffer.
    #[test]
    fn need_write_ensures_freshness() {
        let mut executor = TestExecutor::new();

        let (s1, s2) = zx::Socket::create_stream();

        // Fill the transmit buffer completely so no further write can succeed.
        let socket_info = s2.info().expect("failed to get socket info");
        let bytes = vec![0u8; socket_info.tx_buf_max];
        assert_eq!(socket_info.tx_buf_max, s2.write(&bytes).expect("failed to write to socket"));

        let async_s2 = Socket::from_socket(s2);

        // A freshly wrapped socket reports writable even though its buffer is full.
        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());

        // Requesting a fresh signal makes subsequent writability polls pending.
        assert!(async_s2.need_writable(&mut Context::from_waker(noop_waker_ref())).is_pending());
        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
        assert!(executor.run_until_stalled(&mut tx_fut).is_pending());

        let mut buffer = [0u8; 5];
        assert_eq!(s1.read(&mut buffer).expect("failed to read 5 bytes"), 5);

        // Once the peer has drained some bytes the socket is writable again.
        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());
    }
}