Skip to main content

starnix_core/vfs/
epoll.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::power::WakeupSourceOrigin;
6use crate::task::{
7    CurrentTask, EventHandler, ReadyItem, ReadyItemKey, WaitCanceler, WaitQueue, Waiter,
8};
9use crate::vfs::{
10    Anon, FileHandle, FileObject, FileObjectState, FileOps, WeakFileHandle, fileops_impl_dataless,
11    fileops_impl_nonseekable, fileops_impl_noop_sync,
12};
13use itertools::Itertools;
14use starnix_logging::log_warn;
15use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex};
16use starnix_uapi::error;
17use starnix_uapi::errors::{EINTR, ETIMEDOUT, Errno};
18use starnix_uapi::open_flags::OpenFlags;
19use starnix_uapi::vfs::{EpollEvent, FdEvents};
20use std::collections::hash_map::Entry;
21use std::collections::{HashMap, VecDeque};
22use std::sync::Arc;
23
/// Maximum depth of epoll instances monitoring one another.
///
/// Adding an epoll instance to another one fails with `ELOOP` when the resulting chain
/// would exceed this depth (see `EpollFileObject::check_eloop`).
/// From https://man7.org/linux/man-pages/man2/epoll_ctl.2.html
const MAX_NESTED_DEPTH: u32 = 5;
27
/// WaitObject represents a FileHandle that is being waited upon.
/// The `data` field is a user defined quantity passed in
/// via `sys_epoll_ctl`. Typically C programs could use this
/// to store a pointer to the data that needs to be processed
/// after an event.
struct WaitObject {
    /// Weak reference to the watched file. It fails to upgrade once the file has been closed.
    target: WeakFileHandle,
    /// Events this registration is interested in. `add` and `modify` always OR in
    /// `POLLHUP` and `POLLERR` on top of what the caller asked for.
    events: FdEvents,
    /// Opaque user value that is copied into every `EpollEvent` reported for this file.
    data: u64,
    /// Canceler for the wait most recently armed on `target`. `None` until a wait has been
    /// armed, and again after the canceler has been taken in order to cancel it.
    wait_canceler: Option<WaitCanceler>,
}
39
40impl WaitObject {
41    /// Returns the target `FileHandle` of the `WaitObject`, or `None` if the file has been closed.
42    ///
43    /// It is fine for the `FileHandle` to be closed after being added to an epoll, and subsequent
44    /// epoll_waits end up timing out (importantly not returning EBADF).
45    fn target(&self) -> Option<FileHandle> {
46        self.target.upgrade()
47    }
48}
49
/// EpollKey acts as a key to a map of WaitObject.
/// In reality it is a pointer to a FileHandle object.
pub type EpollKey = usize;
53
54pub fn wakeup_source_name_for_epoll(current_task: &CurrentTask, key: EpollKey) -> String {
55    format!("[{}] {}", current_task.command(), key)
56}
57
/// EpollFileObject represents the FileObject used to
/// implement epoll_create1/epoll_ctl/epoll_pwait.
#[derive(Default)]
pub struct EpollFileObject {
    /// Waiter registered with every watched file; `wait` blocks on it.
    waiter: Waiter,
    /// Mutable state of this epoll object.
    state: Mutex<EpollState>,
    /// State that is also reachable from the `EpollEventHandler`s created by
    /// `new_wait_handler`, hence the `Arc`.
    waitable_state: Arc<Mutex<EpollWaitableState>>,
}
67
#[derive(Default)]
struct EpollState {
    /// Any file tracked by this epoll instance
    /// will exist as a key in `wait_objects`.
    wait_objects: HashMap<ReadyItemKey, WaitObject>,
    /// processing_list is a FIFO of events that are being
    /// processed.
    ///
    /// Items from `EpollWaitableState::trigger_list` are moved into this list so
    /// that triggered events can be handled without holding the trigger list's
    /// lock longer than needed. This reduces contention with waited-on objects
    /// that try to notify this epoll object about subscribed events.
    processing_list: VecDeque<ReadyItem>,
    /// recheck_list is the list of items that need to have query_events checked
    /// at the start of the next EpollFileObject::wait call. It only holds items
    /// that were returned from the last wait call, because those are the only
    /// ones that might need to be returned again even if no events come in
    /// between wait calls.
    recheck_list: Vec<ReadyItemKey>,
}
88
/// The part of an epoll instance's state that event handlers touch: the queue of triggered
/// events and the waiters that are watching the epoll file itself.
#[derive(Default)]
struct EpollWaitableState {
    /// trigger_list is a FIFO of events that have
    /// happened, but have not yet been processed.
    trigger_list: VecDeque<ReadyItem>,
    /// A list of waiters waiting for events from this
    /// epoll instance.
    waiters: WaitQueue,
}
98
99impl EpollFileObject {
100    /// Allocate a new, empty epoll object.
101    pub fn new_file<L>(locked: &mut Locked<L>, current_task: &CurrentTask) -> FileHandle
102    where
103        L: LockEqualOrBefore<FileOpsCore>,
104    {
105        let epoll = Box::new(EpollFileObject::default());
106
107        #[cfg(any(test, debug_assertions))]
108        {
109            let _l1 = epoll.state.lock();
110            let _l2 = epoll.waitable_state.lock();
111        }
112
113        Anon::new_private_file(locked, current_task, epoll, OpenFlags::RDWR, "[eventpoll]")
114    }
115
116    fn wait_on_file<L>(
117        &self,
118        locked: &mut Locked<L>,
119        current_task: &CurrentTask,
120        key: ReadyItemKey,
121        wait_object: &mut WaitObject,
122    ) -> Result<(), Errno>
123    where
124        L: LockEqualOrBefore<FileOpsCore>,
125    {
126        // First start the wait. If an event happens after this, we'll get it.
127        self.wait_on_file_edge_triggered(locked, current_task, key, wait_object)?;
128
129        self.do_recheck(locked, current_task, wait_object, key)?;
130
131        Ok(())
132    }
133
134    fn do_recheck<L>(
135        &self,
136        locked: &mut Locked<L>,
137        current_task: &CurrentTask,
138        wait_object: &mut WaitObject,
139        key: ReadyItemKey,
140    ) -> Result<(), Errno>
141    where
142        L: LockEqualOrBefore<FileOpsCore>,
143    {
144        let Some(target) = wait_object.target() else { return Ok(()) };
145        let events = target.query_events(locked, current_task)?;
146        if !(events & wait_object.events).is_empty() {
147            self.waiter.wake_immediately(events, self.new_wait_handler(key));
148            if let Some(wait_canceler) = wait_object.wait_canceler.take() {
149                wait_canceler.cancel();
150            } else {
151                log_warn!("wait canceler should have been set by `wait_on_file_edge_triggered`");
152            }
153        }
154        Ok(())
155    }
156
157    fn wait_on_file_edge_triggered<L>(
158        &self,
159        locked: &mut Locked<L>,
160        current_task: &CurrentTask,
161        key: ReadyItemKey,
162        wait_object: &mut WaitObject,
163    ) -> Result<(), Errno>
164    where
165        L: LockEqualOrBefore<FileOpsCore>,
166    {
167        let Some(target) = wait_object.target() else {
168            return Ok(());
169        };
170
171        wait_object.wait_canceler = target.wait_async(
172            locked,
173            current_task,
174            &self.waiter,
175            wait_object.events,
176            self.new_wait_handler(key),
177        );
178        if wait_object.wait_canceler.is_none() {
179            return error!(EPERM);
180        }
181        Ok(())
182    }
183
184    /// Checks if adding self to the `epoll_file_object` at `epoll_file_handle` would cause a loop
185    /// or exceed max depth.
186    fn check_eloop(&self, parent: &FileHandle, depth_left: u32) -> Result<(), Errno> {
187        if depth_left == 0 {
188            return error!(ELOOP);
189        }
190
191        let state = self.state.lock();
192        for nested_object in state.wait_objects.values() {
193            let Some(child) = nested_object.target() else {
194                continue;
195            };
196            let Some(child_file) = child.downcast_file::<EpollFileObject>() else {
197                continue;
198            };
199
200            if Arc::ptr_eq(&child, parent) {
201                return error!(ELOOP);
202            }
203            child_file.check_eloop(parent, depth_left - 1)?;
204        }
205
206        Ok(())
207    }
208
209    /// Asynchronously wait on certain events happening on a FileHandle.
210    pub fn add<L>(
211        &self,
212        locked: &mut Locked<L>,
213        current_task: &CurrentTask,
214        file: &FileHandle,
215        epoll_file_handle: &FileHandle,
216        epoll_event: EpollEvent,
217    ) -> Result<(), Errno>
218    where
219        L: LockEqualOrBefore<FileOpsCore>,
220    {
221        // Check if adding this file would cause a cycle at a max depth of 5.
222        if let Some(epoll_to_add) = file.downcast_file::<EpollFileObject>() {
223            // We need to check for `MAX_NESTED_DEPTH - 1` because adding `epoll_to_add` to self
224            // would result in a total depth of one more.
225            epoll_to_add.check_eloop(epoll_file_handle, MAX_NESTED_DEPTH - 1)?;
226        }
227
228        let mut state = self.state.lock();
229        let key = file.id.as_epoll_key().into();
230        match state.wait_objects.entry(key) {
231            Entry::Occupied(_) => error!(EEXIST),
232            Entry::Vacant(entry) => {
233                let wait_object = entry.insert(WaitObject {
234                    target: Arc::downgrade(file),
235                    events: epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR,
236                    data: epoll_event.data(),
237                    wait_canceler: None,
238                });
239                self.wait_on_file(locked, current_task, key, wait_object)
240            }
241        }
242    }
243
244    /// Modify the events we are looking for on a Filehandle.
245    pub fn modify<L>(
246        &self,
247        locked: &mut Locked<L>,
248        current_task: &CurrentTask,
249        file: &FileHandle,
250        epoll_event: EpollEvent,
251    ) -> Result<(), Errno>
252    where
253        L: LockEqualOrBefore<FileOpsCore>,
254    {
255        let mut state = self.state.lock();
256        let key = file.id.as_epoll_key();
257        state.recheck_list.retain(|x| *x != key.into());
258        let Some(wait_object) = state.wait_objects.get_mut(&key.into()) else {
259            return error!(ENOENT);
260        };
261        if let Some(wait_canceler) = wait_object.wait_canceler.take() {
262            wait_canceler.cancel();
263        }
264        wait_object.events = epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR;
265        wait_object.data = epoll_event.data();
266        // If the new epoll event doesn't include EPOLLWAKEUP, we need to take down the
267        // wake lease. This ensures that the system doesn't stay awake unnecessarily when
268        // the event no longer requires it to be awake.
269        if wait_object.events.contains(FdEvents::EPOLLWAKEUP)
270            && !epoll_event.events().contains(FdEvents::EPOLLWAKEUP)
271        {
272            current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
273                &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key)),
274            );
275        }
276        self.wait_on_file(locked, current_task, key.into(), wait_object)
277    }
278
279    /// Cancel an asynchronous wait on an object. Events triggered before
280    /// calling this will still be delivered.
281    pub fn delete(&self, current_task: &CurrentTask, file: &FileObject) -> Result<(), Errno> {
282        let mut state = self.state.lock();
283        let key = file.id.as_epoll_key().into();
284        let ReadyItemKey::Usize(key_usize) = key else {
285            // This should never happen as we only use Usize keys for files in epoll.
286            return error!(EINVAL);
287        };
288        if let Some(mut wait_object) = state.wait_objects.remove(&key) {
289            if let Some(wait_canceler) = wait_object.wait_canceler.take() {
290                wait_canceler.cancel();
291            }
292            state.recheck_list.retain(|x| *x != key);
293            // Deactivate the wake lock if it was active.
294            current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
295                &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key_usize)),
296            );
297            Ok(())
298        } else {
299            error!(ENOENT)
300        }
301    }
302
303    /// Stores events from the Epoll's trigger list to the parameter `pending_list`. This does not
304    /// actually invoke the waiter which is how items are added to the trigger list. The caller
305    /// will have to do that before calling if needed.
306    ///
307    /// If an event in the trigger list is stale, the event will be re-added to the waiter.
308    ///
309    /// Returns true if any events were added. False means there was nothing in the trigger list.
310    fn process_triggered_events<L>(
311        &self,
312        locked: &mut Locked<L>,
313        current_task: &CurrentTask,
314        pending_list: &mut Vec<ReadyItem>,
315        max_events: usize,
316    ) -> Result<(), Errno>
317    where
318        L: LockEqualOrBefore<FileOpsCore>,
319    {
320        let mut state = self.state.lock();
321        // Move all the elements from `self.trigger_list` to this intermediary
322        // queue that we handle events from. This reduces the time spent holding
323        // `self.trigger_list`'s lock which reduces contention with objects that
324        // this epoll object has subscribed for notifications from.
325        state.processing_list.append(&mut self.waitable_state.lock().trigger_list);
326        while pending_list.len() < max_events && !state.processing_list.is_empty() {
327            if let Some(pending) = state.processing_list.pop_front() {
328                if let Some(wait) = state.wait_objects.get_mut(&pending.key) {
329                    // The weak pointer to the FileObject target can be gone if the file was closed
330                    // out from under us. If this happens it is not an error: ignore it and
331                    // continue.
332                    if let Some(target) = wait.target.upgrade() {
333                        let ready = ReadyItem {
334                            key: pending.key,
335                            events: target.query_events(locked, current_task)?,
336                        };
337                        if ready.events.intersects(wait.events) {
338                            pending_list.push(ready);
339                        } else {
340                            // Another thread already handled this event, wait for another one.
341                            self.wait_on_file(locked, current_task, pending.key, wait)?;
342                        }
343                    }
344                }
345            }
346        }
347        Ok(())
348    }
349
350    /// Waits until an event exists in `pending_list` or until `timeout` has
351    /// been reached.
352    fn wait_until_pending_event<L>(
353        &self,
354        locked: &mut Locked<L>,
355        current_task: &CurrentTask,
356        max_events: usize,
357        mut wait_deadline: zx::MonotonicInstant,
358    ) -> Result<Vec<ReadyItem>, Errno>
359    where
360        L: LockEqualOrBefore<FileOpsCore>,
361    {
362        let mut pending_list = Vec::new();
363
364        loop {
365            self.process_triggered_events(locked, current_task, &mut pending_list, max_events)?;
366
367            if pending_list.len() == max_events {
368                break; // No input events or output list full, nothing more we can do.
369            }
370
371            if !pending_list.is_empty() {
372                // We now know we have at least one event to return. We shouldn't return
373                // immediately, in case there are more events available, but the next loop should
374                // wait with a 0 timeout to prevent further blocking.
375                wait_deadline = zx::MonotonicInstant::ZERO;
376            }
377
378            // Loop back to check if there are more items in the Waiter's queue. Every wait_until()
379            // call will process a single event. In order to drain as many events as we can that
380            // are synchronously available, keep trying until it reports empty.
381            //
382            // The handlers in the waits cause items to be appended to trigger_list. See the closure
383            // in `wait_on_file` to see how this happens.
384            //
385            // This wait may return EINTR for nonzero timeouts which is not an error. We must be
386            // careful not to lose events if this happens.
387            //
388            // The first time through this loop we'll use the timeout passed into this function so
389            // can get EINTR. But since we haven't done anything or accumulated any results yet it's
390            // OK to immediately return and no information will be lost.
391            match self.waiter.wait_until(locked, current_task, wait_deadline) {
392                Err(err) if err == ETIMEDOUT => break,
393                Err(err) if err == EINTR => {
394                    // Terminating early will lose any events in the pending_list so that should
395                    // only be for unrecoverable errors (not EINTR). The only time there should be a
396                    // nonzero wait_deadline (and hence the ability to encounter EINTR) is when the
397                    // pending list is empty.
398                    debug_assert!(
399                        pending_list.is_empty(),
400                        "Got EINTR from wait of {}ns with {} items pending.",
401                        wait_deadline.into_nanos(),
402                        pending_list.len()
403                    );
404                    return Err(err);
405                }
406                // TODO check if this is supposed to actually fail!
407                result => result?,
408            }
409        }
410
411        Ok(pending_list)
412    }
413
414    /// Blocking wait on all waited upon events with a timeout.
415    pub fn wait<L>(
416        &self,
417        locked: &mut Locked<L>,
418        current_task: &CurrentTask,
419        max_events: usize,
420        deadline: zx::MonotonicInstant,
421    ) -> Result<Vec<EpollEvent>, Errno>
422    where
423        L: LockEqualOrBefore<FileOpsCore>,
424    {
425        {
426            let mut state = self.state.lock();
427            let recheck_list = std::mem::take(&mut state.recheck_list);
428            for key in recheck_list {
429                if let ReadyItemKey::Usize(key) = key {
430                    current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
431                        &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key)),
432                    );
433                }
434                let wait_object = state.wait_objects.get_mut(&key).unwrap();
435                self.do_recheck(locked, current_task, wait_object, key)?;
436            }
437        }
438
439        let pending_list =
440            self.wait_until_pending_event(locked, current_task, max_events, deadline)?;
441
442        // Process the pending list and add processed ReadyItem
443        // entries to the rearm_list for the next wait.
444        let mut result = vec![];
445        let mut state = self.state.lock();
446        let state = &mut *state;
447        for pending_event in pending_list.iter().unique_by(|e| e.key) {
448            // The wait could have been deleted by here,
449            // so ignore the None case.
450            let Some(wait) = state.wait_objects.get_mut(&pending_event.key) else { continue };
451
452            let reported_events = pending_event.events & wait.events;
453            result.push(EpollEvent::new(reported_events, wait.data));
454
455            // Files marked with `EPOLLONESHOT` should only notify
456            // once and need to be rearmed manually with epoll_ctl_mod().
457            if wait.events.contains(FdEvents::EPOLLONESHOT) {
458                continue;
459            }
460
461            self.wait_on_file_edge_triggered(locked, current_task, pending_event.key, wait)?;
462
463            if !wait.events.contains(FdEvents::EPOLLET) {
464                state.recheck_list.push(pending_event.key);
465            }
466
467            // TODO: is this really only supposed to happen for level-triggered events?
468            if !wait.events.contains(FdEvents::EPOLLET) {
469                // When this is the first time epoll_wait on this epoll fd, create and
470                // hold a wake lease until the next epoll_wait.
471                if wait.events.contains(FdEvents::EPOLLWAKEUP) {
472                    if let ReadyItemKey::Usize(key) = pending_event.key {
473                        current_task.kernel().suspend_resume_manager.activate_wakeup_source(
474                            WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(
475                                current_task,
476                                key,
477                            )),
478                        );
479                    }
480                }
481            }
482        }
483
484        Ok(result)
485    }
486}
487
488impl FileOps for EpollFileObject {
489    fileops_impl_nonseekable!();
490    fileops_impl_noop_sync!();
491    fileops_impl_dataless!();
492
493    fn wait_async(
494        &self,
495        _locked: &mut Locked<FileOpsCore>,
496        _file: &FileObject,
497        _current_task: &CurrentTask,
498        waiter: &Waiter,
499        events: FdEvents,
500        handler: EventHandler,
501    ) -> Option<WaitCanceler> {
502        Some(self.waitable_state.lock().waiters.wait_async_fd_events(waiter, events, handler))
503    }
504
505    fn query_events(
506        &self,
507        locked: &mut Locked<FileOpsCore>,
508        _file: &FileObject,
509        current_task: &CurrentTask,
510    ) -> Result<FdEvents, Errno> {
511        let mut events = FdEvents::empty();
512        let state = self.state.lock();
513        if !state.processing_list.is_empty() || !self.waitable_state.lock().trigger_list.is_empty()
514        {
515            events |= FdEvents::POLLIN;
516        } else {
517            for key in &state.recheck_list {
518                let wait_object = state.wait_objects.get(&key).unwrap();
519                let Some(target) = wait_object.target() else { continue };
520                if !(target.query_events(locked, current_task)? & wait_object.events).is_empty() {
521                    events |= FdEvents::POLLIN;
522                    break;
523                }
524            }
525        }
526        Ok(events)
527    }
528
529    fn close(
530        self: Box<Self>,
531        _locked: &mut Locked<FileOpsCore>,
532        _file: &FileObjectState,
533        current_task: &CurrentTask,
534    ) {
535        let guard = self.state.lock();
536        for (key, _wait_object) in guard.wait_objects.iter() {
537            if let ReadyItemKey::Usize(key) = key {
538                current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
539                    &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, *key)),
540                );
541            }
542        }
543    }
544}
545
/// Handler installed on watched files (see `EpollFileObject::new_wait_handler`). When
/// invoked, it queues a `ReadyItem` for `key` on the owning epoll instance's trigger list.
#[derive(Clone)]
pub struct EpollEventHandler {
    /// Key of the registration this handler reports events for.
    key: ReadyItemKey,
    /// Shared trigger list and wait queue of the owning epoll instance.
    waitable_state: Arc<Mutex<EpollWaitableState>>,
}
551
552impl EpollEventHandler {
553    pub fn handle(self, events: FdEvents) {
554        let mut waitable_state = self.waitable_state.lock();
555        waitable_state.trigger_list.push_back(ReadyItem { key: self.key, events });
556        waitable_state.waiters.notify_fd_events(FdEvents::POLLIN);
557    }
558}
559
560impl EpollFileObject {
561    fn new_wait_handler(&self, key: ReadyItemKey) -> EventHandler {
562        EventHandler::Epoll(EpollEventHandler {
563            key,
564            waitable_state: Arc::clone(&self.waitable_state),
565        })
566    }
567}
568
569#[cfg(test)]
570mod tests {
571    use super::*;
572    use crate::fs::fuchsia::create_fuchsia_pipe;
573    use crate::task::Waiter;
574    use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
575    use crate::testing::spawn_kernel_and_run;
576    use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
577    use crate::vfs::eventfd::{EventFdType, new_eventfd};
578    use crate::vfs::fs_registry::FsRegistry;
579    use crate::vfs::pipe::{new_pipe, register_pipe_fs};
580    use crate::vfs::socket::{SocketDomain, SocketType, UnixSocket};
581    use starnix_lifecycle::AtomicCounter;
582    use starnix_sync::Unlocked;
583    use starnix_uapi::vfs::{EpollEvent, FdEvents};
584    use syncio::Zxio;
585
    /// Checks that a `wait` with an infinite deadline on an epoll watching the read end of a
    /// pipe reports `POLLIN` with the registered user data once a spawned thread writes to
    /// the pipe, and that the written bytes can then be read back.
    #[::fuchsia::test]
    async fn test_epoll_read_ready() {
        // Number of bytes written by the spawned writer; compared with the bytes read below.
        static WRITE_COUNT: AtomicCounter<usize> = AtomicCounter::<usize>::new_const(0);
        const EVENT_DATA: u64 = 42;

        spawn_kernel_and_run(async |locked, current_task| {
            let kernel = current_task.kernel();
            register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());

            let (pipe_out, pipe_in) = new_pipe(locked, &current_task).unwrap();

            let test_string = "hello starnix".to_string();
            let test_len = test_string.len();

            // Watch the read end of the pipe for readability.
            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &pipe_out,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();

            // The writer runs on a spawned kernel thread and signals completion over a channel.
            let (sender, receiver) = std::sync::mpsc::channel();
            let value = test_string.clone();
            let closure = move |locked: &mut Locked<Unlocked>, task: &CurrentTask| {
                let bytes_written = pipe_in
                    .write(locked, &task, &mut VecInputBuffer::new(value.as_bytes()))
                    .unwrap();
                assert_eq!(bytes_written, test_len);
                WRITE_COUNT.add(bytes_written);
                sender.send(()).unwrap();
            };
            let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
            kernel.kthreads.spawner().spawn_from_request(req);
            let events =
                epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
            // Make sure the writer has finished before inspecting `WRITE_COUNT`.
            receiver.recv().unwrap();
            assert_eq!(1, events.len());
            let event = &events[0];
            assert!(event.events().contains(FdEvents::POLLIN));
            assert_eq!(event.data(), EVENT_DATA);

            let mut buffer = VecOutputBuffer::new(test_len);
            let bytes_read = pipe_out.read(locked, &current_task, &mut buffer).unwrap();
            assert_eq!(bytes_read, WRITE_COUNT.get());
            assert_eq!(bytes_read, test_len);
            assert_eq!(buffer.data(), test_string.as_bytes());
        })
        .await;
    }
