fuchsia_async/net/fuchsia/mod.rs

// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#![allow(missing_docs)]

mod tcp;
pub use self::tcp::*;

mod udp;
pub use self::udp::*;

use futures::io::{self, AsyncRead, AsyncWrite};
use futures::ready;
use futures::task::{AtomicWaker, Context};
use zx::{self as zx, AsHandleRef};

use std::convert::{AsMut, AsRef};
use std::io::{Read, Write};
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::{fmt, mem};

use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};

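// Epoll-style event bits cached in `EventedFdPacketReceiver::signals`. They are
// translated to and from Zircon signals by `fdio_unsafe_wait_begin` /
// `fdio_unsafe_wait_end` (see `schedule_packet` and `receive_packet` below).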
const READABLE: usize = libc::EPOLLIN as usize;
const WRITABLE: usize = libc::EPOLLOUT as usize;
const ERROR: usize = libc::EPOLLERR as usize;
const HUP: usize = libc::EPOLLHUP as usize;

// Unsafe to use. `receive_packet` must not be called after
// `fdio` is invalidated.
pub(crate) struct EventedFdPacketReceiver {
    fdio: *const syscall::fdio_t,
    signals: AtomicUsize,
    read_task: AtomicWaker,
    write_task: AtomicWaker,
}

// Needed because of the fdio pointer.
// It is safe to send because the `EventedFdPacketReceiver` must be
// deregistered (and therefore `receive_packet` never called again)
// before `fdio_unsafe_release` is called.
unsafe impl Send for EventedFdPacketReceiver {}
unsafe impl Sync for EventedFdPacketReceiver {}

impl PacketReceiver for EventedFdPacketReceiver {
    fn receive_packet(&self, packet: zx::Packet) {
        let observed_signals = if let zx::PacketContents::SignalOne(p) = packet.contents() {
            p.observed()
        } else {
            return;
        };

        let mut events: u32 = 0;
        unsafe {
            syscall::fdio_unsafe_wait_end(self.fdio, observed_signals.bits(), &mut events);
        }
        let events = events as usize;

        let old = self.signals.fetch_or(events, Ordering::SeqCst);
        let became_readable = ((events & READABLE) != 0) && ((old & READABLE) == 0);
        let became_writable = ((events & WRITABLE) != 0) && ((old & WRITABLE) == 0);
        let err_occurred = (events & (ERROR | HUP)) != 0;

        if became_readable || err_occurred {
            self.read_task.wake();
        }
        if became_writable || err_occurred {
            self.write_task.wake();
        }
    }
}

/// A type which can be used for receiving IO events for a file descriptor.
pub struct EventedFd<T> {
    inner: T,
    // Must be valid, acquired from `fdio_unsafe_fd_to_io`
    fdio: *const syscall::fdio_t,
    // Must be dropped before `fdio_unsafe_release` is called
    signal_receiver: mem::ManuallyDrop<ReceiverRegistration<EventedFdPacketReceiver>>,
}

unsafe impl<T> Send for EventedFd<T> where T: Send {}
unsafe impl<T> Sync for EventedFd<T> where T: Sync {}

impl<T> Unpin for EventedFd<T> {}

impl<T> Drop for EventedFd<T> {
    fn drop(&mut self) {
        unsafe {
            // Drop the receiver so `receive_packet` may not be called again.
            mem::ManuallyDrop::drop(&mut self.signal_receiver);

            // Release the fdio
            syscall::fdio_unsafe_release(self.fdio);
        }

        // Then `inner` gets dropped
    }
}

impl<T: fmt::Debug> fmt::Debug for EventedFd<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // FIXME(https://github.com/rust-lang/rust/issues/67364): This could be
        // better written with `.finish_non_exhaustive()` once that feature is
        // stabilized.
        f.debug_struct("EventedFd").field("inner", &self.inner).finish()
    }
}

impl<T> AsRef<T> for EventedFd<T>
where
    T: AsRawFd,
{
    // Returns a reference to the underlying IO object.
    fn as_ref(&self) -> &T {
        &self.inner
    }
}

impl<T> AsMut<T> for EventedFd<T>
where
    T: AsRawFd,
{
    // Returns a mutable reference to the underlying IO object.
    fn as_mut(&mut self) -> &mut T {
        &mut self.inner
    }
}

impl<T> EventedFd<T>
where
    T: AsRawFd,
{
    /// Creates a new EventedFd.
    ///
    /// # Safety
    ///
    /// The raw file descriptor returned from `inner.as_raw_fd()` must not be
    /// closed until the returned `EventedFd` is dropped.
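    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); it assumes it runs inside
    /// an async context on a fuchsia-async executor, and the `UnixStream` pair
    /// is only for illustration:
    ///
    /// ```ignore
    /// use futures::io::AsyncReadExt as _;
    /// use std::os::unix::net::UnixStream;
    ///
    /// let (local, _remote) = UnixStream::pair()?;
    /// local.set_nonblocking(true)?;
    /// // Safety: `local` owns its fd and is stored inside the returned
    /// // `EventedFd`, so the fd is not closed until the `EventedFd` is dropped.
    /// let mut evented = unsafe { EventedFd::new(local)? };
    ///
    /// // `EventedFd<T>` implements `AsyncRead`/`AsyncWrite` (see the impls
    /// // below), so the `futures` extension traits can be used on it.
    /// let mut buf = [0u8; 16];
    /// let n = evented.read(&mut buf).await?;
    /// ```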
    pub unsafe fn new(inner: T) -> io::Result<Self> {
        let fdio = syscall::fdio_unsafe_fd_to_io(inner.as_raw_fd());
        let signal_receiver =
            EHandle::local().register_receiver(Arc::new(EventedFdPacketReceiver {
                fdio,
                // Optimistically assume that the fd is readable and writable.
                // Reads and writes will be attempted before queueing a packet.
                // This makes fds slightly faster to read/write the first time
                // they're accessed after being created, provided they start off as
                // readable or writable. In return, there will be an extra wasted
                // syscall per read/write if the fd is not readable or writable.
                signals: AtomicUsize::new(READABLE | WRITABLE),
                read_task: AtomicWaker::new(),
                write_task: AtomicWaker::new(),
            }));

        let evented_fd =
            EventedFd { inner, fdio, signal_receiver: mem::ManuallyDrop::new(signal_receiver) };

        // Make sure a packet is delivered if an error or closure occurs.
        evented_fd.schedule_packet(ERROR | HUP);

        // Need to schedule packets to maintain the invariant that
        // if !READABLE or !WRITABLE a packet has been scheduled.
        evented_fd.schedule_packet(READABLE);
        evented_fd.schedule_packet(WRITABLE);

        Ok(evented_fd)
    }

    /// Tests to see if this resource is ready to be read from.
    /// If it is not, it arranges for the current task to receive a notification
    /// when a "readable" signal arrives.
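    ///
    /// # Example
    ///
    /// A sketch of the usual pattern, roughly what `poll_read` below does
    /// (illustrative only; assumes a `poll_*`-style method with `&mut self`,
    /// a `Context` `cx`, and a byte buffer `buf`):
    ///
    /// ```ignore
    /// ready!(self.poll_readable(cx))?;
    /// match self.as_mut().read(buf) {
    ///     Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
    ///         // Readiness was stale: re-register interest and wait for the
    ///         // next packet before trying again.
    ///         self.need_read(cx);
    ///         Poll::Pending
    ///     }
    ///     res => Poll::Ready(res),
    /// }
    /// ```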
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
        let receiver = self.signal_receiver.receiver();
        if (receiver.signals.load(Ordering::SeqCst) & (READABLE | ERROR | HUP)) != 0 {
            Poll::Ready(Ok(()))
        } else {
            self.need_read(cx);
            Poll::Pending
        }
    }

    /// Tests to see if this resource is ready to be written to.
    /// If it is not, it arranges for the current task to receive a notification
    /// when a "writable" signal arrives.
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
        let receiver = self.signal_receiver.receiver();
        if (receiver.signals.load(Ordering::SeqCst) & (WRITABLE | ERROR | HUP)) != 0 {
            Poll::Ready(Ok(()))
        } else {
            self.need_write(cx);
            Poll::Pending
        }
    }

    /// Arranges for the current task to receive a notification when a "readable"
    /// signal arrives.
    pub fn need_read(&self, cx: &mut Context<'_>) {
        let receiver = self.signal_receiver.receiver();
        receiver.read_task.register(cx.waker());
        let old = receiver.signals.fetch_and(!READABLE, Ordering::SeqCst);
        // We only need to schedule a new packet if one isn't already scheduled.
        // If READABLE was already false, a packet was already scheduled.
        if (old & READABLE) != 0 {
            self.schedule_packet(READABLE);
        }
    }

    /// Arranges for the current task to receive a notification when a "writable"
    /// signal arrives.
    pub fn need_write(&self, cx: &mut Context<'_>) {
        let receiver = self.signal_receiver.receiver();
        receiver.write_task.register(cx.waker());
        let old = receiver.signals.fetch_and(!WRITABLE, Ordering::SeqCst);
        // We only need to schedule a new packet if one isn't already scheduled.
        // If WRITABLE was already false, a packet was already scheduled.
        if (old & WRITABLE) != 0 {
            self.schedule_packet(WRITABLE);
        }
    }

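    // Translates the epoll-style `signals` bits into a (handle, Zircon signals)
    // pair via `fdio_unsafe_wait_begin`, then asks the port to queue a packet
    // on this receiver's key once any of those signals are asserted.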
    fn schedule_packet(&self, signals: usize) {
        unsafe {
            let (mut raw_handle, mut raw_signals) = (0, 0);
            syscall::fdio_unsafe_wait_begin(
                self.fdio,
                signals as u32,
                &mut raw_handle,
                &mut raw_signals,
            );

            let handle = zx::Handle::from_raw(raw_handle);
            let signals = zx::Signals::from_bits_truncate(raw_signals);

            let res = handle.wait_async_handle(
                self.signal_receiver.port(),
                self.signal_receiver.key(),
                signals,
                zx::WaitAsyncOpts::empty(),
            );

            // The handle is borrowed, so we cannot drop it.
            mem::forget(handle);
            res.expect("Error scheduling EventedFd notification");
        }
    }

    /// Clears all incoming signals.
    pub fn clear(&self) {
        self.signal_receiver.receiver().signals.store(0, Ordering::SeqCst);
    }
}

impl<T: AsRawFd> AsRawFd for EventedFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.as_ref().as_raw_fd()
    }
}

impl<T: AsRawFd + Read> AsyncRead for EventedFd<T> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_readable(&*self, cx))?;
        let res = (*self).as_mut().read(buf);
        if let Err(e) = &res {
            if e.kind() == io::ErrorKind::WouldBlock {
                self.need_read(cx);
                return Poll::Pending;
            }
        }
        Poll::Ready(res)
    }

    // TODO: override poll_vectored_read and call readv on the underlying handle
}

impl<T: AsRawFd + Write> AsyncWrite for EventedFd<T> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_writable(&*self, cx))?;
        let res = (*self).as_mut().write(buf);
        if let Err(e) = &res {
            if e.kind() == io::ErrorKind::WouldBlock {
                self.need_write(cx);
                return Poll::Pending;
            }
        }
        Poll::Ready(res)
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    // TODO: override poll_vectored_write and call writev on the underlying handle
}

impl<T> AsyncRead for &EventedFd<T>
where
    T: AsRawFd,
    for<'b> &'b T: Read,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_readable(*self, cx))?;
        let res = (*self).as_ref().read(buf);
        if let Err(e) = &res {
            if e.kind() == io::ErrorKind::WouldBlock {
                self.need_read(cx);
                return Poll::Pending;
            }
        }
        Poll::Ready(res)
    }
}

impl<T> AsyncWrite for &EventedFd<T>
where
    T: AsRawFd,
    for<'b> &'b T: Write,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_writable(*self, cx))?;
        let res = (*self).as_ref().write(buf);
        if let Err(e) = &res {
            if e.kind() == io::ErrorKind::WouldBlock {
                self.need_write(cx);
                return Poll::Pending;
            }
        }
        Poll::Ready(res)
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }
}

mod syscall {
    #![allow(non_camel_case_types, improper_ctypes)]
    use std::os::unix::io::RawFd;
    pub use zx::sys::{zx_handle_t, zx_signals_t};

    // This is the "improper" c type
    pub type fdio_t = ();

    // From libfdio.so
    extern "C" {
        pub fn fdio_unsafe_fd_to_io(fd: RawFd) -> *const fdio_t;
        pub fn fdio_unsafe_release(io: *const fdio_t);

        pub fn fdio_unsafe_wait_begin(
            io: *const fdio_t,
            events: u32,
            handle_out: &mut zx_handle_t,
            signals_out: &mut zx_signals_t,
        );

        pub fn fdio_unsafe_wait_end(io: *const fdio_t, signals: zx_signals_t, events_out: &mut u32);
    }
}