// starnix_core/vfs/epoll.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::power::WakeupSourceOrigin;
6use crate::task::{
7    CurrentTask, EventHandler, ReadyItem, ReadyItemKey, WaitCanceler, WaitQueue, Waiter,
8};
9use crate::vfs::{
10    Anon, FileHandle, FileObject, FileObjectState, FileOps, WeakFileHandle, fileops_impl_dataless,
11    fileops_impl_nonseekable, fileops_impl_noop_sync,
12};
13use itertools::Itertools;
14use starnix_logging::log_warn;
15use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex};
16use starnix_uapi::error;
17use starnix_uapi::errors::{EINTR, ETIMEDOUT, Errno};
18use starnix_uapi::open_flags::OpenFlags;
19use starnix_uapi::vfs::{EpollEvent, FdEvents};
20use std::collections::hash_map::Entry;
21use std::collections::{HashMap, VecDeque};
22use std::sync::Arc;
23
/// Maximum depth of epoll instances monitoring one another.
/// From https://man7.org/linux/man-pages/man2/epoll_ctl.2.html
/// Exceeding this depth during `EpollFileObject::add` results in `ELOOP`
/// (see `check_eloop`).
const MAX_NESTED_DEPTH: u32 = 5;
27
/// WaitObject represents a FileHandle that is being waited upon.
/// The `data` field is a user defined quantity passed in
/// via `sys_epoll_ctl`. Typically C programs could use this
/// to store a pointer to the data that needs to be processed
/// after an event.
struct WaitObject {
    /// Weak reference to the waited-on file; a closed file simply stops
    /// producing events (see `WaitObject::target`).
    target: WeakFileHandle,
    /// Subscribed event mask; `add`/`modify` always OR in POLLHUP and POLLERR.
    events: FdEvents,
    /// Opaque user data echoed back in the reported `EpollEvent`.
    data: u64,
    /// Cancels the currently-installed async wait, if any; taken and cancelled
    /// when the wait is torn down or re-armed.
    wait_canceler: Option<WaitCanceler>,
}
39
impl WaitObject {
    /// Returns the target `FileHandle` of the `WaitObject`, or `None` if the file has been closed.
    ///
    /// It is fine for the `FileHandle` to be closed after being added to an epoll, and subsequent
    /// epoll_waits end up timing out (importantly not returning EBADF).
    fn target(&self) -> Option<FileHandle> {
        // Upgrade fails once the underlying file object has been dropped.
        self.target.upgrade()
    }
}
49
/// EpollKey acts as a key to a map of WaitObject.
/// NOTE(review): callers derive it from `FileObject::id` via `as_epoll_key()`
/// (see `EpollFileObject::add`/`delete`), so it identifies a file object;
/// the old comment claimed it is literally a pointer — confirm.
pub type EpollKey = usize;
53
54pub fn wakeup_source_name_for_epoll(current_task: &CurrentTask, key: EpollKey) -> String {
55    format!("[{}] {}", current_task.command(), key)
56}
57
/// EpollFileObject represents the FileObject used to
/// implement epoll_create1/epoll_ctl/epoll_pwait.
///
/// Lock order: `state` is acquired before `waitable_state` (established in
/// `new_file` and followed in `query_events`).
#[derive(Default)]
pub struct EpollFileObject {
    /// The waiter that subscribed files notify; drained by `wait_until_pending_event`.
    waiter: Waiter,
    /// Mutable state of this epoll object.
    state: Mutex<EpollState>,
    /// Shared with `EpollEventHandler` callbacks, which run from the waited-on
    /// files' notification paths — hence the `Arc`.
    waitable_state: Arc<Mutex<EpollWaitableState>>,
}
67
/// State guarded by `EpollFileObject::state`.
#[derive(Default)]
struct EpollState {
    /// Any file tracked by this epoll instance
    /// will exist as a key in `wait_objects`.
    wait_objects: HashMap<ReadyItemKey, WaitObject>,
    /// processing_list is a FIFO of events that are being
    /// processed.
    ///
    /// Objects from the `EpollFileObject`'s `trigger_list` are moved into this
    /// list so that we can handle triggered events without holding its lock
    /// longer than we need to. This reduces contention with waited-on objects
    /// that tries to notify this epoll object on subscribed events.
    processing_list: VecDeque<ReadyItem>,
    /// recheck_list is the list of items that need to have query_events checked
    /// at the start of the next EpollFileObject::wait call. This is only items
    /// that were returned from the last wait call, because those are the only
    /// ones that might need to be returned even if no events come in between
    /// wait calls.
    recheck_list: Vec<ReadyItemKey>,
}
88
/// State shared with `EpollEventHandler` (via `Arc<Mutex<_>>`); written from the
/// notification paths of waited-on files, read by this epoll object.
#[derive(Default)]
struct EpollWaitableState {
    /// trigger_list is a FIFO of events that have
    /// happened, but have not yet been processed.
    trigger_list: VecDeque<ReadyItem>,
    /// A list of waiters waiting for events from this
    /// epoll instance.
    waiters: WaitQueue,
}
98
99impl EpollFileObject {
    /// Allocate a new, empty epoll object.
    ///
    /// Returns an anonymous file (named "[eventpoll]") wrapping the epoll.
    pub fn new_file<L>(locked: &mut Locked<L>, current_task: &CurrentTask) -> FileHandle
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let epoll = Box::new(EpollFileObject::default());

        // Acquire the two internal locks once, in the order `state` then
        // `waitable_state`, in test/debug builds. NOTE(review): presumably this
        // registers the intended lock ordering with debug lock-order checking —
        // confirm against the `starnix_sync::Mutex` implementation.
        #[cfg(any(test, debug_assertions))]
        {
            let _l1 = epoll.state.lock();
            let _l2 = epoll.waitable_state.lock();
        }

        Anon::new_private_file(locked, current_task, epoll, OpenFlags::RDWR, "[eventpoll]")
    }
