starnix_core/vfs/epoll.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::task::{
    CurrentTask, EventHandler, ReadyItem, ReadyItemKey, WaitCanceler, WaitQueue, Waiter,
};
use crate::vfs::{
    Anon, FileHandle, FileObject, FileObjectState, FileOps, WeakFileHandle, fileops_impl_dataless,
    fileops_impl_nonseekable, fileops_impl_noop_sync,
};
use itertools::Itertools;
use starnix_logging::log_warn;
use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex};
use starnix_uapi::error;
use starnix_uapi::errors::{EINTR, ETIMEDOUT, Errno};
use starnix_uapi::open_flags::OpenFlags;
use starnix_uapi::vfs::{EpollEvent, FdEvents};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

/// Maximum depth of epoll instances monitoring one another.
/// From https://man7.org/linux/man-pages/man2/epoll_ctl.2.html
const MAX_NESTED_DEPTH: u32 = 5;

/// WaitObject represents a FileHandle that is being waited upon.
/// The `data` field is a user-defined quantity passed in
/// via `sys_epoll_ctl`. C programs typically use it
/// to store a pointer to the data that needs to be processed
/// after an event.
struct WaitObject {
    target: WeakFileHandle,
    events: FdEvents,
    data: u64,
    wait_canceler: Option<WaitCanceler>,
}

impl WaitObject {
    /// Returns the target `FileHandle` of the `WaitObject`, or `None` if the file has been closed.
    ///
    /// It is fine for the `FileHandle` to be closed after being added to an epoll; subsequent
    /// epoll_waits end up timing out (importantly, not returning EBADF).
    fn target(&self) -> Option<FileHandle> {
        self.target.upgrade()
    }
}

/// EpollKey acts as a key to a map of WaitObject.
/// In reality it is a pointer to a FileHandle object.
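///
/// A minimal sketch of how a key is derived (assuming a `file: FileHandle` in scope,
/// mirroring its use in `EpollFileObject::add` below):
///
/// ```ignore
/// let key = file.id.as_epoll_key();
/// ```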
pub type EpollKey = usize;

/// EpollFileObject represents the FileObject used to
/// implement epoll_create1/epoll_ctl/epoll_pwait.
#[derive(Default)]
pub struct EpollFileObject {
    waiter: Waiter,
    /// Mutable state of this epoll object.
    state: Mutex<EpollState>,
    waitable_state: Arc<Mutex<EpollWaitableState>>,
}

#[derive(Default)]
struct EpollState {
    /// Any file tracked by this epoll instance
    /// will exist as a key in `wait_objects`.
    wait_objects: HashMap<ReadyItemKey, WaitObject>,
    /// processing_list is a FIFO of events that are being
    /// processed.
    ///
    /// Objects from the `EpollFileObject`'s `trigger_list` are moved into this
    /// list so that we can handle triggered events without holding its lock
    /// longer than we need to. This reduces contention with waited-on objects
    /// that try to notify this epoll object on subscribed events.
    processing_list: VecDeque<ReadyItem>,
    /// recheck_list is the list of items that need to have query_events checked
    /// at the start of the next EpollFileObject::wait call. It contains only items
    /// that were returned from the last wait call, because those are the only
    /// ones that might need to be returned even if no events come in between
    /// wait calls.
    recheck_list: Vec<ReadyItemKey>,
}

#[derive(Default)]
struct EpollWaitableState {
    /// trigger_list is a FIFO of events that have
    /// happened, but have not yet been processed.
    trigger_list: VecDeque<ReadyItem>,
    /// A list of waiters waiting for events from this
    /// epoll instance.
    waiters: WaitQueue,
}

impl EpollFileObject {
    /// Allocate a new, empty epoll object.
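    ///
    /// Illustrative usage, mirroring the tests below (assumes a `Locked` context and a
    /// `CurrentTask` are in scope):
    ///
    /// ```ignore
    /// let epoll_file_handle = EpollFileObject::new_file(locked, current_task);
    /// let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
    /// ```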
    pub fn new_file<L>(locked: &mut Locked<L>, current_task: &CurrentTask) -> FileHandle
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let epoll = Box::new(EpollFileObject::default());

        #[cfg(any(test, debug_assertions))]
        {
            let _l1 = epoll.state.lock();
            let _l2 = epoll.waitable_state.lock();
        }

        Anon::new_private_file(locked, current_task, epoll, OpenFlags::RDWR, "[eventpoll]")
    }

    fn wait_on_file<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        key: ReadyItemKey,
        wait_object: &mut WaitObject,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        // First start the wait. If an event happens after this, we'll get it.
        self.wait_on_file_edge_triggered(locked, current_task, key, wait_object)?;

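        // Then check the file's current state. An event that fired before the
        // edge-triggered wait above was registered would otherwise be missed;
        // do_recheck delivers it immediately and cancels the now-redundant wait.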
        self.do_recheck(locked, current_task, wait_object, key)?;

        Ok(())
    }

    fn do_recheck<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        wait_object: &mut WaitObject,
        key: ReadyItemKey,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let Some(target) = wait_object.target() else { return Ok(()) };
        let events = target.query_events(locked, current_task)?;
        if !(events & wait_object.events).is_empty() {
            self.waiter.wake_immediately(events, self.new_wait_handler(key));
            if let Some(wait_canceler) = wait_object.wait_canceler.take() {
                wait_canceler.cancel();
            } else {
                log_warn!("wait canceler should have been set by `wait_on_file_edge_triggered`");
            }
        }
        Ok(())
    }

    fn wait_on_file_edge_triggered<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        key: ReadyItemKey,
        wait_object: &mut WaitObject,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let Some(target) = wait_object.target() else {
            return Ok(());
        };

        wait_object.wait_canceler = target.wait_async(
            locked,
            current_task,
            &self.waiter,
            wait_object.events,
            self.new_wait_handler(key),
        );
        if wait_object.wait_canceler.is_none() {
            return error!(EPERM);
        }
        Ok(())
    }

    /// Checks whether adding this epoll object to the epoll instance `parent` would create a
    /// cycle or exceed the maximum nesting depth.
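    ///
    /// A hypothetical sketch of the cycle this guards against (handle names are
    /// illustrative):
    ///
    /// ```ignore
    /// // epoll_a watches epoll_b; epoll_b then trying to watch epoll_a must fail
    /// // with ELOOP rather than create a cycle.
    /// epoll_a.add(locked, current_task, &epoll_b_handle, &epoll_a_handle, event)?;
    /// assert!(epoll_b.add(locked, current_task, &epoll_a_handle, &epoll_b_handle, event).is_err());
    /// ```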
    fn check_eloop(&self, parent: &FileHandle, depth_left: u32) -> Result<(), Errno> {
        if depth_left == 0 {
            return error!(ELOOP);
        }

        let state = self.state.lock();
        for nested_object in state.wait_objects.values() {
            let Some(child) = nested_object.target() else {
                continue;
            };
            let Some(child_file) = child.downcast_file::<EpollFileObject>() else {
                continue;
            };

            if Arc::ptr_eq(&child, parent) {
                return error!(ELOOP);
            }
            child_file.check_eloop(parent, depth_left - 1)?;
        }

        Ok(())
    }

    /// Asynchronously wait on certain events happening on a FileHandle.
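    ///
    /// Illustrative usage, mirroring the tests below (`file` is the watched handle and
    /// `EVENT_DATA` is an arbitrary user-supplied value):
    ///
    /// ```ignore
    /// epoll_file.add(
    ///     locked,
    ///     current_task,
    ///     &file,
    ///     &epoll_file_handle,
    ///     EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
    /// )?;
    /// ```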
    pub fn add<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        file: &FileHandle,
        epoll_file_handle: &FileHandle,
        epoll_event: EpollEvent,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        // Check if adding this file would cause a cycle at a max depth of 5.
        if let Some(epoll_to_add) = file.downcast_file::<EpollFileObject>() {
            // We need to check for `MAX_NESTED_DEPTH - 1` because adding `epoll_to_add` to self
            // would result in a total depth of one more.
            epoll_to_add.check_eloop(epoll_file_handle, MAX_NESTED_DEPTH - 1)?;
        }

        let mut state = self.state.lock();
        let key = file.id.as_epoll_key().into();
        match state.wait_objects.entry(key) {
            Entry::Occupied(_) => error!(EEXIST),
            Entry::Vacant(entry) => {
                let wait_object = entry.insert(WaitObject {
                    target: Arc::downgrade(file),
                    events: epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR,
                    data: epoll_event.data(),
                    wait_canceler: None,
                });
                self.wait_on_file(locked, current_task, key, wait_object)
            }
        }
    }

    /// Modify the events we are looking for on a FileHandle.
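    ///
    /// Illustrative usage, mirroring `test_add_then_modify` below:
    ///
    /// ```ignore
    /// epoll_file.modify(
    ///     locked,
    ///     current_task,
    ///     &file,
    ///     EpollEvent::new(FdEvents::POLLIN | FdEvents::POLLOUT, EVENT_DATA),
    /// )?;
    /// ```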
    pub fn modify<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        file: &FileHandle,
        epoll_event: EpollEvent,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let mut state = self.state.lock();
        let key = file.id.as_epoll_key();
        state.recheck_list.retain(|x| *x != key.into());
        let Some(wait_object) = state.wait_objects.get_mut(&key.into()) else {
            return error!(ENOENT);
        };
        if let Some(wait_canceler) = wait_object.wait_canceler.take() {
            wait_canceler.cancel();
        }
        // If the new epoll event doesn't include EPOLLWAKEUP, we need to take down the
        // wake lease. This ensures that the system doesn't stay awake unnecessarily when
        // the event no longer requires it to be awake. Check the old event mask before
        // overwriting it below, since the comparison is old-versus-new.
        if wait_object.events.contains(FdEvents::EPOLLWAKEUP)
            && !epoll_event.events().contains(FdEvents::EPOLLWAKEUP)
        {
            current_task.kernel().suspend_resume_manager.remove_epoll(key);
        }
        wait_object.events = epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR;
        wait_object.data = epoll_event.data();
        self.wait_on_file(locked, current_task, key.into(), wait_object)
    }

    /// Cancel an asynchronous wait on an object. Events triggered before
    /// calling this will still be delivered.
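    ///
    /// Illustrative usage (a sketch; `file` is a previously added handle):
    ///
    /// ```ignore
    /// epoll_file.delete(&file)?;
    /// ```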
    pub fn delete(&self, file: &FileObject) -> Result<(), Errno> {
        let mut state = self.state.lock();
        let key = file.id.as_epoll_key().into();
        if let Some(mut wait_object) = state.wait_objects.remove(&key) {
            if let Some(wait_canceler) = wait_object.wait_canceler.take() {
                wait_canceler.cancel();
            }
            state.recheck_list.retain(|x| *x != key);
            Ok(())
        } else {
            error!(ENOENT)
        }
    }

    /// Stores events from the epoll's trigger list into the `pending_list` parameter. This does
    /// not actually invoke the waiter, which is how items are added to the trigger list; the
    /// caller will have to do that before calling if needed.
    ///
    /// If an event in the trigger list is stale, the event will be re-added to the waiter.
    ///
    /// If nothing was in the trigger list, `pending_list` is left unchanged.
    fn process_triggered_events<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        pending_list: &mut Vec<ReadyItem>,
        max_events: usize,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let mut state = self.state.lock();
        // Move all the elements from the trigger list to this intermediary
        // queue that we handle events from. This reduces the time spent holding
        // the trigger list's lock, which reduces contention with objects that
        // this epoll object has subscribed to for notifications.
        state.processing_list.append(&mut self.waitable_state.lock().trigger_list);
        while pending_list.len() < max_events && !state.processing_list.is_empty() {
            if let Some(pending) = state.processing_list.pop_front() {
                if let Some(wait) = state.wait_objects.get_mut(&pending.key) {
                    // The weak pointer to the FileObject target can be gone if the file was closed
                    // out from under us. If this happens it is not an error: ignore it and
                    // continue.
                    if let Some(target) = wait.target.upgrade() {
                        let ready = ReadyItem {
                            key: pending.key,
                            events: target.query_events(locked, current_task)?,
                        };
                        if ready.events.intersects(wait.events) {
                            pending_list.push(ready);
                        } else {
                            // Another thread already handled this event; wait for another one.
                            self.wait_on_file(locked, current_task, pending.key, wait)?;
                        }
                    }
                }
            }
        }
        Ok(())
    }


    /// Waits until an event exists in `pending_list` or until `wait_deadline` has
    /// been reached.
    fn wait_until_pending_event<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        max_events: usize,
        mut wait_deadline: zx::MonotonicInstant,
    ) -> Result<Vec<ReadyItem>, Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let mut pending_list = Vec::new();

        loop {
            self.process_triggered_events(locked, current_task, &mut pending_list, max_events)?;

            if pending_list.len() == max_events {
                break; // The output list is full, nothing more we can do.
            }

            if !pending_list.is_empty() {
                // We now know we have at least one event to return. We shouldn't return
                // immediately, in case there are more events available, but the next loop should
                // wait with a 0 timeout to prevent further blocking.
                wait_deadline = zx::MonotonicInstant::ZERO;
            }

            // Loop back to check if there are more items in the Waiter's queue. Every wait_until()
            // call will process a single event. In order to drain as many events as we can that
            // are synchronously available, keep trying until it reports empty.
            //
            // The handlers in the waits cause items to be appended to trigger_list. See
            // `EpollEventHandler::handle` for how this happens.
            //
            // This wait may return EINTR for nonzero timeouts, which is not an error. We must be
            // careful not to lose events if this happens.
            //
            // The first time through this loop we'll use the timeout passed into this function, so
            // we can get EINTR. But since we haven't done anything or accumulated any results yet,
            // it's OK to immediately return and no information will be lost.
            match self.waiter.wait_until(locked, current_task, wait_deadline) {
                Err(err) if err == ETIMEDOUT => break,
                Err(err) if err == EINTR => {
                    // Terminating early would lose any events in the pending_list, so that should
                    // only be done for unrecoverable errors (not EINTR). The only time there should
                    // be a nonzero wait_deadline (and hence the ability to encounter EINTR) is when
                    // the pending list is empty.
                    debug_assert!(
                        pending_list.is_empty(),
                        "Got EINTR from wait of {}ns with {} items pending.",
                        wait_deadline.into_nanos(),
                        pending_list.len()
                    );
                    return Err(err);
                }
                // TODO check if this is supposed to actually fail!
                result => result?,
            }
        }

        Ok(pending_list)
    }

    /// Blocking wait on all waited-upon events, with a timeout given by `deadline`.
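    ///
    /// Illustrative usage, mirroring the tests below:
    ///
    /// ```ignore
    /// let events = epoll_file.wait(locked, current_task, 10, zx::MonotonicInstant::INFINITE)?;
    /// for event in &events {
    ///     // event.events() holds the ready FdEvents; event.data() is the user-supplied value.
    /// }
    /// ```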
    pub fn wait<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        max_events: usize,
        deadline: zx::MonotonicInstant,
    ) -> Result<Vec<EpollEvent>, Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        {
            let mut state = self.state.lock();
            let recheck_list = std::mem::take(&mut state.recheck_list);
            for key in recheck_list {
                if let ReadyItemKey::Usize(key) = key {
                    current_task.kernel().suspend_resume_manager.remove_epoll(key);
                }
                let wait_object = state.wait_objects.get_mut(&key).unwrap();
                self.do_recheck(locked, current_task, wait_object, key)?;
            }
        }

        let pending_list =
            self.wait_until_pending_event(locked, current_task, max_events, deadline)?;

        // Process the pending list and add processed ReadyItem
        // entries to the recheck_list for the next wait.
        let mut result = vec![];
        let mut state = self.state.lock();
        let state = &mut *state;
        for pending_event in pending_list.iter().unique_by(|e| e.key) {
            // The wait could have been deleted by now,
            // so ignore the None case.
            let Some(wait) = state.wait_objects.get_mut(&pending_event.key) else { continue };

            let reported_events = pending_event.events & wait.events;
            result.push(EpollEvent::new(reported_events, wait.data));

            // Files marked with `EPOLLONESHOT` should only notify
            // once and need to be rearmed manually with epoll_ctl_mod().
            if wait.events.contains(FdEvents::EPOLLONESHOT) {
                continue;
            }

            self.wait_on_file_edge_triggered(locked, current_task, pending_event.key, wait)?;

            if !wait.events.contains(FdEvents::EPOLLET) {
                state.recheck_list.push(pending_event.key);
            }

            // TODO: is this really only supposed to happen for level-triggered events?
            if !wait.events.contains(FdEvents::EPOLLET) {
                // When this is the first epoll_wait on this epoll fd, create and
                // hold a wake lease until the next epoll_wait.
                if wait.events.contains(FdEvents::EPOLLWAKEUP) {
                    if let ReadyItemKey::Usize(key) = pending_event.key {
                        current_task.kernel().suspend_resume_manager.add_epoll(current_task, key)
                    }
                }
            }
        }

        Ok(result)
    }

    /// Drop the wake lease associated with the `file`.
    pub fn drop_lease(&self, current_task: &CurrentTask, file: &FileHandle) {
        let guard = self.state.lock();
        let key = file.id.as_epoll_key();
        if guard.wait_objects.contains_key(&key.into()) {
            current_task.kernel().suspend_resume_manager.remove_epoll(key);
        }
    }

    /// Activate the wake lease associated with the `file`.
    ///
    /// `baton_lease` is passed by reference to ensure that the lease remains held until
    /// this function returns.
    pub fn activate_lease(
        &self,
        current_task: &CurrentTask,
        file: &FileHandle,
        _baton_lease: &zx::NullableHandle,
    ) -> Result<(), Errno> {
        let key = file.id.as_epoll_key();
        // Only add a wake lock if the wake event is marked as EPOLLWAKEUP. We promise
        // not to suspend the system after epoll results in a wake, and userspace
        // promises a quick turnaround for an epoll reschedule.
        let add_epoll = self
            .state
            .lock()
            .wait_objects
            .get(&key.into())
            .map(|obj| obj.events.contains(FdEvents::EPOLLWAKEUP))
            .unwrap_or(false);
        if add_epoll {
            current_task.kernel().suspend_resume_manager.add_epoll(current_task, key);
        }
        Ok(())
    }
}

impl FileOps for EpollFileObject {
    fileops_impl_nonseekable!();
    fileops_impl_noop_sync!();
    fileops_impl_dataless!();

    fn wait_async(
        &self,
        _locked: &mut Locked<FileOpsCore>,
        _file: &FileObject,
        _current_task: &CurrentTask,
        waiter: &Waiter,
        events: FdEvents,
        handler: EventHandler,
    ) -> Option<WaitCanceler> {
        Some(self.waitable_state.lock().waiters.wait_async_fd_events(waiter, events, handler))
    }

    fn query_events(
        &self,
        locked: &mut Locked<FileOpsCore>,
        _file: &FileObject,
        current_task: &CurrentTask,
    ) -> Result<FdEvents, Errno> {
        let mut events = FdEvents::empty();
        let state = self.state.lock();
        if !state.processing_list.is_empty() || !self.waitable_state.lock().trigger_list.is_empty()
        {
            events |= FdEvents::POLLIN;
        } else {
            for key in &state.recheck_list {
                let wait_object = state.wait_objects.get(key).unwrap();
                let Some(target) = wait_object.target() else { continue };
                if !(target.query_events(locked, current_task)? & wait_object.events).is_empty() {
                    events |= FdEvents::POLLIN;
                    break;
                }
            }
        }
        Ok(events)
    }

    fn close(
        self: Box<Self>,
        _locked: &mut Locked<FileOpsCore>,
        _file: &FileObjectState,
        current_task: &CurrentTask,
    ) {
        let guard = self.state.lock();
        for (key, _wait_object) in guard.wait_objects.iter() {
            if let ReadyItemKey::Usize(key) = key {
                current_task.kernel().suspend_resume_manager.remove_epoll(*key);
            }
        }
    }
}

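/// The `EventHandler` registered on files watched by an `EpollFileObject`. When a
/// watched file becomes ready, `handle` pushes a `ReadyItem` onto the epoll's
/// trigger list and notifies tasks waiting on the epoll file itself with POLLIN.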
#[derive(Clone)]
pub struct EpollEventHandler {
    key: ReadyItemKey,
    waitable_state: Arc<Mutex<EpollWaitableState>>,
}

impl EpollEventHandler {
    pub fn handle(self, events: FdEvents) {
        let mut waitable_state = self.waitable_state.lock();
        waitable_state.trigger_list.push_back(ReadyItem { key: self.key, events });
        waitable_state.waiters.notify_fd_events(FdEvents::POLLIN);
    }
}

impl EpollFileObject {
    fn new_wait_handler(&self, key: ReadyItemKey) -> EventHandler {
        EventHandler::Epoll(EpollEventHandler {
            key,
            waitable_state: Arc::clone(&self.waitable_state),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::fs::fuchsia::create_fuchsia_pipe;
    use crate::task::Waiter;
    use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
    use crate::testing::spawn_kernel_and_run;
    use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
    use crate::vfs::eventfd::{EventFdType, new_eventfd};
    use crate::vfs::fs_registry::FsRegistry;
    use crate::vfs::pipe::{new_pipe, register_pipe_fs};
    use crate::vfs::socket::{SocketDomain, SocketType, UnixSocket};
    use starnix_lifecycle::AtomicUsizeCounter;
    use starnix_sync::Unlocked;
    use starnix_uapi::vfs::{EpollEvent, FdEvents};
    use syncio::Zxio;
    use zx::{self as zx, HandleBased};

    #[::fuchsia::test]
    async fn test_epoll_read_ready() {
        static WRITE_COUNT: AtomicUsizeCounter = AtomicUsizeCounter::new(0);
        const EVENT_DATA: u64 = 42;

        spawn_kernel_and_run(async |locked, current_task| {
            let kernel = current_task.kernel();
            register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());

            let (pipe_out, pipe_in) = new_pipe(locked, &current_task).unwrap();

            let test_string = "hello starnix".to_string();
            let test_len = test_string.len();

            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &pipe_out,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();

            let (sender, receiver) = std::sync::mpsc::channel();
            let value = test_string.clone();
            let closure = move |locked: &mut Locked<Unlocked>, task: &CurrentTask| {
                let bytes_written = pipe_in
                    .write(locked, &task, &mut VecInputBuffer::new(value.as_bytes()))
                    .unwrap();
                assert_eq!(bytes_written, test_len);
                WRITE_COUNT.add(bytes_written);
                sender.send(()).unwrap();
            };
            let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
            kernel.kthreads.spawner().spawn_from_request(req);
            let events =
                epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
            receiver.recv().unwrap();
            assert_eq!(1, events.len());
            let event = &events[0];
            assert!(event.events().contains(FdEvents::POLLIN));
            assert_eq!(event.data(), EVENT_DATA);

            let mut buffer = VecOutputBuffer::new(test_len);
            let bytes_read = pipe_out.read(locked, &current_task, &mut buffer).unwrap();
            assert_eq!(bytes_read, WRITE_COUNT.get());
            assert_eq!(bytes_read, test_len);
            assert_eq!(buffer.data(), test_string.as_bytes());
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_epoll_ready_then_wait() {
        const EVENT_DATA: u64 = 42;

        spawn_kernel_and_run(async |locked, current_task| {
            let kernel = current_task.kernel();
            register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());

            let (pipe_out, pipe_in) = new_pipe(locked, &current_task).unwrap();

            let test_string = "hello starnix".to_string();
            let test_bytes = test_string.as_bytes();
            let test_len = test_bytes.len();

            assert_eq!(
                pipe_in.write(locked, &current_task, &mut VecInputBuffer::new(test_bytes)).unwrap(),
                test_bytes.len()
            );

            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &pipe_out,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();

            let events =
                epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
            assert_eq!(1, events.len());
            let event = &events[0];
            assert!(event.events().contains(FdEvents::POLLIN));
            assert_eq!(event.data(), EVENT_DATA);

            let mut buffer = VecOutputBuffer::new(test_len);
            let bytes_read = pipe_out.read(locked, &current_task, &mut buffer).unwrap();
            assert_eq!(bytes_read, test_len);
            assert_eq!(buffer.data(), test_bytes);
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_epoll_ctl_cancel() {
        spawn_kernel_and_run(async |locked, current_task| {
            for do_cancel in [true, false] {
                let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
                let waiter = Waiter::new();

                let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
                let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
                const EVENT_DATA: u64 = 42;
                epoll_file
                    .add(
                        locked,
                        &current_task,
                        &event,
                        &epoll_file_handle,
                        EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                    )
                    .unwrap();

                if do_cancel {
                    epoll_file.delete(&event).unwrap();
                }

                let wait_canceler = event
                    .wait_async(
                        locked,
                        &current_task,
                        &waiter,
                        FdEvents::POLLIN,
                        EventHandler::None,
                    )
                    .expect("wait_async");
                if do_cancel {
                    wait_canceler.cancel();
                }

                let add_val = 1u64;

                assert_eq!(
                    event
                        .write(
                            locked,
                            &current_task,
                            &mut VecInputBuffer::new(&add_val.to_ne_bytes())
                        )
                        .unwrap(),
                    std::mem::size_of::<u64>()
                );

                let events =
                    epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO).unwrap();

                if do_cancel {
                    assert_eq!(0, events.len());
                } else {
                    assert_eq!(1, events.len());
                    let event = &events[0];
                    assert!(event.events().contains(FdEvents::POLLIN));
                    assert_eq!(event.data(), EVENT_DATA);
                }
            }
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_multiple_events() {
        spawn_kernel_and_run(async |locked, current_task| {
            let (client1, server1) = zx::Socket::create_stream();
            let (client2, server2) = zx::Socket::create_stream();
            let pipe1 = create_fuchsia_pipe(locked, &current_task, client1, OpenFlags::RDWR)
                .expect("create_fuchsia_pipe");
            let pipe2 = create_fuchsia_pipe(locked, &current_task, client2, OpenFlags::RDWR)
                .expect("create_fuchsia_pipe");
            let server1_zxio = Zxio::create(server1.into_handle()).expect("Zxio::create");
            let server2_zxio = Zxio::create(server2.into_handle()).expect("Zxio::create");

            let poll = |locked: &mut Locked<Unlocked>| {
                let epoll_object = EpollFileObject::new_file(locked, &current_task);
                let epoll_file = epoll_object.downcast_file::<EpollFileObject>().unwrap();
                epoll_file
                    .add(
                        locked,
                        &current_task,
                        &pipe1,
                        &epoll_object,
                        EpollEvent::new(FdEvents::POLLIN, 1),
                    )
                    .expect("epoll_file.add");
                epoll_file
                    .add(
                        locked,
                        &current_task,
                        &pipe2,
                        &epoll_object,
                        EpollEvent::new(FdEvents::POLLIN, 2),
                    )
                    .expect("epoll_file.add");
                epoll_file.wait(locked, &current_task, 2, zx::MonotonicInstant::ZERO).expect("wait")
            };

            let fds = poll(locked);
            assert!(fds.is_empty());

            assert_eq!(server1_zxio.write(&[0]).expect("write"), 1);

            let fds = poll(locked);
            assert_eq!(fds.len(), 1);
            assert_eq!(fds[0].events(), FdEvents::POLLIN);
            assert_eq!(fds[0].data(), 1);
            assert_eq!(
                pipe1.read(locked, &current_task, &mut VecOutputBuffer::new(64)).expect("read"),
                1
            );

            let fds = poll(locked);
            assert!(fds.is_empty());

            assert_eq!(server2_zxio.write(&[0]).expect("write"), 1);

            let fds = poll(locked);
            assert_eq!(fds.len(), 1);
            assert_eq!(fds[0].events(), FdEvents::POLLIN);
            assert_eq!(fds[0].data(), 2);
            assert_eq!(
                pipe2.read(locked, &current_task, &mut VecOutputBuffer::new(64)).expect("read"),
                1
            );

            let fds = poll(locked);
            assert!(fds.is_empty());
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_cancel_after_notify() {
        spawn_kernel_and_run(async |locked, current_task| {
            let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();

            // Add an eventfd to the epoll instance.
            const EVENT_DATA: u64 = 42;
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &event,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();

            // Make the eventfd send a notification, then wait for it.
            let add_val = 1u64;
            assert_eq!(
                event
                    .write(locked, &current_task, &mut VecInputBuffer::new(&add_val.to_ne_bytes()))
                    .unwrap(),
                std::mem::size_of::<u64>()
            );

            assert_eq!(
                epoll_file
                    .wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO)
                    .unwrap()
                    .len(),
                1
            );

            // Remove the eventfd from the epoll instance.
            epoll_file.delete(&event).unwrap();

            // Waiting for new notifications should report nothing and, importantly, not crash.
            assert_eq!(
                epoll_file
                    .wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO)
                    .unwrap()
                    .len(),
                0
            );
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_add_then_modify() {
        spawn_kernel_and_run(async |locked, current_task| {
            let (socket1, _socket2) = UnixSocket::new_pair(
                locked,
                &current_task,
                SocketDomain::Unix,
                SocketType::Stream,
                OpenFlags::RDWR,
            )
            .expect("Failed to create socket pair.");

            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();

            const EVENT_DATA: u64 = 42;
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &socket1,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();
            assert_eq!(
                epoll_file
                    .wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO)
                    .unwrap()
                    .len(),
                0
            );

            let read_write_event = FdEvents::POLLIN | FdEvents::POLLOUT;
            epoll_file
                .modify(
                    locked,
                    &current_task,
                    &socket1,
                    EpollEvent::new(read_write_event, EVENT_DATA),
                )
                .unwrap();
            let triggered_events =
                epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO).unwrap();
            assert_eq!(1, triggered_events.len());
            let event = &triggered_events[0];
            assert_eq!(event.events(), FdEvents::POLLOUT);
            assert_eq!(event.data(), EVENT_DATA);
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_waiter_removal() {
        spawn_kernel_and_run(async |locked, current_task| {
            let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();

            const EVENT_DATA: u64 = 42;
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &event,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();

            std::mem::drop(event);

            assert!(epoll_file.waitable_state.lock().waiters.is_empty());
        })
        .await;
    }
}