640
    /// Checks that data written to a pipe *before* its read end is added to an epoll is
    /// still reported by the following `wait`.
    #[::fuchsia::test]
    async fn test_epoll_ready_then_wait() {
        const EVENT_DATA: u64 = 42;

        spawn_kernel_and_run(async |locked, current_task| {
            let kernel = current_task.kernel();
            register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());

            let (pipe_out, pipe_in) = new_pipe(locked, &current_task).unwrap();

            let test_string = "hello starnix".to_string();
            let test_bytes = test_string.as_bytes();
            let test_len = test_bytes.len();

            // Make the pipe readable before the epoll registration exists.
            assert_eq!(
                pipe_in.write(locked, &current_task, &mut VecInputBuffer::new(test_bytes)).unwrap(),
                test_bytes.len()
            );

            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &pipe_out,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();

            // The already-pending data must be reported even though nothing happens after `add`.
            let events =
                epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
            assert_eq!(1, events.len());
            let event = &events[0];
            assert!(event.events().contains(FdEvents::POLLIN));
            assert_eq!(event.data(), EVENT_DATA);

            let mut buffer = VecOutputBuffer::new(test_len);
            let bytes_read = pipe_out.read(locked, &current_task, &mut buffer).unwrap();
            assert_eq!(bytes_read, test_len);
            assert_eq!(buffer.data(), test_bytes);
        })
        .await;
    }