115
116    fn wait_on_file<L>(
117        &self,
118        locked: &mut Locked<L>,
119        current_task: &CurrentTask,
120        key: ReadyItemKey,
121        wait_object: &mut WaitObject,
122    ) -> Result<(), Errno>
123    where
124        L: LockEqualOrBefore<FileOpsCore>,
125    {
126        // First start the wait. If an event happens after this, we'll get it.
127        self.wait_on_file_edge_triggered(locked, current_task, key, wait_object)?;
128
129        self.do_recheck(locked, current_task, wait_object, key)?;
130
131        Ok(())
132    }
133
134    fn do_recheck<L>(
135        &self,
136        locked: &mut Locked<L>,
137        current_task: &CurrentTask,
138        wait_object: &mut WaitObject,
139        key: ReadyItemKey,
140    ) -> Result<(), Errno>
141    where
142        L: LockEqualOrBefore<FileOpsCore>,
143    {
144        let Some(target) = wait_object.target() else { return Ok(()) };
145        let events = target.query_events(locked, current_task)?;
146        if !(events & wait_object.events).is_empty() {
147            self.waiter.wake_immediately(events, self.new_wait_handler(key));
148            if let Some(wait_canceler) = wait_object.wait_canceler.take() {
149                wait_canceler.cancel();
150            } else {
151                log_warn!("wait canceler should have been set by `wait_on_file_edge_triggered`");
152            }
153        }
154        Ok(())
155    }
156
157    fn wait_on_file_edge_triggered<L>(
158        &self,
159        locked: &mut Locked<L>,
160        current_task: &CurrentTask,
161        key: ReadyItemKey,
162        wait_object: &mut WaitObject,
163    ) -> Result<(), Errno>
164    where
165        L: LockEqualOrBefore<FileOpsCore>,
166    {
167        let Some(target) = wait_object.target() else {
168            return Ok(());
169        };
170
171        wait_object.wait_canceler = target.wait_async(
172            locked,
173            current_task,
174            &self.waiter,
175            wait_object.events,
176            self.new_wait_handler(key),
177        );
178        if wait_object.wait_canceler.is_none() {
179            return error!(EPERM);
180        }
181        Ok(())
182    }
183
184    /// Checks if adding self to the `epoll_file_object` at `epoll_file_handle` would cause a loop
185    /// or exceed max depth.
186    fn check_eloop(&self, parent: &FileHandle, depth_left: u32) -> Result<(), Errno> {
187        if depth_left == 0 {
188            return error!(ELOOP);
189        }
190
191        let state = self.state.lock();
192        for nested_object in state.wait_objects.values() {
193            let Some(child) = nested_object.target() else {
194                continue;
195            };
196            let Some(child_file) = child.downcast_file::<EpollFileObject>() else {
197                continue;
198            };
199
200            if Arc::ptr_eq(&child, parent) {
201                return error!(ELOOP);
202            }
203            child_file.check_eloop(parent, depth_left - 1)?;
204        }
205
206        Ok(())
207    }
208
209    /// Asynchronously wait on certain events happening on a FileHandle.
210    pub fn add<L>(
211        &self,
212        locked: &mut Locked<L>,
213        current_task: &CurrentTask,
214        file: &FileHandle,
215        epoll_file_handle: &FileHandle,
216        epoll_event: EpollEvent,
217    ) -> Result<(), Errno>
218    where
219        L: LockEqualOrBefore<FileOpsCore>,
220    {
221        // Check if adding this file would cause a cycle at a max depth of 5.
222        if let Some(epoll_to_add) = file.downcast_file::<EpollFileObject>() {
223            // We need to check for `MAX_NESTED_DEPTH - 1` because adding `epoll_to_add` to self
224            // would result in a total depth of one more.
225            epoll_to_add.check_eloop(epoll_file_handle, MAX_NESTED_DEPTH - 1)?;
226        }
227
228        let mut state = self.state.lock();
229        let key = file.id.as_epoll_key().into();
230        match state.wait_objects.entry(key) {
231            Entry::Occupied(_) => error!(EEXIST),
232            Entry::Vacant(entry) => {
233                let wait_object = entry.insert(WaitObject {
234                    target: Arc::downgrade(file),
235                    events: epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR,
236                    data: epoll_event.data(),
237                    wait_canceler: None,
238                });
239                self.wait_on_file(locked, current_task, key, wait_object)
240            }
241        }
242    }
243
244    /// Modify the events we are looking for on a Filehandle.
245    pub fn modify<L>(
246        &self,
247        locked: &mut Locked<L>,
248        current_task: &CurrentTask,
249        file: &FileHandle,
250        epoll_event: EpollEvent,
251    ) -> Result<(), Errno>
252    where
253        L: LockEqualOrBefore<FileOpsCore>,
254    {
255        let mut state = self.state.lock();
256        let key = file.id.as_epoll_key();
257        state.recheck_list.retain(|x| *x != key.into());
258        let Some(wait_object) = state.wait_objects.get_mut(&key.into()) else {
259            return error!(ENOENT);
260        };
261        if let Some(wait_canceler) = wait_object.wait_canceler.take() {
262            wait_canceler.cancel();
263        }
264        wait_object.events = epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR;
265        wait_object.data = epoll_event.data();
266        // If the new epoll event doesn't include EPOLLWAKEUP, we need to take down the
267        // wake lease. This ensures that the system doesn't stay awake unnecessarily when
268        // the event no longer requires it to be awake.
269        if wait_object.events.contains(FdEvents::EPOLLWAKEUP)
270            && !epoll_event.events().contains(FdEvents::EPOLLWAKEUP)
271        {
272            current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
273                &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key)),
274            );
275        }
276        self.wait_on_file(locked, current_task, key.into(), wait_object)
277    }
278
279    /// Cancel an asynchronous wait on an object. Events triggered before
280    /// calling this will still be delivered.
281    pub fn delete(&self, current_task: &CurrentTask, file: &FileObject) -> Result<(), Errno> {
282        let mut state = self.state.lock();
283        let key = file.id.as_epoll_key().into();
284        let ReadyItemKey::Usize(key_usize) = key else {
285            // This should never happen as we only use Usize keys for files in epoll.
286            return error!(EINVAL);
287        };
288        if let Some(mut wait_object) = state.wait_objects.remove(&key) {
289            if let Some(wait_canceler) = wait_object.wait_canceler.take() {
290                wait_canceler.cancel();
291            }
292            state.recheck_list.retain(|x| *x != key);
293            // Deactivate the wake lock if it was active.
294            current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
295                &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key_usize)),
296            );
297            Ok(())
298        } else {
299            error!(ENOENT)
300        }
301    }
302
    /// Stores events from the Epoll's trigger list to the parameter `pending_list`. This does not
    /// actually invoke the waiter which is how items are added to the trigger list. The caller
    /// will have to do that before calling if needed.
    ///
    /// If an event in the trigger list is stale, the event will be re-added to the waiter.
    ///
    /// At most `max_events` entries are accumulated in `pending_list`; any remaining
    /// triggered events stay in `processing_list` for a later call. Returns `Ok(())`
    /// on success, propagating errors from querying a file's events.
    /// (The previous comment claimed a bool return; this returns `Result<(), Errno>`.)
    fn process_triggered_events<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        pending_list: &mut Vec<ReadyItem>,
        max_events: usize,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let mut state = self.state.lock();
        // Move all the elements from `self.trigger_list` to this intermediary
        // queue that we handle events from. This reduces the time spent holding
        // `self.trigger_list`'s lock which reduces contention with objects that
        // this epoll object has subscribed for notifications from.
        state.processing_list.append(&mut self.waitable_state.lock().trigger_list);
        while pending_list.len() < max_events && !state.processing_list.is_empty() {
            if let Some(pending) = state.processing_list.pop_front() {
                // A key absent from `wait_objects` means the file was deleted
                // after triggering; drop the stale event.
                if let Some(wait) = state.wait_objects.get_mut(&pending.key) {
                    // The weak pointer to the FileObject target can be gone if the file was closed
                    // out from under us. If this happens it is not an error: ignore it and
                    // continue.
                    if let Some(target) = wait.target.upgrade() {
                        // Re-query rather than trust the triggered events: another
                        // thread may have consumed the readiness already.
                        let ready = ReadyItem {
                            key: pending.key,
                            events: target.query_events(locked, current_task)?,
                        };
                        if ready.events.intersects(wait.events) {
                            pending_list.push(ready);
                        } else {
                            // Another thread already handled this event, wait for another one.
                            self.wait_on_file(locked, current_task, pending.key, wait)?;
                        }
                    }
                }
            }
        }
        Ok(())
    }
