fuchsia_async/handle/zircon/
socket.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::on_signals::OnSignalsRef;
6use super::rwhandle::{RWHandle, ReadableHandle, ReadableState, WritableHandle, WritableState};
7use futures::future::poll_fn;
8use futures::io::{self, AsyncRead, AsyncWrite};
9use futures::ready;
10use futures::stream::Stream;
11use futures::task::Context;
12use std::fmt;
13use std::pin::Pin;
14use std::task::Poll;
15use zx::{self as zx, AsHandleRef};
16
17/// An I/O object representing a `Socket`.
18pub struct Socket(RWHandle<zx::Socket>);
19
20impl AsRef<zx::Socket> for Socket {
21    fn as_ref(&self) -> &zx::Socket {
22        &self.0.get_ref()
23    }
24}
25
26impl AsHandleRef for Socket {
27    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
28        self.0.get_ref().as_handle_ref()
29    }
30}
31
32impl Socket {
33    /// Create a new `Socket` from a previously-created `zx::Socket`.
34    ///
35    /// # Panics
36    ///
37    /// If called outside the context of an active async executor.
38    pub fn from_socket(socket: zx::Socket) -> Self {
39        Socket(RWHandle::new(socket))
40    }
41
42    /// Consumes `self` and returns the underlying `zx::Socket`.
43    pub fn into_zx_socket(self) -> zx::Socket {
44        self.0.into_inner()
45    }
46
47    /// Returns true if the socket received the `OBJECT_PEER_CLOSED` signal.
48    pub fn is_closed(&self) -> bool {
49        self.0.is_closed()
50    }
51
52    /// Returns a future that completes when the socket received the `OBJECT_PEER_CLOSED` signal.
53    pub fn on_closed(&self) -> OnSignalsRef<'_> {
54        self.0.on_closed()
55    }
56
57    /// Attempt to read from the socket, registering for wakeup if the socket doesn't have any
58    /// contents available. Used internally in the `AsyncRead` implementation, exposed for users
59    /// who know the concrete type they're using and don't want to pin the socket.
60    pub fn poll_read_ref(
61        &self,
62        cx: &mut Context<'_>,
63        buf: &mut [u8],
64    ) -> Poll<Result<usize, zx::Status>> {
65        ready!(self.poll_readable(cx))?;
66        loop {
67            let res = self.0.get_ref().read(buf);
68            match res {
69                Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
70                Err(zx::Status::PEER_CLOSED) => return Poll::Ready(Ok(0)),
71                _ => return Poll::Ready(res),
72            }
73        }
74    }
75
76    /// Attempt to write into the socket, registering for wakeup if the socket is not ready. Used
77    /// internally in the `AsyncWrite` implementation, exposed for users who know the concrete type
78    /// they're using and don't want to pin the socket.
79    pub fn poll_write_ref(
80        &self,
81        cx: &mut Context<'_>,
82        buf: &[u8],
83    ) -> Poll<Result<usize, zx::Status>> {
84        ready!(self.poll_writable(cx))?;
85        loop {
86            let res = self.0.get_ref().write(buf);
87            match res {
88                Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
89                _ => return Poll::Ready(res),
90            }
91        }
92    }
93
94    /// Polls for the next data on the socket, appending it to the end of |out| if it has arrived.
95    /// Not very useful for a non-datagram socket as it will return all available data
96    /// on the socket.
97    pub fn poll_datagram(
98        &self,
99        cx: &mut Context<'_>,
100        out: &mut Vec<u8>,
101    ) -> Poll<Result<usize, zx::Status>> {
102        ready!(self.poll_readable(cx))?;
103        let avail = self.0.get_ref().outstanding_read_bytes()?;
104        let len = out.len();
105        out.resize(len + avail, 0);
106        let (_, mut tail) = out.split_at_mut(len);
107        loop {
108            match self.0.get_ref().read(&mut tail) {
109                Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
110                Err(e) => return Poll::Ready(Err(e)),
111                Ok(bytes) => {
112                    return if bytes == avail {
113                        Poll::Ready(Ok(bytes))
114                    } else {
115                        Poll::Ready(Err(zx::Status::BAD_STATE))
116                    }
117                }
118            }
119        }
120    }
121
122    /// Reads the next datagram that becomes available onto the end of |out|.  Note: Using this
123    /// multiple times concurrently is an error and the first one will never complete.
124    pub async fn read_datagram<'a>(&'a self, out: &'a mut Vec<u8>) -> Result<usize, zx::Status> {
125        poll_fn(move |cx| self.poll_datagram(cx, out)).await
126    }
127
128    /// Use this socket as a stream of `Result<Vec<u8>, zx::Status>` datagrams.
129    ///
130    /// Note: multiple concurrent streams from the same socket are not supported.
131    pub fn as_datagram_stream<'a>(&'a self) -> DatagramStream<&'a Self> {
132        DatagramStream(self)
133    }
134
135    /// Convert this socket into a stream of `Result<Vec<u8>, zx::Status>` datagrams.
136    pub fn into_datagram_stream(self) -> DatagramStream<Self> {
137        DatagramStream(self)
138    }
139}
140
141impl ReadableHandle for Socket {
142    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
143        self.0.poll_readable(cx)
144    }
145
146    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
147        self.0.need_readable(cx)
148    }
149}
150
151impl WritableHandle for Socket {
152    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
153        self.0.poll_writable(cx)
154    }
155
156    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
157        self.0.need_writable(cx)
158    }
159}
160
161impl fmt::Debug for Socket {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        self.0.get_ref().fmt(f)
164    }
165}
166
167impl AsyncRead for Socket {
168    fn poll_read(
169        self: Pin<&mut Self>,
170        cx: &mut Context<'_>,
171        buf: &mut [u8],
172    ) -> Poll<io::Result<usize>> {
173        self.poll_read_ref(cx, buf).map_err(Into::into)
174    }
175}
176
177impl AsyncWrite for Socket {
178    fn poll_write(
179        self: Pin<&mut Self>,
180        cx: &mut Context<'_>,
181        buf: &[u8],
182    ) -> Poll<io::Result<usize>> {
183        self.poll_write_ref(cx, buf).map_err(Into::into)
184    }
185
186    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
187        Poll::Ready(Ok(()))
188    }
189
190    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
191        Poll::Ready(Ok(()))
192    }
193}
194
195impl<'a> AsyncRead for &'a Socket {
196    fn poll_read(
197        self: Pin<&mut Self>,
198        cx: &mut Context<'_>,
199        buf: &mut [u8],
200    ) -> Poll<io::Result<usize>> {
201        self.poll_read_ref(cx, buf).map_err(Into::into)
202    }
203}
204
205impl<'a> AsyncWrite for &'a Socket {
206    fn poll_write(
207        self: Pin<&mut Self>,
208        cx: &mut Context<'_>,
209        buf: &[u8],
210    ) -> Poll<io::Result<usize>> {
211        self.poll_write_ref(cx, buf).map_err(Into::into)
212    }
213
214    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
215        Poll::Ready(Ok(()))
216    }
217
218    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
219        Poll::Ready(Ok(()))
220    }
221}
222
223/// A datagram stream from a `Socket`.
224#[derive(Debug)]
225pub struct DatagramStream<S>(pub S);
226
227fn poll_datagram_as_stream(
228    socket: &Socket,
229    cx: &mut Context<'_>,
230) -> Poll<Option<Result<Vec<u8>, zx::Status>>> {
231    let mut res = Vec::<u8>::new();
232    Poll::Ready(match ready!(socket.poll_datagram(cx, &mut res)) {
233        Ok(_size) => Some(Ok(res)),
234        Err(zx::Status::PEER_CLOSED) => None,
235        Err(e) => Some(Err(e)),
236    })
237}
238
239impl Stream for DatagramStream<Socket> {
240    type Item = Result<Vec<u8>, zx::Status>;
241
242    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
243        poll_datagram_as_stream(&self.0, cx)
244    }
245}
246
247impl Stream for DatagramStream<&Socket> {
248    type Item = Result<Vec<u8>, zx::Status>;
249
250    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
251        poll_datagram_as_stream(self.0, cx)
252    }
253}
254
255#[cfg(test)]
256mod tests {
257    use super::*;
258    use crate::{MonotonicInstant, TestExecutor, TimeoutExt, Timer};
259
260    use futures::future::{self, join};
261    use futures::io::{AsyncReadExt as _, AsyncWriteExt as _};
262    use futures::stream::TryStreamExt;
263    use futures::task::noop_waker_ref;
264    use futures::FutureExt;
265    use std::pin::pin;
266
267    #[test]
268    fn can_read_write() {
269        let mut exec = TestExecutor::new();
270        let bytes = &[0, 1, 2, 3];
271
272        let (tx, rx) = zx::Socket::create_stream();
273        let (mut tx, mut rx) = (Socket::from_socket(tx), Socket::from_socket(rx));
274
275        let receive_future = async {
276            let mut buf = vec![];
277            rx.read_to_end(&mut buf).await.expect("reading socket");
278            assert_eq!(&*buf, bytes);
279        };
280
281        // add a timeout to receiver so if test is broken it doesn't take forever
282        // Note: if debugging a hang, you may want to lower the timeout to `300.millis()` to get
283        // faster feedback. This is set to 10s rather than something shorter to avoid triggering
284        // flakes if things happen to be slow.
285        let receiver = receive_future
286            .on_timeout(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10)), || {
287                panic!("timeout")
288            });
289
290        // Sends a message after the timeout has passed
291        let sender = async move {
292            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_millis(100))).await;
293            tx.write_all(bytes).await.expect("writing into socket");
294            // close socket to signal no more bytes will be written
295            drop(tx);
296        };
297
298        let done = join(receiver, sender);
299        exec.run_singlethreaded(done);
300    }
301
302    #[test]
303    fn can_read_datagram() {
304        let mut exec = TestExecutor::new();
305
306        let (one, two) = (&[0, 1], &[2, 3, 4, 5]);
307
308        let (tx, rx) = zx::Socket::create_datagram();
309        let rx = Socket::from_socket(rx);
310
311        let mut out = vec![50];
312
313        assert!(tx.write(one).is_ok());
314        assert!(tx.write(two).is_ok());
315
316        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
317
318        assert!(size.is_ok());
319        assert_eq!(one.len(), size.unwrap());
320
321        assert_eq!([50, 0, 1], out.as_slice());
322
323        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
324
325        assert!(size.is_ok());
326        assert_eq!(two.len(), size.unwrap());
327
328        assert_eq!([50, 0, 1, 2, 3, 4, 5], out.as_slice());
329    }
330
331    #[test]
332    fn stream_datagram() {
333        let mut exec = TestExecutor::new();
334
335        let (tx, rx) = zx::Socket::create_datagram();
336        let mut rx = Socket::from_socket(rx).into_datagram_stream();
337
338        let packets = 20;
339
340        for size in 1..packets + 1 {
341            let mut vec = Vec::<u8>::new();
342            vec.resize(size, size as u8);
343            assert!(tx.write(&vec).is_ok());
344        }
345
346        // Close the socket.
347        drop(tx);
348
349        let stream_read_fut = async move {
350            let mut count = 0;
351            while let Some(packet) = rx.try_next().await.expect("received error from stream") {
352                count = count + 1;
353                assert_eq!(packet.len(), count);
354                assert!(packet.iter().all(|&x| x == count as u8));
355            }
356            assert_eq!(packets, count);
357        };
358
359        exec.run_singlethreaded(stream_read_fut);
360    }
361
362    #[test]
363    fn peer_closed_signal_raised() {
364        let mut executor = TestExecutor::new();
365
366        let (s1, s2) = zx::Socket::create_stream();
367        let mut async_s2 = Socket::from_socket(s2);
368
369        // The socket won't start watching for peer-closed until we actually try reading from it.
370        let _ = executor.run_until_stalled(&mut pin!(async {
371            let mut buf = [0; 16];
372            let _ = async_s2.read(&mut buf).await;
373        }));
374
375        let on_closed_fut = async_s2.on_closed();
376
377        drop(s1);
378
379        // Now make sure all packets get processed before we poll the socket.
380        let _ = executor.run_until_stalled(&mut future::pending::<()>());
381
382        // Dropping s1 raises a closed signal on s2 when the executor next polls the signal port.
383        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
384
385        if let Poll::Ready(Ok(state)) = executor.run_until_stalled(&mut rx_fut) {
386            assert_eq!(state, ReadableState::MaybeReadableAndClosed);
387        } else {
388            panic!("Expected future to be ready and Ok");
389        }
390        assert!(async_s2.is_closed());
391        assert_eq!(on_closed_fut.now_or_never(), Some(Ok(zx::Signals::CHANNEL_PEER_CLOSED)));
392    }
393
394    #[test]
395    fn need_read_ensures_freshness() {
396        let mut executor = TestExecutor::new();
397
398        let (s1, s2) = zx::Socket::create_stream();
399        let async_s2 = Socket::from_socket(s2);
400
401        // The read signal is optimistically set on socket creation, so even though there is
402        // nothing to read, poll_readable returns Ready.
403        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
404        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());
405
406        // Call need_readable to reacquire the read signal. The socket now knows
407        // that the signal is not actually set, so returns Pending.
408        assert!(async_s2.need_readable(&mut Context::from_waker(noop_waker_ref())).is_pending());
409        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
410        assert!(executor.run_until_stalled(&mut rx_fut).is_pending());
411
412        assert_eq!(s1.write(b"hello!").expect("failed to write 6 bytes"), 6);
413
414        // After writing to s1, its peer now has an actual read signal and is Ready.
415        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());
416    }
417
418    #[test]
419    fn need_write_ensures_freshness() {
420        let mut executor = TestExecutor::new();
421
422        let (s1, s2) = zx::Socket::create_stream();
423
424        // Completely fill the transmit buffer. This socket is no longer writable.
425        let socket_info = s2.info().expect("failed to get socket info");
426        let bytes = vec![0u8; socket_info.tx_buf_max];
427        assert_eq!(socket_info.tx_buf_max, s2.write(&bytes).expect("failed to write to socket"));
428
429        let async_s2 = Socket::from_socket(s2);
430
431        // The write signal is optimistically set on socket creation, so even though it's not
432        // possible to write, poll_writable returns Ready.
433        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
434        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());
435
436        // Call need_writable to reacquire the write signal. The socket now
437        // knows that the signal is not actually set, so returns Pending.
438        assert!(async_s2.need_writable(&mut Context::from_waker(noop_waker_ref())).is_pending());
439        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
440        assert!(executor.run_until_stalled(&mut tx_fut).is_pending());
441
442        let mut buffer = [0u8; 5];
443        assert_eq!(s1.read(&mut buffer).expect("failed to read 5 bytes"), 5);
444
445        // After reading from s1, its peer is now able to write and should have a write signal.
446        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());
447    }
448}