fuchsia_async/net/fuchsia/mod.rs

#![allow(missing_docs)]

mod tcp;
pub use self::tcp::*;

mod udp;
pub use self::udp::*;

use futures::io::{self, AsyncRead, AsyncWrite};
use futures::ready;
use futures::task::{AtomicWaker, Context};

use std::convert::{AsMut, AsRef};
use std::io::{Read, Write};
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Poll;
use std::{fmt, mem};

use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};

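// Readiness bits tracked per file descriptor, expressed as epoll event flags
// so they can be passed directly to fdio_unsafe_wait_begin/_end.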
const READABLE: usize = libc::EPOLLIN as usize;
const WRITABLE: usize = libc::EPOLLOUT as usize;
const ERROR: usize = libc::EPOLLERR as usize;
const HUP: usize = libc::EPOLLHUP as usize;

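// Receives port packets for the signals registered on an fdio-backed file
// descriptor. Holds a raw fdio pointer, so it must be deregistered before
// `fdio_unsafe_release` is called; `EventedFd::drop` enforces that ordering.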
pub(crate) struct EventedFdPacketReceiver {
    fdio: *const syscall::fdio_t,
    signals: AtomicUsize,
    read_task: AtomicWaker,
    write_task: AtomicWaker,
}

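// Safety: the only field that is not automatically Send/Sync is the raw
// `fdio` pointer. The receiver is deregistered (so `receive_packet` can no
// longer run) before the fdio is released in `EventedFd::drop`.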
unsafe impl Send for EventedFdPacketReceiver {}
unsafe impl Sync for EventedFdPacketReceiver {}

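// Translates packets delivered by the port into epoll-style readiness bits
// and wakes the read/write tasks whose readiness changed.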
impl PacketReceiver for EventedFdPacketReceiver {
    fn receive_packet(&self, packet: zx::Packet) {
        let observed_signals = if let zx::PacketContents::SignalOne(p) = packet.contents() {
            p.observed()
        } else {
            return;
        };

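        // Convert the observed zircon signals back into epoll events.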
        let mut events: u32 = 0;
        unsafe {
            syscall::fdio_unsafe_wait_end(self.fdio, observed_signals.bits(), &mut events);
        }
        let events = events as usize;

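        // Record the new readiness bits, waking tasks only on transitions
        // from not-ready to ready, or whenever an error/hangup is observed.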
        let old = self.signals.fetch_or(events, Ordering::SeqCst);
        let became_readable = ((events & READABLE) != 0) && ((old & READABLE) == 0);
        let became_writable = ((events & WRITABLE) != 0) && ((old & WRITABLE) == 0);
        let err_occurred = (events & (ERROR | HUP)) != 0;

        if became_readable || err_occurred {
            self.read_task.wake();
        }
        if became_writable || err_occurred {
            self.write_task.wake();
        }
    }
}

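/// A wrapper that connects a file-descriptor-backed I/O object to the
/// fuchsia-async executor, providing readiness polling on top of fdio and
/// `AsyncRead`/`AsyncWrite` when the wrapped type supports `Read`/`Write`.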
pub struct EventedFd<T> {
    inner: T,
    // Acquired from `fdio_unsafe_fd_to_io` in `new`; released in `drop`.
    fdio: *const syscall::fdio_t,
    // Must be dropped before `fdio_unsafe_release` is called on `fdio`.
    signal_receiver: mem::ManuallyDrop<ReceiverRegistration<EventedFdPacketReceiver>>,
}

unsafe impl<T> Send for EventedFd<T> where T: Send {}
unsafe impl<T> Sync for EventedFd<T> where T: Sync {}

impl<T> Unpin for EventedFd<T> {}

impl<T> Drop for EventedFd<T> {
    fn drop(&mut self) {
        unsafe {
            // Deregister the receiver first so `receive_packet` can no longer
            // be called with a dangling `fdio` pointer.
            mem::ManuallyDrop::drop(&mut self.signal_receiver);

            // Now it is safe to release the fdio reference.
            syscall::fdio_unsafe_release(self.fdio);
        }

        // `inner` is dropped last.
    }
}

impl<T: fmt::Debug> fmt::Debug for EventedFd<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EventedFd").field("inner", &self.inner).finish()
    }
}

impl<T> AsRef<T> for EventedFd<T>
where
    T: AsRawFd,
{
    fn as_ref(&self) -> &T {
        &self.inner
    }
}

impl<T> AsMut<T> for EventedFd<T>
where
    T: AsRawFd,
{
    fn as_mut(&mut self) -> &mut T {
        &mut self.inner
    }
}

impl<T> EventedFd<T>
where
    T: AsRawFd,
{
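    /// Creates a new `EventedFd` and registers it with the local executor.
    ///
    /// # Safety
    ///
    /// The file descriptor underlying `inner` must stay open (and must not be
    /// handed off elsewhere) for as long as the returned `EventedFd` is alive,
    /// since the fdio reference obtained here is only released on drop.
    ///
    /// A minimal usage sketch (assuming an already-created non-blocking socket
    /// from `std::net`, which is not part of this module):
    ///
    /// ```ignore
    /// let socket = std::net::UdpSocket::bind("127.0.0.1:0")?;
    /// socket.set_nonblocking(true)?;
    /// // Safety: `socket` owns the fd and is moved into the wrapper, so the
    /// // fd remains open for the wrapper's lifetime.
    /// let evented = unsafe { EventedFd::new(socket)? };
    /// ```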
    pub unsafe fn new(inner: T) -> io::Result<Self> {
        let fdio = unsafe { syscall::fdio_unsafe_fd_to_io(inner.as_raw_fd()) };
        let signal_receiver = EHandle::local().register_receiver(EventedFdPacketReceiver {
            fdio,
            // Optimistically assume the fd starts out readable and writable,
            // so the first read/write is attempted directly before waiting on
            // a packet.
            signals: AtomicUsize::new(READABLE | WRITABLE),
            read_task: AtomicWaker::new(),
            write_task: AtomicWaker::new(),
        });

        let evented_fd =
            EventedFd { inner, fdio, signal_receiver: mem::ManuallyDrop::new(signal_receiver) };

        // Make sure a packet is delivered if an error or hangup occurs.
        evented_fd.schedule_packet(ERROR | HUP);

        // Also queue packets for readability and writability up front.
        evented_fd.schedule_packet(READABLE);
        evented_fd.schedule_packet(WRITABLE);

        Ok(evented_fd)
    }
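
    /// Tests whether the resource is ready to be read from. If it is not,
    /// arranges for the current task to be woken when a "readable" signal
    /// arrives.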
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
        let receiver = self.signal_receiver.receiver();
        if (receiver.signals.load(Ordering::SeqCst) & (READABLE | ERROR | HUP)) != 0 {
            Poll::Ready(Ok(()))
        } else {
            self.need_read(cx);
            Poll::Pending
        }
    }

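    /// Tests whether the resource is ready to be written to. If it is not,
    /// arranges for the current task to be woken when a "writable" signal
    /// arrives.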
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
        let receiver = self.signal_receiver.receiver();
        if (receiver.signals.load(Ordering::SeqCst) & (WRITABLE | ERROR | HUP)) != 0 {
            Poll::Ready(Ok(()))
        } else {
            self.need_write(cx);
            Poll::Pending
        }
    }

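    /// Clears the "readable" readiness bit and registers the current task to
    /// be woken when the fd becomes readable again.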
    pub fn need_read(&self, cx: &mut Context<'_>) {
        let receiver = self.signal_receiver.receiver();
        receiver.read_task.register(cx.waker());
        let old = receiver.signals.fetch_and(!READABLE, Ordering::SeqCst);
        // Only schedule a packet if READABLE was previously set; otherwise a
        // packet is already outstanding from an earlier clear.
        if (old & READABLE) != 0 {
            self.schedule_packet(READABLE);
        }
    }

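    /// Clears the "writable" readiness bit and registers the current task to
    /// be woken when the fd becomes writable again.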
    pub fn need_write(&self, cx: &mut Context<'_>) {
        let receiver = self.signal_receiver.receiver();
        receiver.write_task.register(cx.waker());
        let old = receiver.signals.fetch_and(!WRITABLE, Ordering::SeqCst);
        // Only schedule a packet if WRITABLE was previously set; otherwise a
        // packet is already outstanding from an earlier clear.
        if (old & WRITABLE) != 0 {
            self.schedule_packet(WRITABLE);
        }
    }

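    // Asks the port to deliver a packet to this fd's receiver when `signals`
    // (expressed as epoll events) are observed on the handle backing the fd.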
    fn schedule_packet(&self, signals: usize) {
        unsafe {
            let (mut raw_handle, mut raw_signals) = (0, 0);
            syscall::fdio_unsafe_wait_begin(
                self.fdio,
                signals as u32,
                &mut raw_handle,
                &mut raw_signals,
            );

            let handle = zx::NullableHandle::from_raw(raw_handle);
            let signals = zx::Signals::from_bits_truncate(raw_signals);

            let res = handle.wait_async(
                self.signal_receiver.port(),
                self.signal_receiver.key(),
                signals,
                zx::WaitAsyncOpts::empty(),
            );

            // The handle is borrowed from fdio, so it must not be closed here.
            mem::forget(handle);
            res.expect("Error scheduling EventedFd notification");
        }
    }

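    /// Clears all recorded readiness signals, marking the fd as neither
    /// readable nor writable.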
    pub fn clear(&self) {
        self.signal_receiver.receiver().signals.store(0, Ordering::SeqCst);
    }
}

impl<T: AsRawFd> AsRawFd for EventedFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.as_ref().as_raw_fd()
    }
}

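// AsyncRead/AsyncWrite are implemented by first polling readiness, then
// performing the underlying read/write; a `WouldBlock` result re-arms the
// corresponding signal and reports `Pending`.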
impl<T: AsRawFd + Read> AsyncRead for EventedFd<T> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_readable(&*self, cx))?;
        let res = (*self).as_mut().read(buf);
        if let Err(e) = &res
            && e.kind() == io::ErrorKind::WouldBlock
        {
            self.need_read(cx);
            return Poll::Pending;
        }
        Poll::Ready(res)
    }
}

impl<T: AsRawFd + Write> AsyncWrite for EventedFd<T> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_writable(&*self, cx))?;
        let res = (*self).as_mut().write(buf);
        if let Err(e) = &res
            && e.kind() == io::ErrorKind::WouldBlock
        {
            self.need_write(cx);
            return Poll::Pending;
        }
        Poll::Ready(res)
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }
}

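// The same logic is provided for `&EventedFd<T>` where `&T` itself implements
// `Read`/`Write`, allowing I/O through a shared reference.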
impl<T> AsyncRead for &EventedFd<T>
where
    T: AsRawFd,
    for<'b> &'b T: Read,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_readable(*self, cx))?;
        let res = (*self).as_ref().read(buf);
        if let Err(e) = &res
            && e.kind() == io::ErrorKind::WouldBlock
        {
            self.need_read(cx);
            return Poll::Pending;
        }
        Poll::Ready(res)
    }
}

impl<T> AsyncWrite for &EventedFd<T>
where
    T: AsRawFd,
    for<'b> &'b T: Write,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_writable(*self, cx))?;
        let res = (*self).as_ref().write(buf);
        if let Err(e) = &res
            && e.kind() == io::ErrorKind::WouldBlock
        {
            self.need_write(cx);
            return Poll::Pending;
        }
        Poll::Ready(res)
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }
}

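// Minimal FFI declarations for the unsafe fdio C API used above.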
mod syscall {
    #![allow(non_camel_case_types, improper_ctypes)]
    use std::os::unix::io::RawFd;
    pub use zx::sys::{zx_handle_t, zx_signals_t};

    // fdio_t is only ever handled behind a pointer, so it is declared as an
    // opaque unit type here (hence the `improper_ctypes` allow above).
    pub type fdio_t = ();

    unsafe extern "C" {
        pub fn fdio_unsafe_fd_to_io(fd: RawFd) -> *const fdio_t;
        pub fn fdio_unsafe_release(io: *const fdio_t);

        pub fn fdio_unsafe_wait_begin(
            io: *const fdio_t,
            events: u32,
            handle_out: &mut zx_handle_t,
            signals_out: &mut zx_signals_t,
        );

        pub fn fdio_unsafe_wait_end(io: *const fdio_t, signals: zx_signals_t, events_out: &mut u32);
    }
}