349
    /// Waits until an event exists in `pending_list` or until `timeout` has
    /// been reached.
    ///
    /// Accumulates up to `max_events` ready items. Once at least one item is
    /// pending, the deadline is dropped to ZERO so remaining synchronously
    /// available events are drained without further blocking. ETIMEDOUT ends
    /// the loop normally; EINTR is propagated (only reachable while the
    /// pending list is still empty).
    fn wait_until_pending_event<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        max_events: usize,
        mut wait_deadline: zx::MonotonicInstant,
    ) -> Result<Vec<ReadyItem>, Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let mut pending_list = Vec::new();

        loop {
            self.process_triggered_events(locked, current_task, &mut pending_list, max_events)?;

            if pending_list.len() == max_events {
                break; // No input events or output list full, nothing more we can do.
            }

            if !pending_list.is_empty() {
                // We now know we have at least one event to return. We shouldn't return
                // immediately, in case there are more events available, but the next loop should
                // wait with a 0 timeout to prevent further blocking.
                wait_deadline = zx::MonotonicInstant::ZERO;
            }

            // Loop back to check if there are more items in the Waiter's queue. Every wait_until()
            // call will process a single event. In order to drain as many events as we can that
            // are synchronously available, keep trying until it reports empty.
            //
            // The handlers in the waits cause items to be appended to trigger_list. See the closure
            // in `wait_on_file` to see how this happens.
            //
            // This wait may return EINTR for nonzero timeouts which is not an error. We must be
            // careful not to lose events if this happens.
            //
            // The first time through this loop we'll use the timeout passed into this function so
            // can get EINTR. But since we haven't done anything or accumulated any results yet it's
            // OK to immediately return and no information will be lost.
            match self.waiter.wait_until(locked, current_task, wait_deadline) {
                Err(err) if err == ETIMEDOUT => break,
                Err(err) if err == EINTR => {
                    // Terminating early will lose any events in the pending_list so that should
                    // only be for unrecoverable errors (not EINTR). The only time there should be a
                    // nonzero wait_deadline (and hence the ability to encounter EINTR) is when the
                    // pending list is empty.
                    debug_assert!(
                        pending_list.is_empty(),
                        "Got EINTR from wait of {}ns with {} items pending.",
                        wait_deadline.into_nanos(),
                        pending_list.len()
                    );
                    return Err(err);
                }
                // TODO check if this is supposed to actually fail!
                result => result?,
            }
        }

        Ok(pending_list)
    }