686
    /// Checks that an eventfd removed from an epoll with `delete` no longer produces events,
    /// while one that stays registered does. Runs once with cancellation and once without.
    #[::fuchsia::test]
    async fn test_epoll_ctl_cancel() {
        spawn_kernel_and_run(async |locked, current_task| {
            for do_cancel in [true, false] {
                let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
                let waiter = Waiter::new();

                let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
                let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
                const EVENT_DATA: u64 = 42;
                epoll_file
                    .add(
                        locked,
                        &current_task,
                        &event,
                        &epoll_file_handle,
                        EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                    )
                    .unwrap();

                if do_cancel {
                    epoll_file.delete(&current_task, &event).unwrap();
                }

                // Independently of the epoll, register (and optionally cancel) a plain waiter
                // on the same eventfd.
                let wait_canceler = event
                    .wait_async(
                        locked,
                        &current_task,
                        &waiter,
                        FdEvents::POLLIN,
                        EventHandler::None,
                    )
                    .expect("wait_async");
                if do_cancel {
                    wait_canceler.cancel();
                }

                let add_val = 1u64;

                // Writing to the eventfd makes it readable.
                assert_eq!(
                    event
                        .write(
                            locked,
                            &current_task,
                            &mut VecInputBuffer::new(&add_val.to_ne_bytes())
                        )
                        .unwrap(),
                    std::mem::size_of::<u64>()
                );

                // Zero deadline: only report what is available right now.
                let events =
                    epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO).unwrap();

                if do_cancel {
                    assert_eq!(0, events.len());
                } else {
                    assert_eq!(1, events.len());
                    let event = &events[0];
                    assert!(event.events().contains(FdEvents::POLLIN));
                    assert_eq!(event.data(), EVENT_DATA);
                }
            }
        })
        .await;
    }
