fuchsia_async/net/fuchsia/mod.rs

// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#![allow(missing_docs)]

mod tcp;
pub use self::tcp::*;

mod udp;
pub use self::udp::*;

use futures::io::{self, AsyncRead, AsyncWrite};
use futures::ready;
use futures::task::{AtomicWaker, Context};

use std::convert::{AsMut, AsRef};
use std::io::{Read, Write};
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Poll;
use std::{fmt, mem};

use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};

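// fdio reports fd readiness using epoll-style event bits (see the
// `fdio_unsafe_wait_begin`/`fdio_unsafe_wait_end` declarations below), so the
// per-fd signal state is tracked with libc's EPOLL* constants.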
const READABLE: usize = libc::EPOLLIN as usize;
const WRITABLE: usize = libc::EPOLLOUT as usize;
const ERROR: usize = libc::EPOLLERR as usize;
const HUP: usize = libc::EPOLLHUP as usize;

// Unsafe to use. `receive_packet` must not be called after
// `fdio` is invalidated.
pub(crate) struct EventedFdPacketReceiver {
    fdio: *const syscall::fdio_t,
    signals: AtomicUsize,
    read_task: AtomicWaker,
    write_task: AtomicWaker,
}

// Needed because of the fdio pointer.
// It is safe to send because the `EventedFdPacketReceiver` must be
// deregistered (and therefore `receive_packet` never called again)
// before `fdio_unsafe_release` is called.
unsafe impl Send for EventedFdPacketReceiver {}
unsafe impl Sync for EventedFdPacketReceiver {}

impl PacketReceiver for EventedFdPacketReceiver {
    fn receive_packet(&self, packet: zx::Packet) {
        let observed_signals = if let zx::PacketContents::SignalOne(p) = packet.contents() {
            p.observed()
        } else {
            return;
        };

        let mut events: u32 = 0;
        unsafe {
            syscall::fdio_unsafe_wait_end(self.fdio, observed_signals.bits(), &mut events);
        }
        let events = events as usize;

        let old = self.signals.fetch_or(events, Ordering::SeqCst);
        let became_readable = ((events & READABLE) != 0) && ((old & READABLE) == 0);
        let became_writable = ((events & WRITABLE) != 0) && ((old & WRITABLE) == 0);
        let err_occurred = (events & (ERROR | HUP)) != 0;

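        // Worked example: if `signals` already held WRITABLE and this packet
        // reports READABLE | WRITABLE, then `old` is WRITABLE, so only
        // `became_readable` is set and only the read task is woken below.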
        if became_readable || err_occurred {
            self.read_task.wake();
        }
        if became_writable || err_occurred {
            self.write_task.wake();
        }
    }
}

/// A type which can be used for receiving IO events for a file descriptor.
pub struct EventedFd<T> {
    inner: T,
    // Must be valid, acquired from `fdio_unsafe_fd_to_io`
    fdio: *const syscall::fdio_t,
    // Must be dropped before `fdio_unsafe_release` is called
    signal_receiver: mem::ManuallyDrop<ReceiverRegistration<EventedFdPacketReceiver>>,
}

unsafe impl<T> Send for EventedFd<T> where T: Send {}
unsafe impl<T> Sync for EventedFd<T> where T: Sync {}

impl<T> Unpin for EventedFd<T> {}

impl<T> Drop for EventedFd<T> {
    fn drop(&mut self) {
        unsafe {
            // Drop the receiver so `receive_packet` may not be called again.
            mem::ManuallyDrop::drop(&mut self.signal_receiver);

            // Release the fdio
            syscall::fdio_unsafe_release(self.fdio);
        }

        // Then `inner` gets dropped
    }
}

impl<T: fmt::Debug> fmt::Debug for EventedFd<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // FIXME(https://github.com/rust-lang/rust/issues/67364): This could be
        // better written with `.finish_non_exhaustive()` once that feature is
        // stabilized.
        f.debug_struct("EventedFd").field("inner", &self.inner).finish()
    }
}

impl<T> AsRef<T> for EventedFd<T>
where
    T: AsRawFd,
{
    // Returns a reference to the underlying IO object.
    fn as_ref(&self) -> &T {
        &self.inner
    }
}

impl<T> AsMut<T> for EventedFd<T>
where
    T: AsRawFd,
{
    // Returns a mutable reference to the underlying IO object.
    fn as_mut(&mut self) -> &mut T {
        &mut self.inner
    }
}

impl<T> EventedFd<T>
where
    T: AsRawFd,
{
    /// Creates a new EventedFd.
    ///
    /// # Safety
    ///
    /// The raw file descriptor returned from `inner.as_raw_fd()` must not be
    /// closed until the returned `EventedFd` is dropped.
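    ///
    /// # Example
    ///
    /// A minimal sketch (hypothetical address; assumes a running fuchsia-async
    /// executor and a socket placed into nonblocking mode):
    ///
    /// ```ignore
    /// let stream = std::net::TcpStream::connect("127.0.0.1:8080")?;
    /// stream.set_nonblocking(true)?;
    /// // Safety: `stream` owns the fd and is moved into the `EventedFd`, so
    /// // the fd stays open until the `EventedFd` is dropped.
    /// let evented = unsafe { EventedFd::new(stream)? };
    /// ```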
    pub unsafe fn new(inner: T) -> io::Result<Self> {
        let fdio = unsafe { syscall::fdio_unsafe_fd_to_io(inner.as_raw_fd()) };
        let signal_receiver = EHandle::local().register_receiver(EventedFdPacketReceiver {
            fdio,
            // Optimistically assume that the fd is readable and writable.
            // Reads and writes will be attempted before queueing a packet.
            // This makes fds slightly faster to read/write the first time
            // they're accessed after being created, provided they start off as
            // readable or writable. In return, there will be an extra wasted
            // syscall per read/write if the fd is not readable or writable.
            signals: AtomicUsize::new(READABLE | WRITABLE),
            read_task: AtomicWaker::new(),
            write_task: AtomicWaker::new(),
        });

        let evented_fd =
            EventedFd { inner, fdio, signal_receiver: mem::ManuallyDrop::new(signal_receiver) };

        // Make sure a packet is delivered if an error or closure occurs.
        evented_fd.schedule_packet(ERROR | HUP);

        // Need to schedule packets to maintain the invariant that
        // if !READABLE or !WRITABLE a packet has been scheduled.
        evented_fd.schedule_packet(READABLE);
        evented_fd.schedule_packet(WRITABLE);

        Ok(evented_fd)
    }

    /// Tests to see if this resource is ready to be read from.
    /// If it is not, it arranges for the current task to receive a notification
    /// when a "readable" signal arrives.
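    ///
    /// A sketch of the intended pattern inside a manual `poll` function
    /// (mirroring the `AsyncRead` impls below; `fd: EventedFd<T>` where
    /// `&T: Read`):
    ///
    /// ```ignore
    /// ready!(fd.poll_readable(cx))?;
    /// match fd.as_ref().read(buf) {
    ///     Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
    ///         fd.need_read(cx);
    ///         Poll::Pending
    ///     }
    ///     res => Poll::Ready(res),
    /// }
    /// ```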
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
        let receiver = self.signal_receiver.receiver();
        if (receiver.signals.load(Ordering::SeqCst) & (READABLE | ERROR | HUP)) != 0 {
            Poll::Ready(Ok(()))
        } else {
            self.need_read(cx);
            Poll::Pending
        }
    }

    /// Tests to see if this resource is ready to be written to.
    /// If it is not, it arranges for the current task to receive a notification
    /// when a "writable" signal arrives.
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
        let receiver = self.signal_receiver.receiver();
        if (receiver.signals.load(Ordering::SeqCst) & (WRITABLE | ERROR | HUP)) != 0 {
            Poll::Ready(Ok(()))
        } else {
            self.need_write(cx);
            Poll::Pending
        }
    }

    /// Arranges for the current task to receive a notification when a "readable"
    /// signal arrives.
    pub fn need_read(&self, cx: &mut Context<'_>) {
        let receiver = self.signal_receiver.receiver();
        receiver.read_task.register(cx.waker());
        let old = receiver.signals.fetch_and(!READABLE, Ordering::SeqCst);
        // We only need to schedule a new packet if one isn't already scheduled.
        // If READABLE was already false, a packet was already scheduled.
        if (old & READABLE) != 0 {
            self.schedule_packet(READABLE);
        }
    }

    /// Arranges for the current task to receive a notification when a "writable"
    /// signal arrives.
    pub fn need_write(&self, cx: &mut Context<'_>) {
        let receiver = self.signal_receiver.receiver();
        receiver.write_task.register(cx.waker());
        let old = receiver.signals.fetch_and(!WRITABLE, Ordering::SeqCst);
        // We only need to schedule a new packet if one isn't already scheduled.
        // If WRITABLE was already false, a packet was already scheduled.
        if (old & WRITABLE) != 0 {
            self.schedule_packet(WRITABLE);
        }
    }

    fn schedule_packet(&self, signals: usize) {
        unsafe {
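            // Ask fdio which kernel handle and which zircon signals correspond
            // to the requested epoll-style events.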
            let (mut raw_handle, mut raw_signals) = (0, 0);
            syscall::fdio_unsafe_wait_begin(
                self.fdio,
                signals as u32,
                &mut raw_handle,
                &mut raw_signals,
            );

            let handle = zx::NullableHandle::from_raw(raw_handle);
            let signals = zx::Signals::from_bits_truncate(raw_signals);

            let res = handle.wait_async(
                self.signal_receiver.port(),
                self.signal_receiver.key(),
                signals,
                zx::WaitAsyncOpts::empty(),
            );

            // The handle is borrowed, so we cannot drop it.
            mem::forget(handle);
            res.expect("Error scheduling EventedFd notification");
        }
    }

    /// Clears all incoming signals.
    pub fn clear(&self) {
        self.signal_receiver.receiver().signals.store(0, Ordering::SeqCst);
    }
}

impl<T: AsRawFd> AsRawFd for EventedFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.as_ref().as_raw_fd()
    }
}

impl<T: AsRawFd + Read> AsyncRead for EventedFd<T> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_readable(&*self, cx))?;
        let res = (*self).as_mut().read(buf);
        if let Err(e) = &res
            && e.kind() == io::ErrorKind::WouldBlock
        {
            self.need_read(cx);
            return Poll::Pending;
        }
        Poll::Ready(res)
    }

    // TODO: override poll_read_vectored and call readv on the underlying handle
}

impl<T: AsRawFd + Write> AsyncWrite for EventedFd<T> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_writable(&*self, cx))?;
        let res = (*self).as_mut().write(buf);
        if let Err(e) = &res
            && e.kind() == io::ErrorKind::WouldBlock
        {
            self.need_write(cx);
            return Poll::Pending;
        }
        Poll::Ready(res)
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    // TODO: override poll_write_vectored and call writev on the underlying handle
}
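
// Usage sketch: once wrapped, an `EventedFd` can be driven with the standard
// `futures` combinators, e.g. (assuming `evented: EventedFd<std::net::TcpStream>`
// created via `EventedFd::new` above):
//
//     use futures::io::AsyncReadExt;
//     let mut buf = [0u8; 1024];
//     let n = evented.read(&mut buf).await?;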

impl<T> AsyncRead for &EventedFd<T>
where
    T: AsRawFd,
    for<'b> &'b T: Read,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_readable(*self, cx))?;
        let res = (*self).as_ref().read(buf);
        if let Err(e) = &res
            && e.kind() == io::ErrorKind::WouldBlock
        {
            self.need_read(cx);
            return Poll::Pending;
        }
        Poll::Ready(res)
    }
}

impl<T> AsyncWrite for &EventedFd<T>
where
    T: AsRawFd,
    for<'b> &'b T: Write,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_writable(*self, cx))?;
        let res = (*self).as_ref().write(buf);
        if let Err(e) = &res
            && e.kind() == io::ErrorKind::WouldBlock
        {
            self.need_write(cx);
            return Poll::Pending;
        }
        Poll::Ready(res)
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }
}

mod syscall {
    #![allow(non_camel_case_types, improper_ctypes)]
    use std::os::unix::io::RawFd;
    pub use zx::sys::{zx_handle_t, zx_signals_t};

    // This is the "improper" c type
    pub type fdio_t = ();

    // From libfdio.so
    unsafe extern "C" {
        pub fn fdio_unsafe_fd_to_io(fd: RawFd) -> *const fdio_t;
        pub fn fdio_unsafe_release(io: *const fdio_t);

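        // Translates epoll-style `events` into the kernel handle and zircon
        // signals to wait on; `fdio_unsafe_wait_end` performs the reverse
        // mapping for the signals observed on a packet.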
        pub fn fdio_unsafe_wait_begin(
            io: *const fdio_t,
            events: u32,
            handle_out: &mut zx_handle_t,
            signals_out: &mut zx_signals_t,
        );

        pub fn fdio_unsafe_wait_end(io: *const fdio_t, signals: zx_signals_t, events_out: &mut u32);
    }
}