413
    /// Blocking wait on all waited upon events with a timeout.
    ///
    /// Returns up to `max_events` ready `EpollEvent`s, blocking until `deadline`
    /// if nothing is ready. Level-triggered entries are queued for a readiness
    /// recheck on the next call; EPOLLONESHOT entries are not re-armed and
    /// EPOLLET entries are not rechecked.
    pub fn wait<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        max_events: usize,
        deadline: zx::MonotonicInstant,
    ) -> Result<Vec<EpollEvent>, Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        {
            let mut state = self.state.lock();
            // Level-triggered items returned by the previous wait may still be
            // ready even though no new notification arrived; re-query them now.
            let recheck_list = std::mem::take(&mut state.recheck_list);
            for key in recheck_list {
                // Release the wake lease taken for this item by the previous wait.
                if let ReadyItemKey::Usize(key) = key {
                    current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
                        &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key)),
                    );
                }
                // `delete`/`modify` prune the recheck list under the same lock, so
                // entries here are expected to still exist in `wait_objects`.
                let wait_object = state.wait_objects.get_mut(&key).unwrap();
                self.do_recheck(locked, current_task, wait_object, key)?;
            }
        }

        let pending_list =
            self.wait_until_pending_event(locked, current_task, max_events, deadline)?;

        // Process the pending list and add processed ReadyItem
        // entries to the rearm_list for the next wait.
        let mut result = vec![];
        let mut state = self.state.lock();
        // Reborrow the guard so separate fields can be borrowed independently below.
        let state = &mut *state;
        for pending_event in pending_list.iter().unique_by(|e| e.key) {
            // The wait could have been deleted by here,
            // so ignore the None case.
            let Some(wait) = state.wait_objects.get_mut(&pending_event.key) else { continue };

            // Only report the subset of events the caller subscribed to.
            let reported_events = pending_event.events & wait.events;
            result.push(EpollEvent::new(reported_events, wait.data));

            // Files marked with `EPOLLONESHOT` should only notify
            // once and need to be rearmed manually with epoll_ctl_mod().
            if wait.events.contains(FdEvents::EPOLLONESHOT) {
                continue;
            }

            // Re-arm the async wait so the next edge is observed.
            self.wait_on_file_edge_triggered(locked, current_task, pending_event.key, wait)?;

            // Level-triggered entries must be re-polled on the next call even if
            // no new notification arrives in between.
            if !wait.events.contains(FdEvents::EPOLLET) {
                state.recheck_list.push(pending_event.key);
            }

            // TODO: is this really only supposed to happen for level-triggered events?
            if !wait.events.contains(FdEvents::EPOLLET) {
                // When this is the first time epoll_wait on this epoll fd, create and
                // hold a wake lease until the next epoll_wait.
                if wait.events.contains(FdEvents::EPOLLWAKEUP) {
                    if let ReadyItemKey::Usize(key) = pending_event.key {
                        current_task.kernel().suspend_resume_manager.activate_wakeup_source(
                            WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(
                                current_task,
                                key,
                            )),
                        );
                    }
                }
            }
        }

        Ok(result)
    }