752
753    #[::fuchsia::test]
754    async fn test_multiple_events() {
755        spawn_kernel_and_run(async |locked, current_task| {
756            let (client1, server1) = zx::Socket::create_stream();
757            let (client2, server2) = zx::Socket::create_stream();
758            let pipe1 = create_fuchsia_pipe(locked, &current_task, client1, OpenFlags::RDWR)
759                .expect("create_fuchsia_pipe");
760            let pipe2 = create_fuchsia_pipe(locked, &current_task, client2, OpenFlags::RDWR)
761                .expect("create_fuchsia_pipe");
762            let server1_zxio = Zxio::create(server1.into_handle()).expect("Zxio::create");
763            let server2_zxio = Zxio::create(server2.into_handle()).expect("Zxio::create");
764
765            let poll = |locked: &mut Locked<Unlocked>| {
766                let epoll_object = EpollFileObject::new_file(locked, &current_task);
767                let epoll_file = epoll_object.downcast_file::<EpollFileObject>().unwrap();
768                epoll_file
769                    .add(
770                        locked,
771                        &current_task,
772                        &pipe1,
773                        &epoll_object,
774                        EpollEvent::new(FdEvents::POLLIN, 1),
775                    )
776                    .expect("epoll_file.add");
777                epoll_file
778                    .add(
779                        locked,
780                        &current_task,
781                        &pipe2,
782                        &epoll_object,
783                        EpollEvent::new(FdEvents::POLLIN, 2),
784                    )
785                    .expect("epoll_file.add");
786                epoll_file.wait(locked, &current_task, 2, zx::MonotonicInstant::ZERO).expect("wait")
787            };
788
789            let fds = poll(locked);
790            assert!(fds.is_empty());
791
792            assert_eq!(server1_zxio.write(&[0]).expect("write"), 1);
793
794            let fds = poll(locked);
795            assert_eq!(fds.len(), 1);
796            assert_eq!(fds[0].events(), FdEvents::POLLIN);
797            assert_eq!(fds[0].data(), 1);
798            assert_eq!(
799                pipe1.read(locked, &current_task, &mut VecOutputBuffer::new(64)).expect("read"),
800                1
801            );
802
803            let fds = poll(locked);
804            assert!(fds.is_empty());
805
806            assert_eq!(server2_zxio.write(&[0]).expect("write"), 1);
807
808            let fds = poll(locked);
809            assert_eq!(fds.len(), 1);
810            assert_eq!(fds[0].events(), FdEvents::POLLIN);
811            assert_eq!(fds[0].data(), 2);
812            assert_eq!(
813                pipe2.read(locked, &current_task, &mut VecOutputBuffer::new(64)).expect("read"),
814                1
815            );
816
817            let fds = poll(locked);
818            assert!(fds.is_empty());
819        })
820        .await;
821    }
822
823    #[::fuchsia::test]
824    async fn test_cancel_after_notify() {
825        spawn_kernel_and_run(async |locked, current_task| {
826            let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
827            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
828            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
829
830            // Add a thing
831            const EVENT_DATA: u64 = 42;
832            epoll_file
833                .add(
834                    locked,
835                    &current_task,
836                    &event,
837                    &epoll_file_handle,
838                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
839                )
840                .unwrap();
841
842            // Make the thing send a notification, wait for it
843            let add_val = 1u64;
844            assert_eq!(
845                event
846                    .write(locked, &current_task, &mut VecInputBuffer::new(&add_val.to_ne_bytes()))
847                    .unwrap(),
848                std::mem::size_of::<u64>()
849            );
850
851            assert_eq!(
852                epoll_file
853                    .wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO)
854                    .unwrap()
855                    .len(),
856                1
857            );
858
859            // Remove the thing
860            epoll_file.delete(&current_task, &event).unwrap();
861
862            // Wait for new notifications
863            assert_eq!(
864                epoll_file
865                    .wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO)
866                    .unwrap()
867                    .len(),
868                0
869            );
870            // That shouldn't crash
871        })
872        .await;
873    }
874
875    #[::fuchsia::test]
876    async fn test_add_then_modify() {
877        spawn_kernel_and_run(async |locked, current_task| {
878            let (socket1, _socket2) = UnixSocket::new_pair(
879                locked,
880                &current_task,
881                SocketDomain::Unix,
882                SocketType::Stream,
883                OpenFlags::RDWR,
884            )
885            .expect("Failed to create socket pair.");
886
887            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
888            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
889
890            const EVENT_DATA: u64 = 42;
891            epoll_file
892                .add(
893                    locked,
894                    &current_task,
895                    &socket1,
896                    &epoll_file_handle,
897                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
898                )
899                .unwrap();
900            assert_eq!(
901                epoll_file
902                    .wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO)
903                    .unwrap()
904                    .len(),
905                0
906            );
907
908            let read_write_event = FdEvents::POLLIN | FdEvents::POLLOUT;
909            epoll_file
910                .modify(
911                    locked,
912                    &current_task,
913                    &socket1,
914                    EpollEvent::new(read_write_event, EVENT_DATA),
915                )
916                .unwrap();
917            let triggered_events =
918                epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO).unwrap();
919            assert_eq!(1, triggered_events.len());
920            let event = &triggered_events[0];
921            assert_eq!(event.events(), FdEvents::POLLOUT);
922            assert_eq!(event.data(), EVENT_DATA);
923        })
924        .await;
925    }
926
927    #[::fuchsia::test]
928    async fn test_waiter_removal() {
929        spawn_kernel_and_run(async |locked, current_task| {
930            let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
931            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
932            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
933
934            const EVENT_DATA: u64 = 42;
935            epoll_file
936                .add(
937                    locked,
938                    &current_task,
939                    &event,
940                    &epoll_file_handle,
941                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
942                )
943                .unwrap();
944
945            std::mem::drop(event);
946
947            assert!(epoll_file.waitable_state.lock().waiters.is_empty());
948        })
949        .await;
950    }
951}