fuchsia_async/handle/zircon/
socket.rs1use super::on_signals::OnSignalsRef;
6use super::rwhandle::{
7 RWHandle, RWHandleSpec, ReadableHandle, ReadableState, WritableHandle, WritableState,
8};
9use futures::future::poll_fn;
10use futures::io::{self, AsyncRead, AsyncWrite};
11use futures::ready;
12use futures::stream::Stream;
13use futures::task::Context;
14use std::fmt;
15use std::pin::Pin;
16use std::task::Poll;
17use zx::{self as zx, AsHandleRef};
18use zx_status_ext::StatusExt;
19
20pub struct Socket(RWHandle<zx::Socket, SocketRWHandleSpec>);
22
23impl AsRef<zx::Socket> for Socket {
24 fn as_ref(&self) -> &zx::Socket {
25 self.0.get_ref()
26 }
27}
28
29impl AsHandleRef for Socket {
30 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
31 self.0.get_ref().as_handle_ref()
32 }
33}
34
35impl Socket {
36 pub fn from_socket(socket: zx::Socket) -> Self {
42 Socket(RWHandle::new_with_spec(socket))
43 }
44
45 pub fn into_zx_socket(self) -> zx::Socket {
47 self.0.into_inner()
48 }
49
50 pub fn is_closed(&self) -> bool {
52 self.0.is_closed()
53 }
54
55 pub fn on_closed(&self) -> OnSignalsRef<'_> {
57 self.0.on_closed()
58 }
59
60 pub fn poll_read_ref(
67 &self,
68 cx: &mut Context<'_>,
69 buf: &mut [u8],
70 ) -> Poll<Result<usize, zx::Status>> {
71 ready!(self.poll_readable(cx))?;
72 loop {
73 let res = self.0.get_ref().read(buf);
74 match res {
75 Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
76 Err(zx::Status::BAD_STATE) => {
77 return Poll::Ready(Ok(0));
79 }
80 Err(zx::Status::PEER_CLOSED) => return Poll::Ready(Ok(0)),
81 _ => return Poll::Ready(res),
82 }
83 }
84 }
85
86 pub fn poll_write_ref(
90 &self,
91 cx: &mut Context<'_>,
92 buf: &[u8],
93 ) -> Poll<Result<usize, zx::Status>> {
94 ready!(self.poll_writable(cx))?;
95 loop {
96 let res = self.0.get_ref().write(buf);
97 match res {
98 Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
99 Err(zx::Status::BAD_STATE) => {
100 return Poll::Ready(Err(zx::Status::BAD_STATE));
102 }
103 _ => return Poll::Ready(res),
104 }
105 }
106 }
107
108 pub fn poll_datagram(
112 &self,
113 cx: &mut Context<'_>,
114 out: &mut Vec<u8>,
115 ) -> Poll<Result<usize, zx::Status>> {
116 ready!(self.poll_readable(cx))?;
117 let avail = self.0.get_ref().outstanding_read_bytes()?;
118 let len = out.len();
119 out.resize(len + avail, 0);
120 let (_, tail) = out.split_at_mut(len);
121 loop {
122 match self.0.get_ref().read(tail) {
123 Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
124 Err(e) => return Poll::Ready(Err(e)),
125 Ok(bytes) => {
126 return if bytes == avail {
127 Poll::Ready(Ok(bytes))
128 } else {
129 Poll::Ready(Err(zx::Status::IO_DATA_LOSS))
130 };
131 }
132 }
133 }
134 }
135
136 pub async fn read_datagram<'a>(&'a self, out: &'a mut Vec<u8>) -> Result<usize, zx::Status> {
139 poll_fn(move |cx| self.poll_datagram(cx, out)).await
140 }
141
142 pub fn as_datagram_stream(&self) -> DatagramStream<&Self> {
146 DatagramStream(self)
147 }
148
149 pub fn into_datagram_stream(self) -> DatagramStream<Self> {
151 DatagramStream(self)
152 }
153}
154
155impl ReadableHandle for Socket {
156 fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
157 self.0.poll_readable(cx)
158 }
159
160 fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
161 self.0.need_readable(cx)
162 }
163}
164
165impl WritableHandle for Socket {
166 fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
167 self.0.poll_writable(cx)
168 }
169
170 fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
171 self.0.need_writable(cx)
172 }
173}
174
175impl fmt::Debug for Socket {
176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177 self.0.get_ref().fmt(f)
178 }
179}
180
181impl AsyncRead for Socket {
182 fn poll_read(
185 self: Pin<&mut Self>,
186 cx: &mut Context<'_>,
187 buf: &mut [u8],
188 ) -> Poll<io::Result<usize>> {
189 self.poll_read_ref(cx, buf).map_err(|s| s.into_io_error())
190 }
191}
192
193impl AsyncWrite for Socket {
194 fn poll_write(
195 self: Pin<&mut Self>,
196 cx: &mut Context<'_>,
197 buf: &[u8],
198 ) -> Poll<io::Result<usize>> {
199 self.poll_write_ref(cx, buf).map_err(|s| s.into_io_error())
200 }
201
202 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
203 Poll::Ready(Ok(()))
204 }
205
206 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
207 Poll::Ready(Ok(()))
208 }
209}
210
211impl AsyncRead for &Socket {
212 fn poll_read(
215 self: Pin<&mut Self>,
216 cx: &mut Context<'_>,
217 buf: &mut [u8],
218 ) -> Poll<io::Result<usize>> {
219 self.poll_read_ref(cx, buf).map_err(|s| s.into_io_error())
220 }
221}
222
223impl AsyncWrite for &Socket {
224 fn poll_write(
225 self: Pin<&mut Self>,
226 cx: &mut Context<'_>,
227 buf: &[u8],
228 ) -> Poll<io::Result<usize>> {
229 self.poll_write_ref(cx, buf).map_err(|s| s.into_io_error())
230 }
231
232 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
233 Poll::Ready(Ok(()))
234 }
235
236 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
237 Poll::Ready(Ok(()))
238 }
239}
240
/// Adapter that exposes a socket as a `Stream` of datagrams.
///
/// `Stream` is implemented in this file for `DatagramStream<Socket>` and
/// `DatagramStream<&Socket>`; each item is one datagram, and the stream ends when the read
/// reports `PEER_CLOSED`.
#[derive(Debug)]
pub struct DatagramStream<S>(
    pub S,
);
244
245fn poll_datagram_as_stream(
246 socket: &Socket,
247 cx: &mut Context<'_>,
248) -> Poll<Option<Result<Vec<u8>, zx::Status>>> {
249 let mut res = Vec::<u8>::new();
250 Poll::Ready(match ready!(socket.poll_datagram(cx, &mut res)) {
251 Ok(_size) => Some(Ok(res)),
252 Err(zx::Status::PEER_CLOSED) => None,
253 Err(e) => Some(Err(e)),
254 })
255}
256
257impl Stream for DatagramStream<Socket> {
258 type Item = Result<Vec<u8>, zx::Status>;
259
260 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
261 poll_datagram_as_stream(&self.0, cx)
262 }
263}
264
265impl Stream for DatagramStream<&Socket> {
266 type Item = Result<Vec<u8>, zx::Status>;
267
268 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
269 poll_datagram_as_stream(self.0, cx)
270 }
271}
272
273struct SocketRWHandleSpec;
274impl RWHandleSpec for SocketRWHandleSpec {
275 const READABLE_SIGNALS: zx::Signals =
276 zx::Signals::SOCKET_READABLE.union(zx::Signals::SOCKET_PEER_WRITE_DISABLED);
277 const WRITABLE_SIGNALS: zx::Signals =
278 zx::Signals::SOCKET_WRITABLE.union(zx::Signals::SOCKET_WRITE_DISABLED);
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{MonotonicInstant, TestExecutor, TimeoutExt, Timer};

    use futures::FutureExt;
    use futures::future::{self, join};
    use futures::io::{AsyncReadExt as _, AsyncWriteExt as _};
    use futures::stream::TryStreamExt;
    use std::pin::pin;
    use std::task::Waker;
    use zx::SocketWriteDisposition;

    /// Bytes written on one end of a stream socket are read back on the other end, and dropping
    /// the writer lets `read_to_end` complete.
    #[test]
    fn can_read_write() {
        let mut exec = TestExecutor::new();
        let bytes = &[0, 1, 2, 3];

        let (tx, rx) = zx::Socket::create_stream();
        let (mut tx, mut rx) = (Socket::from_socket(tx), Socket::from_socket(rx));

        let receive_future = async {
            let mut buf = vec![];
            rx.read_to_end(&mut buf).await.expect("reading socket");
            assert_eq!(&*buf, bytes);
        };

        // Fail loudly instead of hanging forever if nothing arrives within 10 seconds.
        let receiver = receive_future
            .on_timeout(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10)), || {
                panic!("timeout")
            });

        let sender = async move {
            // Delay the write by 100ms before sending anything.
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_millis(100))).await;
            tx.write_all(bytes).await.expect("writing into socket");
            // Close the writing end so the reader observes end-of-stream.
            drop(tx);
        };

        let done = join(receiver, sender);
        exec.run_singlethreaded(done);
    }

    /// `read_datagram` appends exactly one datagram per call to the caller's vector.
    #[test]
    fn can_read_datagram() {
        let mut exec = TestExecutor::new();

        let (one, two) = (&[0, 1], &[2, 3, 4, 5]);

        let (tx, rx) = zx::Socket::create_datagram();
        let rx = Socket::from_socket(rx);

        // Start with existing content to verify that datagrams are appended, not overwritten.
        let mut out = vec![50];

        assert!(tx.write(one).is_ok());
        assert!(tx.write(two).is_ok());

        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
        assert_eq!(size, Ok(one.len()));
        assert_eq!([50, 0, 1], out.as_slice());

        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
        assert_eq!(size, Ok(two.len()));
        assert_eq!([50, 0, 1, 2, 3, 4, 5], out.as_slice());
    }

    /// The datagram stream yields every queued packet in order and then terminates once the peer
    /// has been dropped.
    #[test]
    fn stream_datagram() {
        let mut exec = TestExecutor::new();

        let (tx, rx) = zx::Socket::create_datagram();
        let mut rx = Socket::from_socket(rx).into_datagram_stream();

        let packets = 20;

        // Packet number `size` is `size` bytes long and filled with the value `size`.
        for size in 1..=packets {
            let payload = vec![size as u8; size];
            assert!(tx.write(&payload).is_ok());
        }

        // Close the writing end so the stream ends after the queued packets.
        drop(tx);

        let stream_read_fut = async move {
            let mut count = 0;
            while let Some(packet) = rx.try_next().await.expect("received error from stream") {
                count += 1;
                assert_eq!(packet.len(), count);
                assert!(packet.iter().all(|&x| x == count as u8));
            }
            assert_eq!(packets, count);
        };

        exec.run_singlethreaded(stream_read_fut);
    }

    /// Dropping the peer makes the socket report closed and resolves `on_closed`.
    #[test]
    fn peer_closed_signal_raised() {
        let mut executor = TestExecutor::new();

        let (s1, s2) = zx::Socket::create_stream();
        let mut async_s2 = Socket::from_socket(s2);

        // Attempt a read first; nothing has been written, so its result is ignored.
        let _ = executor.run_until_stalled(&mut pin!(async {
            let mut buf = [0; 16];
            let _ = async_s2.read(&mut buf).await;
        }));

        let on_closed_fut = async_s2.on_closed();

        drop(s1);

        // Give the executor a chance to process pending work after the peer was dropped.
        let _ = executor.run_until_stalled(&mut future::pending::<()>());

        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));

        if let Poll::Ready(Ok(state)) = executor.run_until_stalled(&mut rx_fut) {
            assert_eq!(state, ReadableState::MaybeReadableAndClosed);
        } else {
            panic!("Expected future to be ready and Ok");
        }
        assert!(async_s2.is_closed());
        assert_eq!(on_closed_fut.now_or_never(), Some(Ok(zx::Signals::SOCKET_PEER_CLOSED)));
    }

    /// After `need_readable`, `poll_readable` stays pending until data is actually written.
    #[test]
    fn need_read_ensures_freshness() {
        let mut executor = TestExecutor::new();

        let (s1, s2) = zx::Socket::create_stream();
        let async_s2 = Socket::from_socket(s2);

        // A freshly wrapped socket reports readable even though nothing has been written.
        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());

        // `need_readable` re-checks the signal; with no data it is pending, and so is the next
        // `poll_readable`.
        assert!(async_s2.need_readable(&mut Context::from_waker(Waker::noop())).is_pending());
        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
        assert!(executor.run_until_stalled(&mut rx_fut).is_pending());

        assert_eq!(s1.write(b"hello!").expect("failed to write 6 bytes"), 6);

        // Once the peer has written, the socket becomes readable again.
        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());
    }

    /// After `need_writable`, `poll_writable` stays pending until the peer drains some data.
    #[test]
    fn need_write_ensures_freshness() {
        let mut executor = TestExecutor::new();

        let (s1, s2) = zx::Socket::create_stream();

        // Fill the transmit buffer completely so that the socket is not writable.
        let socket_info = s2.info().expect("failed to get socket info");
        let bytes = vec![0u8; socket_info.tx_buf_max];
        assert_eq!(socket_info.tx_buf_max, s2.write(&bytes).expect("failed to write to socket"));

        let async_s2 = Socket::from_socket(s2);

        // A freshly wrapped socket reports writable even though its buffer is full.
        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());

        // `need_writable` re-checks the signal; with a full buffer it is pending, and so is the
        // next `poll_writable`.
        assert!(async_s2.need_writable(&mut Context::from_waker(Waker::noop())).is_pending());
        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
        assert!(executor.run_until_stalled(&mut tx_fut).is_pending());

        let mut buffer = [0u8; 5];
        assert_eq!(s1.read(&mut buffer).expect("failed to read 5 bytes"), 5);

        // Once the peer has read some bytes, the socket becomes writable again.
        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());
    }

    /// A pending write fails with `BAD_STATE` while the peer has disabled our writes, and
    /// succeeds once the peer drains the buffer and re-enables them.
    #[test]
    fn half_closed_for_writes() {
        let mut executor = TestExecutor::new();

        let (s1, s2) = zx::Socket::create_stream();

        // Fill the transmit buffer so that the next write has to wait.
        let socket_info = s2.info().expect("failed to get socket info");
        let bytes = vec![0u8; socket_info.tx_buf_max];
        assert_eq!(socket_info.tx_buf_max, s2.write(&bytes).expect("failed to write to socket"));

        let async_s2 = Socket::from_socket(s2);
        let mut tx_fut = poll_fn(|cx| async_s2.poll_write_ref(cx, &bytes[..]));
        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Pending);

        // The peer disables writes on our end: the blocked write must wake up and fail.
        s1.set_disposition(None, Some(SocketWriteDisposition::Disabled)).expect("set disposition");
        assert_eq!(
            executor.run_until_stalled(&mut tx_fut),
            Poll::Ready(Err::<usize, _>(zx::Status::BAD_STATE))
        );

        // Drain the buffer and re-enable writes: the same write now goes through in full.
        let mut readbuf = vec![0u8; bytes.len()];
        assert_eq!(s1.read(&mut readbuf[..]), Ok(readbuf.len()));
        s1.set_disposition(None, Some(SocketWriteDisposition::Enabled)).expect("set disposition");

        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Ready(Ok(bytes.len())));
    }

    /// Once the peer disables its own writes, reads drain the buffered data and then report
    /// end-of-stream (`Ok(0)`); re-enabling writes makes reads pending again.
    #[test]
    fn half_closed_for_reads() {
        let mut executor = TestExecutor::new();

        let (s1, s2) = zx::Socket::create_stream();
        let async_s2 = Socket::from_socket(s2);
        let mut bytes = [0u8; 10];
        let mut rx_fut = poll_fn(|cx| async_s2.poll_read_ref(cx, &mut bytes[..]));
        assert_eq!(executor.run_until_stalled(&mut rx_fut), Poll::Pending);

        // Write a message, then disable further writes on the sending end.
        let msg = b"hello";
        assert_eq!(s1.write(msg), Ok(msg.len()));
        s1.set_disposition(Some(SocketWriteDisposition::Disabled), None).expect("set disposition");
        assert_eq!(executor.run_until_stalled(&mut rx_fut), Poll::Ready(Ok(msg.len())));
        assert_eq!(executor.run_until_stalled(&mut rx_fut), Poll::Ready(Ok(0)));

        // With writes re-enabled and no data queued, the read has to wait again.
        s1.set_disposition(Some(SocketWriteDisposition::Enabled), None).expect("set disposition");
        assert_eq!(executor.run_until_stalled(&mut rx_fut), Poll::Pending);

        // Disabling writes again wakes the reader with end-of-stream.
        s1.set_disposition(Some(SocketWriteDisposition::Disabled), None).expect("set disposition");
        assert_eq!(executor.run_until_stalled(&mut rx_fut), Poll::Ready(Ok(0)));
    }
}