486}
487
impl FileOps for EpollFileObject {
    fileops_impl_nonseekable!();
    fileops_impl_noop_sync!();
    fileops_impl_dataless!();

    /// Supports polling the epoll fd itself (epoll nesting): waiters park on the
    /// shared `waitable_state` queue and are notified with POLLIN when a
    /// subscribed file triggers (see `EpollEventHandler::handle`).
    fn wait_async(
        &self,
        _locked: &mut Locked<FileOpsCore>,
        _file: &FileObject,
        _current_task: &CurrentTask,
        waiter: &Waiter,
        events: FdEvents,
        handler: EventHandler,
    ) -> Option<WaitCanceler> {
        Some(self.waitable_state.lock().waiters.wait_async_fd_events(waiter, events, handler))
    }

    /// Reports POLLIN when this epoll has anything deliverable: an event queued
    /// in either internal list, or a recheck-list file that is currently ready.
    fn query_events(
        &self,
        locked: &mut Locked<FileOpsCore>,
        _file: &FileObject,
        current_task: &CurrentTask,
    ) -> Result<FdEvents, Errno> {
        let mut events = FdEvents::empty();
        // Lock order: `state` before `waitable_state`, matching `new_file`.
        let state = self.state.lock();
        if !state.processing_list.is_empty() || !self.waitable_state.lock().trigger_list.is_empty()
        {
            events |= FdEvents::POLLIN;
        } else {
            // Nothing queued: a level-triggered file from the last wait may still
            // be ready, so re-query each recheck entry until one matches.
            for key in &state.recheck_list {
                let wait_object = state.wait_objects.get(&key).unwrap();
                let Some(target) = wait_object.target() else { continue };
                if !(target.query_events(locked, current_task)? & wait_object.events).is_empty() {
                    events |= FdEvents::POLLIN;
                    break;
                }
            }
        }
        Ok(events)
    }

