fuchsia_async/net/fuchsia/mod.rs

// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#![allow(missing_docs)]

mod tcp;
pub use self::tcp::*;

mod udp;
pub use self::udp::*;

use futures::io::{self, AsyncRead, AsyncWrite};
use futures::ready;
use futures::task::{AtomicWaker, Context};
use zx::{self as zx, AsHandleRef};

use std::io::{Read, Write};
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::{fmt, mem};

use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};

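// Event bits passed to and from `fdio_unsafe_wait_begin`/`fdio_unsafe_wait_end`
// below; fdio reports fd readiness with epoll-style flags, so the libc values
// are used directly.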
const READABLE: usize = libc::EPOLLIN as usize;
const WRITABLE: usize = libc::EPOLLOUT as usize;
const ERROR: usize = libc::EPOLLERR as usize;
const HUP: usize = libc::EPOLLHUP as usize;

// Unsafe to use. `receive_packet` must not be called after
// `fdio` is invalidated.
pub(crate) struct EventedFdPacketReceiver {
    fdio: *const syscall::fdio_t,
    signals: AtomicUsize,
    read_task: AtomicWaker,
    write_task: AtomicWaker,
}

// Needed because of the fdio pointer.
// It is safe to send because the `EventedFdPacketReceiver` must be
// deregistered (and therefore `receive_packet` never called again)
// before `fdio_unsafe_release` is called.
unsafe impl Send for EventedFdPacketReceiver {}
unsafe impl Sync for EventedFdPacketReceiver {}

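// Called by the executor when a port packet for this receiver arrives:
// translates the observed zircon signals back into epoll-style events, records
// them in `signals`, and wakes the read and/or write task that can now make
// progress.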
impl PacketReceiver for EventedFdPacketReceiver {
    fn receive_packet(&self, packet: zx::Packet) {
        let observed_signals = if let zx::PacketContents::SignalOne(p) = packet.contents() {
            p.observed()
        } else {
            return;
        };

        let mut events: u32 = 0;
        unsafe {
            syscall::fdio_unsafe_wait_end(self.fdio, observed_signals.bits(), &mut events);
        }
        let events = events as usize;

        let old = self.signals.fetch_or(events, Ordering::SeqCst);
        let became_readable = ((events & READABLE) != 0) && ((old & READABLE) == 0);
        let became_writable = ((events & WRITABLE) != 0) && ((old & WRITABLE) == 0);
        let err_occurred = (events & (ERROR | HUP)) != 0;

        if became_readable || err_occurred {
            self.read_task.wake();
        }
        if became_writable || err_occurred {
            self.write_task.wake();
        }
    }
}

/// A type which can be used for receiving IO events for a file descriptor.
pub struct EventedFd<T> {
    inner: T,
    // Must be valid, acquired from `fdio_unsafe_fd_to_io`
    fdio: *const syscall::fdio_t,
    // Must be dropped before `fdio_unsafe_release` is called
    signal_receiver: mem::ManuallyDrop<ReceiverRegistration<EventedFdPacketReceiver>>,
}

unsafe impl<T> Send for EventedFd<T> where T: Send {}
unsafe impl<T> Sync for EventedFd<T> where T: Sync {}

impl<T> Unpin for EventedFd<T> {}

impl<T> Drop for EventedFd<T> {
    fn drop(&mut self) {
        unsafe {
            // Drop the receiver so `receive_packet` cannot be called again.
            mem::ManuallyDrop::drop(&mut self.signal_receiver);

            // Release the fdio
            syscall::fdio_unsafe_release(self.fdio);
        }

        // Then `inner` gets dropped
    }
}

impl<T: fmt::Debug> fmt::Debug for EventedFd<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // FIXME(https://github.com/rust-lang/rust/issues/67364): This could be
        // better written with `.finish_non_exhaustive()` once that feature is
        // stabilized.
        f.debug_struct("EventedFd").field("inner", &self.inner).finish()
    }
}

impl<T> EventedFd<T>
where
    T: AsRawFd,
{
    /// Creates a new `EventedFd`.
    ///
    /// # Safety
    ///
    /// The raw file descriptor returned from `inner.as_raw_fd()` must not be
    /// closed until the returned `EventedFd` is dropped.
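    ///
    /// # Example
    ///
    /// A minimal usage sketch (illustrative only; it assumes `EventedFd` is
    /// re-exported as `fuchsia_async::net::EventedFd`):
    ///
    /// ```ignore
    /// use fuchsia_async::net::EventedFd;
    ///
    /// let stream = std::net::TcpStream::connect("127.0.0.1:8080")?;
    /// stream.set_nonblocking(true)?;
    /// // Safety: `stream` is moved into the `EventedFd`, so its file
    /// // descriptor stays open until the `EventedFd` is dropped.
    /// let evented = unsafe { EventedFd::new(stream)? };
    /// // `evented` now implements `AsyncRead` and `AsyncWrite`.
    /// ```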
    pub unsafe fn new(inner: T) -> io::Result<Self> {
        let fdio = syscall::fdio_unsafe_fd_to_io(inner.as_raw_fd());
        let signal_receiver =
            EHandle::local().register_receiver(Arc::new(EventedFdPacketReceiver {
                fdio,
                // Optimistically assume that the fd is readable and writable.
                // Reads and writes will be attempted before queueing a packet.
                // This makes fds slightly faster to read/write the first time
                // they're accessed after being created, provided they start off as
                // readable or writable. In return, there will be an extra wasted
                // syscall per read/write if the fd is not readable or writable.
                signals: AtomicUsize::new(READABLE | WRITABLE),
                read_task: AtomicWaker::new(),
                write_task: AtomicWaker::new(),
            }));

        let evented_fd =
            EventedFd { inner, fdio, signal_receiver: mem::ManuallyDrop::new(signal_receiver) };

        // Make sure a packet is delivered if an error or closure occurs.
        evented_fd.schedule_packet(ERROR | HUP);

        // Need to schedule packets to maintain the invariant that
        // if !READABLE or !WRITABLE a packet has been scheduled.
        evented_fd.schedule_packet(READABLE);
        evented_fd.schedule_packet(WRITABLE);

        Ok(evented_fd)
    }

    /// Tests to see if this resource is ready to be read from.
    /// If it is not, it arranges for the current task to receive a notification
    /// when a "readable" signal arrives.
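    ///
    /// A sketch of the intended calling pattern, mirroring the `AsyncRead`
    /// implementation further down in this file (`evented_fd`, `cx`, and
    /// `buf` are assumed to be in scope):
    ///
    /// ```ignore
    /// ready!(evented_fd.poll_readable(cx))?;
    /// match evented_fd.as_mut().read(buf) {
    ///     // The readiness signal was stale: re-arm and wait for the next
    ///     // "readable" packet.
    ///     Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
    ///         evented_fd.need_read(cx);
    ///         Poll::Pending
    ///     }
    ///     res => Poll::Ready(res),
    /// }
    /// ```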
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
        let receiver = self.signal_receiver.receiver();
        if (receiver.signals.load(Ordering::SeqCst) & (READABLE | ERROR | HUP)) != 0 {
            Poll::Ready(Ok(()))
        } else {
            self.need_read(cx);
            Poll::Pending
        }
    }

    /// Tests to see if this resource is ready to be written to.
    /// If it is not, it arranges for the current task to receive a notification
    /// when a "writable" signal arrives.
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
        let receiver = self.signal_receiver.receiver();
        if (receiver.signals.load(Ordering::SeqCst) & (WRITABLE | ERROR | HUP)) != 0 {
            Poll::Ready(Ok(()))
        } else {
            self.need_write(cx);
            Poll::Pending
        }
    }

    /// Returns a reference to the underlying IO object.
    pub fn as_ref(&self) -> &T {
        &self.inner
    }

    /// Returns a mutable reference to the underlying IO object.
    pub fn as_mut(&mut self) -> &mut T {
        &mut self.inner
    }

    /// Arranges for the current task to receive a notification when a "readable"
    /// signal arrives.
    pub fn need_read(&self, cx: &mut Context<'_>) {
        let receiver = self.signal_receiver.receiver();
        receiver.read_task.register(cx.waker());
        let old = receiver.signals.fetch_and(!READABLE, Ordering::SeqCst);
        // We only need to schedule a new packet if one isn't already scheduled.
        // If READABLE was already false, a packet was already scheduled.
        if (old & READABLE) != 0 {
            self.schedule_packet(READABLE);
        }
    }

    /// Arranges for the current task to receive a notification when a "writable"
    /// signal arrives.
    pub fn need_write(&self, cx: &mut Context<'_>) {
        let receiver = self.signal_receiver.receiver();
        receiver.write_task.register(cx.waker());
        let old = receiver.signals.fetch_and(!WRITABLE, Ordering::SeqCst);
        // We only need to schedule a new packet if one isn't already scheduled.
        // If WRITABLE was already false, a packet was already scheduled.
        if (old & WRITABLE) != 0 {
            self.schedule_packet(WRITABLE);
        }
    }

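    // Converts the requested epoll-style events into the underlying handle and
    // zircon signals via `fdio_unsafe_wait_begin`, then queues an async wait on
    // the receiver's port so that a packet is delivered (and `receive_packet`
    // runs) once one of those signals is asserted.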
    fn schedule_packet(&self, signals: usize) {
        unsafe {
            let (mut raw_handle, mut raw_signals) = (0, 0);
            syscall::fdio_unsafe_wait_begin(
                self.fdio,
                signals as u32,
                &mut raw_handle,
                &mut raw_signals,
            );

            let handle = zx::Handle::from_raw(raw_handle);
            let signals = zx::Signals::from_bits_truncate(raw_signals);

            let res = handle.wait_async_handle(
                self.signal_receiver.port(),
                self.signal_receiver.key(),
                signals,
                zx::WaitAsyncOpts::empty(),
            );

            // The handle is borrowed, so we cannot drop it.
            mem::forget(handle);
            res.expect("Error scheduling EventedFd notification");
        }
    }

    /// Clears all incoming signals.
    pub fn clear(&self) {
        self.signal_receiver.receiver().signals.store(0, Ordering::SeqCst);
    }
}

impl<T: AsRawFd> AsRawFd for EventedFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.as_ref().as_raw_fd()
    }
}

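// `EventedFd` exposes the wrapped object over `AsyncRead`/`AsyncWrite` by
// combining the readiness tracking above with nonblocking reads and writes:
// attempt the operation once readiness is signalled, and re-arm with
// `need_read`/`need_write` if it would block.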
impl<T: AsRawFd + Read> AsyncRead for EventedFd<T> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_readable(&*self, cx))?;
        let res = (&mut *self).as_mut().read(buf);
        if let Err(e) = &res {
            if e.kind() == io::ErrorKind::WouldBlock {
                self.need_read(cx);
                return Poll::Pending;
            }
        }
        Poll::Ready(res.map_err(Into::into))
    }

    // TODO: override poll_read_vectored and call readv on the underlying handle
}

impl<T: AsRawFd + Write> AsyncWrite for EventedFd<T> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_writable(&*self, cx))?;
        let res = (&mut *self).as_mut().write(buf);
        if let Err(e) = &res {
            if e.kind() == io::ErrorKind::WouldBlock {
                self.need_write(cx);
                return Poll::Pending;
            }
        }
        Poll::Ready(res.map_err(Into::into))
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    // TODO: override poll_write_vectored and call writev on the underlying handle
}

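// The impls below make a shared reference `&EventedFd<T>` usable as an I/O
// object, provided the wrapped type supports reads and writes through `&T`
// (as `std::net::TcpStream` does, for example).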
impl<'a, T> AsyncRead for &'a EventedFd<T>
where
    T: AsRawFd,
    for<'b> &'b T: Read,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_readable(&*self, cx))?;
        let res = (&*self).as_ref().read(buf);
        if let Err(e) = &res {
            if e.kind() == io::ErrorKind::WouldBlock {
                self.need_read(cx);
                return Poll::Pending;
            }
        }
        Poll::Ready(res.map_err(Into::into))
    }
}

impl<'a, T> AsyncWrite for &'a EventedFd<T>
where
    T: AsRawFd,
    for<'b> &'b T: Write,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_writable(&*self, cx))?;
        let res = (&*self).as_ref().write(buf);
        if let Err(e) = &res {
            if e.kind() == io::ErrorKind::WouldBlock {
                self.need_write(cx);
                return Poll::Pending;
            }
        }
        Poll::Ready(res.map_err(Into::into))
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }
}

mod syscall {
    #![allow(non_camel_case_types, improper_ctypes)]
    use std::os::unix::io::RawFd;
    pub use zx::sys::{zx_handle_t, zx_signals_t};

    // This is the "improper" C type
    pub type fdio_t = ();

    // From libfdio.so
    extern "C" {
        pub fn fdio_unsafe_fd_to_io(fd: RawFd) -> *const fdio_t;
        pub fn fdio_unsafe_release(io: *const fdio_t);

        pub fn fdio_unsafe_wait_begin(
            io: *const fdio_t,
            events: u32,
            handle_out: &mut zx_handle_t,
            signals_out: &mut zx_signals_t,
        );

        pub fn fdio_unsafe_wait_end(io: *const fdio_t, signals: zx_signals_t, events_out: &mut u32);
    }
}