Skip to main content

fuchsia_async/handle/zircon/
socket.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::on_signals::OnSignalsRef;
6use super::rwhandle::{
7    RWHandle, RWHandleSpec, ReadableHandle, ReadableState, WritableHandle, WritableState,
8};
9use futures::future::poll_fn;
10use futures::io::{self, AsyncRead, AsyncWrite};
11use futures::ready;
12use futures::stream::Stream;
13use futures::task::Context;
14use std::fmt;
15use std::pin::Pin;
16use std::task::Poll;
17use zx::{self as zx, AsHandleRef};
18use zx_status_ext::StatusExt;
19
20/// An I/O object representing a `Socket`.
21pub struct Socket(RWHandle<zx::Socket, SocketRWHandleSpec>);
22
23impl AsRef<zx::Socket> for Socket {
24    fn as_ref(&self) -> &zx::Socket {
25        self.0.get_ref()
26    }
27}
28
29impl AsHandleRef for Socket {
30    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
31        self.0.get_ref().as_handle_ref()
32    }
33}
34
35impl Socket {
36    /// Create a new `Socket` from a previously-created `zx::Socket`.
37    ///
38    /// # Panics
39    ///
40    /// If called outside the context of an active async executor.
41    pub fn from_socket(socket: zx::Socket) -> Self {
42        Socket(RWHandle::new_with_spec(socket))
43    }
44
45    /// Consumes `self` and returns the underlying `zx::Socket`.
46    pub fn into_zx_socket(self) -> zx::Socket {
47        self.0.into_inner()
48    }
49
50    /// Returns true if the socket received the `OBJECT_PEER_CLOSED` signal.
51    pub fn is_closed(&self) -> bool {
52        self.0.is_closed()
53    }
54
55    /// Returns a future that completes when the socket received the `OBJECT_PEER_CLOSED` signal.
56    pub fn on_closed(&self) -> OnSignalsRef<'_> {
57        self.0.on_closed()
58    }
59
60    /// Attempt to read from the socket, registering for wakeup if the socket doesn't have any
61    /// contents available. Used internally in the `AsyncRead` implementation, exposed for users
62    /// who know the concrete type they're using and don't want to pin the socket.
63    ///
64    /// Note: this function will never return `PEER_CLOSED` as an error. Instead, it will return
65    /// `Ok(0)` when the peer closes, to match the contract of `std::io::Read`.
66    pub fn poll_read_ref(
67        &self,
68        cx: &mut Context<'_>,
69        buf: &mut [u8],
70    ) -> Poll<Result<usize, zx::Status>> {
71        ready!(self.poll_readable(cx))?;
72        loop {
73            let res = self.0.get_ref().read(buf);
74            match res {
75                Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
76                Err(zx::Status::BAD_STATE) => {
77                    // BAD_STATE indicates our peer is closed for writes.
78                    return Poll::Ready(Ok(0));
79                }
80                Err(zx::Status::PEER_CLOSED) => return Poll::Ready(Ok(0)),
81                _ => return Poll::Ready(res),
82            }
83        }
84    }
85
86    /// Attempt to write into the socket, registering for wakeup if the socket is not ready. Used
87    /// internally in the `AsyncWrite` implementation, exposed for users who know the concrete type
88    /// they're using and don't want to pin the socket.
89    pub fn poll_write_ref(
90        &self,
91        cx: &mut Context<'_>,
92        buf: &[u8],
93    ) -> Poll<Result<usize, zx::Status>> {
94        ready!(self.poll_writable(cx))?;
95        loop {
96            let res = self.0.get_ref().write(buf);
97            match res {
98                Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
99                Err(zx::Status::BAD_STATE) => {
100                    // BAD_STATE indicates we're closed for writes.
101                    return Poll::Ready(Err(zx::Status::BAD_STATE));
102                }
103                _ => return Poll::Ready(res),
104            }
105        }
106    }
107
108    /// Polls for the next data on the socket, appending it to the end of |out| if it has arrived.
109    /// Not very useful for a non-datagram socket as it will return all available data
110    /// on the socket.
111    pub fn poll_datagram(
112        &self,
113        cx: &mut Context<'_>,
114        out: &mut Vec<u8>,
115    ) -> Poll<Result<usize, zx::Status>> {
116        ready!(self.poll_readable(cx))?;
117        let avail = self.0.get_ref().outstanding_read_bytes()?;
118        let len = out.len();
119        out.resize(len + avail, 0);
120        let (_, tail) = out.split_at_mut(len);
121        loop {
122            match self.0.get_ref().read(tail) {
123                Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
124                Err(e) => return Poll::Ready(Err(e)),
125                Ok(bytes) => {
126                    return if bytes == avail {
127                        Poll::Ready(Ok(bytes))
128                    } else {
129                        Poll::Ready(Err(zx::Status::IO_DATA_LOSS))
130                    };
131                }
132            }
133        }
134    }
135
136    /// Reads the next datagram that becomes available onto the end of |out|.  Note: Using this
137    /// multiple times concurrently is an error and the first one will never complete.
138    pub async fn read_datagram<'a>(&'a self, out: &'a mut Vec<u8>) -> Result<usize, zx::Status> {
139        poll_fn(move |cx| self.poll_datagram(cx, out)).await
140    }
141
142    /// Use this socket as a stream of `Result<Vec<u8>, zx::Status>` datagrams.
143    ///
144    /// Note: multiple concurrent streams from the same socket are not supported.
145    pub fn as_datagram_stream(&self) -> DatagramStream<&Self> {
146        DatagramStream(self)
147    }
148
149    /// Convert this socket into a stream of `Result<Vec<u8>, zx::Status>` datagrams.
150    pub fn into_datagram_stream(self) -> DatagramStream<Self> {
151        DatagramStream(self)
152    }
153}
154
155impl ReadableHandle for Socket {
156    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
157        self.0.poll_readable(cx)
158    }
159
160    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
161        self.0.need_readable(cx)
162    }
163}
164
165impl WritableHandle for Socket {
166    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
167        self.0.poll_writable(cx)
168    }
169
170    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
171        self.0.need_writable(cx)
172    }
173}
174
175impl fmt::Debug for Socket {
176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177        self.0.get_ref().fmt(f)
178    }
179}
180
181impl AsyncRead for Socket {
182    /// Note: this function will never return `PEER_CLOSED` as an error. Instead, it will return
183    /// `Ok(0)` when the peer closes, to match the contract of `std::io::Read`.
184    fn poll_read(
185        self: Pin<&mut Self>,
186        cx: &mut Context<'_>,
187        buf: &mut [u8],
188    ) -> Poll<io::Result<usize>> {
189        self.poll_read_ref(cx, buf).map_err(|s| s.into_io_error())
190    }
191}
192
193impl AsyncWrite for Socket {
194    fn poll_write(
195        self: Pin<&mut Self>,
196        cx: &mut Context<'_>,
197        buf: &[u8],
198    ) -> Poll<io::Result<usize>> {
199        self.poll_write_ref(cx, buf).map_err(|s| s.into_io_error())
200    }
201
202    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
203        Poll::Ready(Ok(()))
204    }
205
206    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
207        Poll::Ready(Ok(()))
208    }
209}
210
211impl AsyncRead for &Socket {
212    /// Note: this function will never return `PEER_CLOSED` as an error. Instead, it will return
213    /// `Ok(0)` when the peer closes, to match the contract of `std::io::Read`.
214    fn poll_read(
215        self: Pin<&mut Self>,
216        cx: &mut Context<'_>,
217        buf: &mut [u8],
218    ) -> Poll<io::Result<usize>> {
219        self.poll_read_ref(cx, buf).map_err(|s| s.into_io_error())
220    }
221}
222
223impl AsyncWrite for &Socket {
224    fn poll_write(
225        self: Pin<&mut Self>,
226        cx: &mut Context<'_>,
227        buf: &[u8],
228    ) -> Poll<io::Result<usize>> {
229        self.poll_write_ref(cx, buf).map_err(|s| s.into_io_error())
230    }
231
232    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
233        Poll::Ready(Ok(()))
234    }
235
236    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
237        Poll::Ready(Ok(()))
238    }
239}
240
/// Adapter that yields the datagrams of a `Socket` as a `Stream`.
///
/// `S` is either an owned `Socket` or a `&Socket`; see `Socket::into_datagram_stream` and
/// `Socket::as_datagram_stream`.
#[derive(Debug)]
pub struct DatagramStream<S>(pub S);
244
245fn poll_datagram_as_stream(
246    socket: &Socket,
247    cx: &mut Context<'_>,
248) -> Poll<Option<Result<Vec<u8>, zx::Status>>> {
249    let mut res = Vec::<u8>::new();
250    Poll::Ready(match ready!(socket.poll_datagram(cx, &mut res)) {
251        Ok(_size) => Some(Ok(res)),
252        Err(zx::Status::PEER_CLOSED) => None,
253        Err(e) => Some(Err(e)),
254    })
255}
256
257impl Stream for DatagramStream<Socket> {
258    type Item = Result<Vec<u8>, zx::Status>;
259
260    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
261        poll_datagram_as_stream(&self.0, cx)
262    }
263}
264
265impl Stream for DatagramStream<&Socket> {
266    type Item = Result<Vec<u8>, zx::Status>;
267
268    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
269        poll_datagram_as_stream(self.0, cx)
270    }
271}
272
273struct SocketRWHandleSpec;
274impl RWHandleSpec for SocketRWHandleSpec {
275    const READABLE_SIGNALS: zx::Signals =
276        zx::Signals::SOCKET_READABLE.union(zx::Signals::SOCKET_PEER_WRITE_DISABLED);
277    const WRITABLE_SIGNALS: zx::Signals =
278        zx::Signals::SOCKET_WRITABLE.union(zx::Signals::SOCKET_WRITE_DISABLED);
279}
280
281#[cfg(test)]
282mod tests {
283    use super::*;
284    use crate::{MonotonicInstant, TestExecutor, TimeoutExt, Timer};
285
286    use futures::FutureExt;
287    use futures::future::{self, join};
288    use futures::io::{AsyncReadExt as _, AsyncWriteExt as _};
289    use futures::stream::TryStreamExt;
290    use std::pin::pin;
291    use std::task::Waker;
292    use zx::SocketWriteDisposition;
293
294    #[test]
295    fn can_read_write() {
296        let mut exec = TestExecutor::new();
297        let bytes = &[0, 1, 2, 3];
298
299        let (tx, rx) = zx::Socket::create_stream();
300        let (mut tx, mut rx) = (Socket::from_socket(tx), Socket::from_socket(rx));
301
302        let receive_future = async {
303            let mut buf = vec![];
304            rx.read_to_end(&mut buf).await.expect("reading socket");
305            assert_eq!(&*buf, bytes);
306        };
307
308        // add a timeout to receiver so if test is broken it doesn't take forever
309        // Note: if debugging a hang, you may want to lower the timeout to `300.millis()` to get
310        // faster feedback. This is set to 10s rather than something shorter to avoid triggering
311        // flakes if things happen to be slow.
312        let receiver = receive_future
313            .on_timeout(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10)), || {
314                panic!("timeout")
315            });
316
317        // Sends a message after the timeout has passed
318        let sender = async move {
319            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_millis(100))).await;
320            tx.write_all(bytes).await.expect("writing into socket");
321            // close socket to signal no more bytes will be written
322            drop(tx);
323        };
324
325        let done = join(receiver, sender);
326        exec.run_singlethreaded(done);
327    }
328
329    #[test]
330    fn can_read_datagram() {
331        let mut exec = TestExecutor::new();
332
333        let (one, two) = (&[0, 1], &[2, 3, 4, 5]);
334
335        let (tx, rx) = zx::Socket::create_datagram();
336        let rx = Socket::from_socket(rx);
337
338        let mut out = vec![50];
339
340        assert!(tx.write(one).is_ok());
341        assert!(tx.write(two).is_ok());
342
343        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
344
345        assert!(size.is_ok());
346        assert_eq!(one.len(), size.unwrap());
347
348        assert_eq!([50, 0, 1], out.as_slice());
349
350        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
351
352        assert!(size.is_ok());
353        assert_eq!(two.len(), size.unwrap());
354
355        assert_eq!([50, 0, 1, 2, 3, 4, 5], out.as_slice());
356    }
357
358    #[test]
359    fn stream_datagram() {
360        let mut exec = TestExecutor::new();
361
362        let (tx, rx) = zx::Socket::create_datagram();
363        let mut rx = Socket::from_socket(rx).into_datagram_stream();
364
365        let packets = 20;
366
367        for size in 1..packets + 1 {
368            let mut vec = Vec::<u8>::new();
369            vec.resize(size, size as u8);
370            assert!(tx.write(&vec).is_ok());
371        }
372
373        // Close the socket.
374        drop(tx);
375
376        let stream_read_fut = async move {
377            let mut count = 0;
378            while let Some(packet) = rx.try_next().await.expect("received error from stream") {
379                count += 1;
380                assert_eq!(packet.len(), count);
381                assert!(packet.iter().all(|&x| x == count as u8));
382            }
383            assert_eq!(packets, count);
384        };
385
386        exec.run_singlethreaded(stream_read_fut);
387    }
388
389    #[test]
390    fn peer_closed_signal_raised() {
391        let mut executor = TestExecutor::new();
392
393        let (s1, s2) = zx::Socket::create_stream();
394        let mut async_s2 = Socket::from_socket(s2);
395
396        // The socket won't start watching for peer-closed until we actually try reading from it.
397        let _ = executor.run_until_stalled(&mut pin!(async {
398            let mut buf = [0; 16];
399            let _ = async_s2.read(&mut buf).await;
400        }));
401
402        let on_closed_fut = async_s2.on_closed();
403
404        drop(s1);
405
406        // Now make sure all packets get processed before we poll the socket.
407        let _ = executor.run_until_stalled(&mut future::pending::<()>());
408
409        // Dropping s1 raises a closed signal on s2 when the executor next polls the signal port.
410        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
411
412        if let Poll::Ready(Ok(state)) = executor.run_until_stalled(&mut rx_fut) {
413            assert_eq!(state, ReadableState::MaybeReadableAndClosed);
414        } else {
415            panic!("Expected future to be ready and Ok");
416        }
417        assert!(async_s2.is_closed());
418        assert_eq!(on_closed_fut.now_or_never(), Some(Ok(zx::Signals::CHANNEL_PEER_CLOSED)));
419    }
420
421    #[test]
422    fn need_read_ensures_freshness() {
423        let mut executor = TestExecutor::new();
424
425        let (s1, s2) = zx::Socket::create_stream();
426        let async_s2 = Socket::from_socket(s2);
427
428        // The read signal is optimistically set on socket creation, so even though there is
429        // nothing to read, poll_readable returns Ready.
430        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
431        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());
432
433        // Call need_readable to reacquire the read signal. The socket now knows
434        // that the signal is not actually set, so returns Pending.
435        assert!(async_s2.need_readable(&mut Context::from_waker(Waker::noop())).is_pending());
436        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
437        assert!(executor.run_until_stalled(&mut rx_fut).is_pending());
438
439        assert_eq!(s1.write(b"hello!").expect("failed to write 6 bytes"), 6);
440
441        // After writing to s1, its peer now has an actual read signal and is Ready.
442        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());
443    }
444
445    #[test]
446    fn need_write_ensures_freshness() {
447        let mut executor = TestExecutor::new();
448
449        let (s1, s2) = zx::Socket::create_stream();
450
451        // Completely fill the transmit buffer. This socket is no longer writable.
452        let socket_info = s2.info().expect("failed to get socket info");
453        let bytes = vec![0u8; socket_info.tx_buf_max];
454        assert_eq!(socket_info.tx_buf_max, s2.write(&bytes).expect("failed to write to socket"));
455
456        let async_s2 = Socket::from_socket(s2);
457
458        // The write signal is optimistically set on socket creation, so even though it's not
459        // possible to write, poll_writable returns Ready.
460        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
461        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());
462
463        // Call need_writable to reacquire the write signal. The socket now
464        // knows that the signal is not actually set, so returns Pending.
465        assert!(async_s2.need_writable(&mut Context::from_waker(Waker::noop())).is_pending());
466        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
467        assert!(executor.run_until_stalled(&mut tx_fut).is_pending());
468
469        let mut buffer = [0u8; 5];
470        assert_eq!(s1.read(&mut buffer).expect("failed to read 5 bytes"), 5);
471
472        // After reading from s1, its peer is now able to write and should have a write signal.
473        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());
474    }
475
476    #[test]
477    fn half_closed_for_writes() {
478        let mut executor = TestExecutor::new();
479
480        let (s1, s2) = zx::Socket::create_stream();
481
482        // Completely fill the transmit buffer. This socket is no longer writable.
483        let socket_info = s2.info().expect("failed to get socket info");
484        let bytes = vec![0u8; socket_info.tx_buf_max];
485        assert_eq!(socket_info.tx_buf_max, s2.write(&bytes).expect("failed to write to socket"));
486
487        let async_s2 = Socket::from_socket(s2);
488        let mut tx_fut = poll_fn(|cx| async_s2.poll_write_ref(cx, &bytes[..]));
489        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Pending);
490
491        s1.set_disposition(None, Some(SocketWriteDisposition::Disabled)).expect("set disposition");
492        assert_eq!(
493            executor.run_until_stalled(&mut tx_fut),
494            Poll::Ready(Err::<usize, _>(zx::Status::BAD_STATE))
495        );
496
497        // Drain the socket so we can reopen it.
498        let mut readbuf = vec![0u8; bytes.len()];
499        assert_eq!(s1.read(&mut readbuf[..]), Ok(readbuf.len()));
500        s1.set_disposition(None, Some(SocketWriteDisposition::Enabled)).expect("set disposition");
501
502        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Ready(Ok(bytes.len())));
503    }
504
505    #[test]
506    fn half_closed_for_reads() {
507        let mut executor = TestExecutor::new();
508
509        let (s1, s2) = zx::Socket::create_stream();
510        let async_s2 = Socket::from_socket(s2);
511        let mut bytes = [0u8; 10];
512        let mut tx_fut = poll_fn(|cx| async_s2.poll_read_ref(cx, &mut bytes[..]));
513        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Pending);
514
515        // Write a message and then half close.
516        let msg = b"hello";
517        assert_eq!(s1.write(msg), Ok(msg.len()));
518        s1.set_disposition(Some(SocketWriteDisposition::Disabled), None).expect("set disposition");
519        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Ready(Ok(msg.len())));
520        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Ready(Ok(0)));
521
522        // Reopen.
523        s1.set_disposition(Some(SocketWriteDisposition::Enabled), None).expect("set disposition");
524        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Pending);
525
526        // Close once more, without any bytes this time.
527        s1.set_disposition(Some(SocketWriteDisposition::Disabled), None).expect("set disposition");
528        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Ready(Ok(0)));
529    }
530}