    /// On close, release the wake lease (if any) held for every registered file
    /// so the system is not kept awake by a dead epoll.
    fn close(
        self: Box<Self>,
        _locked: &mut Locked<FileOpsCore>,
        _file: &FileObjectState,
        current_task: &CurrentTask,
    ) {
        let guard = self.state.lock();
        for (key, _wait_object) in guard.wait_objects.iter() {
            if let ReadyItemKey::Usize(key) = key {
                current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
                    &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, *key)),
                );
            }
        }
    }
}
545
/// Event handler installed on waited-on files on behalf of an epoll instance;
/// routes notifications back to the epoll's shared `waitable_state`.
#[derive(Clone)]
pub struct EpollEventHandler {
    /// Identifies which registered file fired.
    key: ReadyItemKey,
    /// Shared with the owning `EpollFileObject`.
    waitable_state: Arc<Mutex<EpollWaitableState>>,
}
551
552impl EpollEventHandler {
553    pub fn handle(self, events: FdEvents) {
554        let mut waitable_state = self.waitable_state.lock();
555        waitable_state.trigger_list.push_back(ReadyItem { key: self.key, events });
556        waitable_state.waiters.notify_fd_events(FdEvents::POLLIN);
557    }
558}
559
560impl EpollFileObject {
561    fn new_wait_handler(&self, key: ReadyItemKey) -> EventHandler {
562        EventHandler::Epoll(EpollEventHandler {
563            key,
564            waitable_state: Arc::clone(&self.waitable_state),
565        })
566    }
567}
568
569#[cfg(test)]
570mod tests {
571    use super::*;
572    use crate::fs::fuchsia::create_fuchsia_pipe;
573    use crate::task::Waiter;
574    use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
575    use crate::testing::spawn_kernel_and_run;
576    use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
577    use crate::vfs::eventfd::{EventFdType, new_eventfd};
578    use crate::vfs::fs_registry::FsRegistry;
579    use crate::vfs::pipe::{new_pipe, register_pipe_fs};
580    use crate::vfs::socket::{SocketDomain, SocketType, UnixSocket};
581    use starnix_lifecycle::AtomicUsizeCounter;
582    use starnix_sync::Unlocked;
583    use starnix_uapi::vfs::{EpollEvent, FdEvents};
584    use syncio::Zxio;
585    use zx::{
586        HandleBased, {self as zx},
587    };
588
    #[::fuchsia::test]
    async fn test_epoll_read_ready() {
        // Verifies that a blocking epoll wait wakes when another thread writes to a
        // watched pipe, and that the reported event carries the registered user data.
        static WRITE_COUNT: AtomicUsizeCounter = AtomicUsizeCounter::new(0);
        const EVENT_DATA: u64 = 42;

        spawn_kernel_and_run(async |locked, current_task| {
            let kernel = current_task.kernel();
            register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());

            let (pipe_out, pipe_in) = new_pipe(locked, &current_task).unwrap();

            let test_string = "hello starnix".to_string();
            let test_len = test_string.len();

            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &pipe_out,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();

            // The writer runs on a kernel thread; `sender` signals once it has written.
            let (sender, receiver) = std::sync::mpsc::channel();
            let value = test_string.clone();
            let closure = move |locked: &mut Locked<Unlocked>, task: &CurrentTask| {
                let bytes_written = pipe_in
                    .write(locked, &task, &mut VecInputBuffer::new(value.as_bytes()))
                    .unwrap();
                assert_eq!(bytes_written, test_len);
                WRITE_COUNT.add(bytes_written);
                sender.send(()).unwrap();
            };
            let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
            kernel.kthreads.spawner().spawn_from_request(req);
            // Blocks until the writer makes the pipe readable.
            let events =
                epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
            receiver.recv().unwrap();
            assert_eq!(1, events.len());
            let event = &events[0];
            assert!(event.events().contains(FdEvents::POLLIN));
            assert_eq!(event.data(), EVENT_DATA);

            let mut buffer = VecOutputBuffer::new(test_len);
            let bytes_read = pipe_out.read(locked, &current_task, &mut buffer).unwrap();
            assert_eq!(bytes_read, WRITE_COUNT.get());
            assert_eq!(bytes_read, test_len);
            assert_eq!(buffer.data(), test_string.as_bytes());
        })
        .await;
    }
