fuchsia_async/net/fuchsia/
mod.rs
1#![allow(missing_docs)]
6
7mod tcp;
8pub use self::tcp::*;
9
10mod udp;
11pub use self::udp::*;
12
13use futures::io::{self, AsyncRead, AsyncWrite};
14use futures::ready;
15use futures::task::{AtomicWaker, Context};
16use zx::{self as zx, AsHandleRef};
17
18use std::convert::{AsMut, AsRef};
19use std::io::{Read, Write};
20use std::os::unix::io::{AsRawFd, RawFd};
21use std::pin::Pin;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::Arc;
24use std::task::Poll;
25use std::{fmt, mem};
26
27use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
28
29const READABLE: usize = libc::EPOLLIN as usize;
30const WRITABLE: usize = libc::EPOLLOUT as usize;
31const ERROR: usize = libc::EPOLLERR as usize;
32const HUP: usize = libc::EPOLLHUP as usize;
33
34pub(crate) struct EventedFdPacketReceiver {
37 fdio: *const syscall::fdio_t,
38 signals: AtomicUsize,
39 read_task: AtomicWaker,
40 write_task: AtomicWaker,
41}
42
43unsafe impl Send for EventedFdPacketReceiver {}
48unsafe impl Sync for EventedFdPacketReceiver {}
49
50impl PacketReceiver for EventedFdPacketReceiver {
51 fn receive_packet(&self, packet: zx::Packet) {
52 let observed_signals = if let zx::PacketContents::SignalOne(p) = packet.contents() {
53 p.observed()
54 } else {
55 return;
56 };
57
58 let mut events: u32 = 0;
59 unsafe {
60 syscall::fdio_unsafe_wait_end(self.fdio, observed_signals.bits(), &mut events);
61 }
62 let events = events as usize;
63
64 let old = self.signals.fetch_or(events, Ordering::SeqCst);
65 let became_readable = ((events & READABLE) != 0) && ((old & READABLE) == 0);
66 let became_writable = ((events & WRITABLE) != 0) && ((old & WRITABLE) == 0);
67 let err_occurred = (events & (ERROR | HUP)) != 0;
68
69 if became_readable || err_occurred {
70 self.read_task.wake();
71 }
72 if became_writable || err_occurred {
73 self.write_task.wake();
74 }
75 }
76}
77
78pub struct EventedFd<T> {
80 inner: T,
81 fdio: *const syscall::fdio_t,
83 signal_receiver: mem::ManuallyDrop<ReceiverRegistration<EventedFdPacketReceiver>>,
85}
86
87unsafe impl<T> Send for EventedFd<T> where T: Send {}
88unsafe impl<T> Sync for EventedFd<T> where T: Sync {}
89
90impl<T> Unpin for EventedFd<T> {}
91
92impl<T> Drop for EventedFd<T> {
93 fn drop(&mut self) {
94 unsafe {
95 mem::ManuallyDrop::drop(&mut self.signal_receiver);
97
98 syscall::fdio_unsafe_release(self.fdio);
100 }
101
102 }
104}
105
106impl<T: fmt::Debug> fmt::Debug for EventedFd<T> {
107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108 f.debug_struct("EventedFd").field("inner", &self.inner).finish()
112 }
113}
114
115impl<T> AsRef<T> for EventedFd<T>
116where
117 T: AsRawFd,
118{
119 fn as_ref(&self) -> &T {
121 &self.inner
122 }
123}
124
125impl<T> AsMut<T> for EventedFd<T>
126where
127 T: AsRawFd,
128{
129 fn as_mut(&mut self) -> &mut T {
131 &mut self.inner
132 }
133}
134
135impl<T> EventedFd<T>
136where
137 T: AsRawFd,
138{
139 pub unsafe fn new(inner: T) -> io::Result<Self> {
146 let fdio = syscall::fdio_unsafe_fd_to_io(inner.as_raw_fd());
147 let signal_receiver =
148 EHandle::local().register_receiver(Arc::new(EventedFdPacketReceiver {
149 fdio,
150 signals: AtomicUsize::new(READABLE | WRITABLE),
157 read_task: AtomicWaker::new(),
158 write_task: AtomicWaker::new(),
159 }));
160
161 let evented_fd =
162 EventedFd { inner, fdio, signal_receiver: mem::ManuallyDrop::new(signal_receiver) };
163
164 evented_fd.schedule_packet(ERROR | HUP);
166
167 evented_fd.schedule_packet(READABLE);
170 evented_fd.schedule_packet(WRITABLE);
171
172 Ok(evented_fd)
173 }
174 pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
178 let receiver = self.signal_receiver.receiver();
179 if (receiver.signals.load(Ordering::SeqCst) & (READABLE | ERROR | HUP)) != 0 {
180 Poll::Ready(Ok(()))
181 } else {
182 self.need_read(cx);
183 Poll::Pending
184 }
185 }
186
187 pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
191 let receiver = self.signal_receiver.receiver();
192 if (receiver.signals.load(Ordering::SeqCst) & (WRITABLE | ERROR | HUP)) != 0 {
193 Poll::Ready(Ok(()))
194 } else {
195 self.need_write(cx);
196 Poll::Pending
197 }
198 }
199
200 pub fn need_read(&self, cx: &mut Context<'_>) {
203 let receiver = self.signal_receiver.receiver();
204 receiver.read_task.register(cx.waker());
205 let old = receiver.signals.fetch_and(!READABLE, Ordering::SeqCst);
206 if (old & READABLE) != 0 {
209 self.schedule_packet(READABLE);
210 }
211 }
212
213 pub fn need_write(&self, cx: &mut Context<'_>) {
216 let receiver = self.signal_receiver.receiver();
217 receiver.write_task.register(cx.waker());
218 let old = receiver.signals.fetch_and(!WRITABLE, Ordering::SeqCst);
219 if (old & WRITABLE) != 0 {
222 self.schedule_packet(WRITABLE);
223 }
224 }
225
226 fn schedule_packet(&self, signals: usize) {
227 unsafe {
228 let (mut raw_handle, mut raw_signals) = (0, 0);
229 syscall::fdio_unsafe_wait_begin(
230 self.fdio,
231 signals as u32,
232 &mut raw_handle,
233 &mut raw_signals,
234 );
235
236 let handle = zx::Handle::from_raw(raw_handle);
237 let signals = zx::Signals::from_bits_truncate(raw_signals);
238
239 let res = handle.wait_async_handle(
240 self.signal_receiver.port(),
241 self.signal_receiver.key(),
242 signals,
243 zx::WaitAsyncOpts::empty(),
244 );
245
246 mem::forget(handle);
248 res.expect("Error scheduling EventedFd notification");
249 }
250 }
251
252 pub fn clear(&self) {
254 self.signal_receiver.receiver().signals.store(0, Ordering::SeqCst);
255 }
256}
257
258impl<T: AsRawFd> AsRawFd for EventedFd<T> {
259 fn as_raw_fd(&self) -> RawFd {
260 self.as_ref().as_raw_fd()
261 }
262}
263
264impl<T: AsRawFd + Read> AsyncRead for EventedFd<T> {
265 fn poll_read(
266 mut self: Pin<&mut Self>,
267 cx: &mut Context<'_>,
268 buf: &mut [u8],
269 ) -> Poll<Result<usize, io::Error>> {
270 ready!(EventedFd::poll_readable(&*self, cx))?;
271 let res = (*self).as_mut().read(buf);
272 if let Err(e) = &res {
273 if e.kind() == io::ErrorKind::WouldBlock {
274 self.need_read(cx);
275 return Poll::Pending;
276 }
277 }
278 Poll::Ready(res)
279 }
280
281 }
283
284impl<T: AsRawFd + Write> AsyncWrite for EventedFd<T> {
285 fn poll_write(
286 mut self: Pin<&mut Self>,
287 cx: &mut Context<'_>,
288 buf: &[u8],
289 ) -> Poll<Result<usize, io::Error>> {
290 ready!(EventedFd::poll_writable(&*self, cx))?;
291 let res = (*self).as_mut().write(buf);
292 if let Err(e) = &res {
293 if e.kind() == io::ErrorKind::WouldBlock {
294 self.need_write(cx);
295 return Poll::Pending;
296 }
297 }
298 Poll::Ready(res)
299 }
300
301 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
302 Poll::Ready(Ok(()))
303 }
304
305 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
306 Poll::Ready(Ok(()))
307 }
308
309 }
311
312impl<T> AsyncRead for &EventedFd<T>
313where
314 T: AsRawFd,
315 for<'b> &'b T: Read,
316{
317 fn poll_read(
318 self: Pin<&mut Self>,
319 cx: &mut Context<'_>,
320 buf: &mut [u8],
321 ) -> Poll<Result<usize, io::Error>> {
322 ready!(EventedFd::poll_readable(*self, cx))?;
323 let res = (*self).as_ref().read(buf);
324 if let Err(e) = &res {
325 if e.kind() == io::ErrorKind::WouldBlock {
326 self.need_read(cx);
327 return Poll::Pending;
328 }
329 }
330 Poll::Ready(res)
331 }
332}
333
334impl<T> AsyncWrite for &EventedFd<T>
335where
336 T: AsRawFd,
337 for<'b> &'b T: Write,
338{
339 fn poll_write(
340 self: Pin<&mut Self>,
341 cx: &mut Context<'_>,
342 buf: &[u8],
343 ) -> Poll<Result<usize, io::Error>> {
344 ready!(EventedFd::poll_writable(*self, cx))?;
345 let res = (*self).as_ref().write(buf);
346 if let Err(e) = &res {
347 if e.kind() == io::ErrorKind::WouldBlock {
348 self.need_write(cx);
349 return Poll::Pending;
350 }
351 }
352 Poll::Ready(res)
353 }
354
355 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
356 Poll::Ready(Ok(()))
357 }
358
359 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
360 Poll::Ready(Ok(()))
361 }
362}
363
364mod syscall {
365 #![allow(non_camel_case_types, improper_ctypes)]
366 use std::os::unix::io::RawFd;
367 pub use zx::sys::{zx_handle_t, zx_signals_t};
368
369 pub type fdio_t = ();
371
372 extern "C" {
374 pub fn fdio_unsafe_fd_to_io(fd: RawFd) -> *const fdio_t;
375 pub fn fdio_unsafe_release(io: *const fdio_t);
376
377 pub fn fdio_unsafe_wait_begin(
378 io: *const fdio_t,
379 events: u32,
380 handle_out: &mut zx_handle_t,
381 signals_out: &mut zx_signals_t,
382 );
383
384 pub fn fdio_unsafe_wait_end(io: *const fdio_t, signals: zx_signals_t, events_out: &mut u32);
385 }
386}