643
    #[::fuchsia::test]
    async fn test_epoll_ready_then_wait() {
        // Verifies that data written *before* the epoll wait is still reported:
        // the readiness recheck path, not just edge notifications.
        const EVENT_DATA: u64 = 42;

        spawn_kernel_and_run(async |locked, current_task| {
            let kernel = current_task.kernel();
            register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());

            let (pipe_out, pipe_in) = new_pipe(locked, &current_task).unwrap();

            let test_string = "hello starnix".to_string();
            let test_bytes = test_string.as_bytes();
            let test_len = test_bytes.len();

            // Make the pipe readable before the epoll is even created.
            assert_eq!(
                pipe_in.write(locked, &current_task, &mut VecInputBuffer::new(test_bytes)).unwrap(),
                test_bytes.len()
            );

            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &pipe_out,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();

            let events =
                epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
            assert_eq!(1, events.len());
            let event = &events[0];
            assert!(event.events().contains(FdEvents::POLLIN));
            assert_eq!(event.data(), EVENT_DATA);

            let mut buffer = VecOutputBuffer::new(test_len);
            let bytes_read = pipe_out.read(locked, &current_task, &mut buffer).unwrap();
            assert_eq!(bytes_read, test_len);
            assert_eq!(buffer.data(), test_bytes);
        })
        .await;
    }
689
    #[::fuchsia::test]
    async fn test_epoll_ctl_cancel() {
        // Verifies `delete`: a removed registration produces no events, while an
        // intact one still reports POLLIN after the eventfd is written.
        spawn_kernel_and_run(async |locked, current_task| {
            for do_cancel in [true, false] {
                let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
                let waiter = Waiter::new();

                let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
                let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
                const EVENT_DATA: u64 = 42;
                epoll_file
                    .add(
                        locked,
                        &current_task,
                        &event,
                        &epoll_file_handle,
                        EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                    )
                    .unwrap();

                if do_cancel {
                    epoll_file.delete(&current_task, &event).unwrap();
                }

                // An independent wait on the eventfd, also cancelled in the
                // do_cancel arm, exercising WaitCanceler directly.
                let wait_canceler = event
                    .wait_async(
                        locked,
                        &current_task,
                        &waiter,
                        FdEvents::POLLIN,
                        EventHandler::None,
                    )
                    .expect("wait_async");
                if do_cancel {
                    wait_canceler.cancel();
                }

                let add_val = 1u64;

                // Writing a counter value makes the eventfd readable.
                assert_eq!(
                    event
                        .write(
                            locked,
                            &current_task,
                            &mut VecInputBuffer::new(&add_val.to_ne_bytes())
                        )
                        .unwrap(),
                    std::mem::size_of::<u64>()
                );

                // Non-blocking wait: deadline ZERO.
                let events =
                    epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO).unwrap();

                if do_cancel {
                    assert_eq!(0, events.len());
                } else {
                    assert_eq!(1, events.len());
                    let event = &events[0];
                    assert!(event.events().contains(FdEvents::POLLIN));
                    assert_eq!(event.data(), EVENT_DATA);
                }
            }
        })
        .await;
    }
755
756    #[::fuchsia::test]
757    async fn test_multiple_events() {
758        spawn_kernel_and_run(async |locked, current_task| {
759            let (client1, server1) = zx::Socket::create_stream();
760            let (client2, server2) = zx::Socket::create_stream();
761            let pipe1 = create_fuchsia_pipe(locked, &current_task, client1, OpenFlags::RDWR)
762                .expect("create_fuchsia_pipe");
763            let pipe2 = create_fuchsia_pipe(locked, &current_task, client2, OpenFlags::RDWR)
764                .expect("create_fuchsia_pipe");
765            let server1_zxio = Zxio::create(server1.into_handle()).expect("Zxio::create");
766            let server2_zxio = Zxio::create(server2.into_handle()).expect("Zxio::create");
767
768            let poll = |locked: &mut Locked<Unlocked>| {
769                let epoll_object = EpollFileObject::new_file(locked, &current_task);
770                let epoll_file = epoll_object.downcast_file::<EpollFileObject>().unwrap();
771                epoll_file
772                    .add(
773                        locked,
774                        &current_task,
775                        &pipe1,
776                        &epoll_object,
777                        EpollEvent::new(FdEvents::POLLIN, 1),
778                    )
779                    .expect("epoll_file.add");
780                epoll_file
781                    .add(
782                        locked,
783                        &current_task,
784                        &pipe2,
785                        &epoll_object,
786                        EpollEvent::new(FdEvents::POLLIN, 2),
787                    )
788                    .expect("epoll_file.add");
789                epoll_file.wait(locked, &current_task, 2, zx::MonotonicInstant::ZERO).expect("wait")
790            };
791
792            let fds = poll(locked);
793            assert!(fds.is_empty());
794
795            assert_eq!(server1_zxio.write(&[0]).expect("write"), 1);
796
797            let fds = poll(locked);
798            assert_eq!(fds.len(), 1);
799            assert_eq!(fds[0].events(), FdEvents::POLLIN);
800            assert_eq!(fds[0].data(), 1);
801            assert_eq!(
802                pipe1.read(locked, &current_task, &mut VecOutputBuffer::new(64)).expect("read"),
803                1
804            );
805
806            let fds = poll(locked);
807            assert!(fds.is_empty());
808
809            assert_eq!(server2_zxio.write(&[0]).expect("write"), 1);
810
811            let fds = poll(locked);
812            assert_eq!(fds.len(), 1);
813            assert_eq!(fds[0].events(), FdEvents::POLLIN);
814            assert_eq!(fds[0].data(), 2);
815            assert_eq!(
816                pipe2.read(locked, &current_task, &mut VecOutputBuffer::new(64)).expect("read"),
817                1
818            );
819
820            let fds = poll(locked);
821            assert!(fds.is_empty());
822        })
823        .await;
824    }
825
826    #[::fuchsia::test]
827    async fn test_cancel_after_notify() {
828        spawn_kernel_and_run(async |locked, current_task| {
829            let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
830            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
831            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
832
833            // Add a thing
834            const EVENT_DATA: u64 = 42;
835            epoll_file
836                .add(
837                    locked,
838                    &current_task,
839                    &event,
840                    &epoll_file_handle,
841                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
842                )
843                .unwrap();
844
845            // Make the thing send a notification, wait for it
846            let add_val = 1u64;
847            assert_eq!(
848                event
849                    .write(locked, &current_task, &mut VecInputBuffer::new(&add_val.to_ne_bytes()))
850                    .unwrap(),
851                std::mem::size_of::<u64>()
852            );
853
854            assert_eq!(
855                epoll_file
856                    .wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO)
857                    .unwrap()
858                    .len(),
859                1
860            );
861
862            // Remove the thing
863            epoll_file.delete(&current_task, &event).unwrap();
864
865            // Wait for new notifications
866            assert_eq!(
867                epoll_file
868                    .wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO)
869                    .unwrap()
870                    .len(),
871                0
872            );
873            // That shouldn't crash
874        })
875        .await;
876    }
877
878    #[::fuchsia::test]
879    async fn test_add_then_modify() {
880        spawn_kernel_and_run(async |locked, current_task| {
881            let (socket1, _socket2) = UnixSocket::new_pair(
882                locked,
883                &current_task,
884                SocketDomain::Unix,
885                SocketType::Stream,
886                OpenFlags::RDWR,
887            )
888            .expect("Failed to create socket pair.");
889
890            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
891            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
892
893            const EVENT_DATA: u64 = 42;
894            epoll_file
895                .add(
896                    locked,
897                    &current_task,
898                    &socket1,
899                    &epoll_file_handle,
900                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
901                )
902                .unwrap();
903            assert_eq!(
904                epoll_file
905                    .wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO)
906                    .unwrap()
907                    .len(),
908                0
909            );
910
911            let read_write_event = FdEvents::POLLIN | FdEvents::POLLOUT;
912            epoll_file
913                .modify(
914                    locked,
915                    &current_task,
916                    &socket1,
917                    EpollEvent::new(read_write_event, EVENT_DATA),
918                )
919                .unwrap();
920            let triggered_events =
921                epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO).unwrap();
922            assert_eq!(1, triggered_events.len());
923            let event = &triggered_events[0];
924            assert_eq!(event.events(), FdEvents::POLLOUT);
925            assert_eq!(event.data(), EVENT_DATA);
926        })
927        .await;
928    }
929
930    #[::fuchsia::test]
931    async fn test_waiter_removal() {
932        spawn_kernel_and_run(async |locked, current_task| {
933            let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
934            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
935            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
936
937            const EVENT_DATA: u64 = 42;
938            epoll_file
939                .add(
940                    locked,
941                    &current_task,
942                    &event,
943                    &epoll_file_handle,
944                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
945                )
946                .unwrap();
947
948            std::mem::drop(event);
949
950            assert!(epoll_file.waitable_state.lock().waiters.is_empty());
951        })
952        .await;
953    }
954}