starnix_core/vfs/epoll.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::power::WakeupSourceOrigin;
use crate::task::{
    CurrentTask, EventHandler, ReadyItem, ReadyItemKey, WaitCanceler, WaitQueue, Waiter,
};
use crate::vfs::{
    Anon, FileHandle, FileObject, FileObjectState, FileOps, WeakFileHandle, fileops_impl_dataless,
    fileops_impl_nonseekable, fileops_impl_noop_sync,
};
use itertools::Itertools;
use starnix_logging::log_warn;
use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex};
use starnix_uapi::error;
use starnix_uapi::errors::{EINTR, ETIMEDOUT, Errno};
use starnix_uapi::open_flags::OpenFlags;
use starnix_uapi::vfs::{EpollEvent, FdEvents};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

/// Maximum depth of epoll instances monitoring one another.
/// From https://man7.org/linux/man-pages/man2/epoll_ctl.2.html
const MAX_NESTED_DEPTH: u32 = 5;
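// Attempting to nest epoll instances beyond this depth fails with ELOOP,
// as enforced by `check_eloop` below.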

/// WaitObject represents a FileHandle that is being waited upon.
/// The `data` field is a user-defined quantity passed in via
/// `sys_epoll_ctl`. C programs typically use this to store a pointer
/// to the data that needs to be processed after an event.
struct WaitObject {
    target: WeakFileHandle,
    events: FdEvents,
    data: u64,
    wait_canceler: Option<WaitCanceler>,
}

impl WaitObject {
    /// Returns the target `FileHandle` of the `WaitObject`, or `None` if the file has been closed.
    ///
    /// It is fine for the `FileHandle` to be closed after being added to an epoll; subsequent
    /// epoll_waits simply time out (importantly, they do not return EBADF).
    fn target(&self) -> Option<FileHandle> {
        self.target.upgrade()
    }
}

/// EpollKey acts as a key into a map of WaitObjects.
/// In reality it is a pointer to a FileHandle object.
pub type EpollKey = usize;

pub fn wakeup_source_name_for_epoll(current_task: &CurrentTask, key: EpollKey) -> String {
    format!("[{}] {}", current_task.command(), key)
}
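// For example, a task whose command is "init" waiting on key 4660 gets the
// wakeup source name "[init] 4660".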

/// EpollFileObject represents the FileObject used to
/// implement epoll_create1/epoll_ctl/epoll_pwait.
#[derive(Default)]
pub struct EpollFileObject {
    waiter: Waiter,
    /// Mutable state of this epoll object.
    state: Mutex<EpollState>,
    waitable_state: Arc<Mutex<EpollWaitableState>>,
}

#[derive(Default)]
struct EpollState {
    /// Any file tracked by this epoll instance
    /// will exist as a key in `wait_objects`.
    wait_objects: HashMap<ReadyItemKey, WaitObject>,
    /// processing_list is a FIFO of events that are being
    /// processed.
    ///
    /// Objects from the `EpollFileObject`'s `trigger_list` are moved into this
    /// list so that we can handle triggered events without holding its lock
    /// longer than we need to. This reduces contention with waited-on objects
    /// that try to notify this epoll object on subscribed events.
    processing_list: VecDeque<ReadyItem>,
    /// recheck_list is the list of items that need to have query_events checked
    /// at the start of the next EpollFileObject::wait call. It contains only items
    /// that were returned from the last wait call, because those are the only
    /// ones that might need to be returned even if no events come in between
    /// wait calls.
    recheck_list: Vec<ReadyItemKey>,
}

#[derive(Default)]
struct EpollWaitableState {
    /// trigger_list is a FIFO of events that have
    /// happened, but have not yet been processed.
    trigger_list: VecDeque<ReadyItem>,
    /// A list of waiters waiting for events from this
    /// epoll instance.
    waiters: WaitQueue,
}
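// The mutable state is split in two: `EpollWaitableState` holds only what a
// watched file touches when it delivers a notification, so notifying files
// contend on `waitable_state` alone rather than on the bookkeeping in
// `EpollState` (see `process_triggered_events`).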

impl EpollFileObject {
    /// Allocate a new, empty epoll object.
    pub fn new_file<L>(locked: &mut Locked<L>, current_task: &CurrentTask) -> FileHandle
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let epoll = Box::new(EpollFileObject::default());

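        // In test and debug builds, take each lock once, `state` before
        // `waitable_state`, establishing that lock ordering up front.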
        #[cfg(any(test, debug_assertions))]
        {
            let _l1 = epoll.state.lock();
            let _l2 = epoll.waitable_state.lock();
        }

        Anon::new_private_file(locked, current_task, epoll, OpenFlags::RDWR, "[eventpoll]")
    }

    fn wait_on_file<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        key: ReadyItemKey,
        wait_object: &mut WaitObject,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        // First start the wait. If an event happens after this, we'll get it.
        self.wait_on_file_edge_triggered(locked, current_task, key, wait_object)?;

        self.do_recheck(locked, current_task, wait_object, key)?;

        Ok(())
    }

    fn do_recheck<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        wait_object: &mut WaitObject,
        key: ReadyItemKey,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let Some(target) = wait_object.target() else { return Ok(()) };
        let events = target.query_events(locked, current_task)?;
        if !(events & wait_object.events).is_empty() {
            self.waiter.wake_immediately(events, self.new_wait_handler(key));
            if let Some(wait_canceler) = wait_object.wait_canceler.take() {
                wait_canceler.cancel();
            } else {
                log_warn!("wait canceler should have been set by `wait_on_file_edge_triggered`");
            }
        }
        Ok(())
    }

    fn wait_on_file_edge_triggered<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        key: ReadyItemKey,
        wait_object: &mut WaitObject,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let Some(target) = wait_object.target() else {
            return Ok(());
        };

        wait_object.wait_canceler = target.wait_async(
            locked,
            current_task,
            &self.waiter,
            wait_object.events,
            self.new_wait_handler(key),
        );
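        // A `None` canceler means the target does not support blocking waits
        // (e.g. a regular file); Linux reports this from epoll_ctl(2) as EPERM.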
        if wait_object.wait_canceler.is_none() {
            return error!(EPERM);
        }
        Ok(())
    }

    /// Checks whether adding self to the epoll instance behind `parent` would create a cycle
    /// or exceed the maximum nesting depth.
    fn check_eloop(&self, parent: &FileHandle, depth_left: u32) -> Result<(), Errno> {
        if depth_left == 0 {
            return error!(ELOOP);
        }

        let state = self.state.lock();
        for nested_object in state.wait_objects.values() {
            let Some(child) = nested_object.target() else {
                continue;
            };
            let Some(child_file) = child.downcast_file::<EpollFileObject>() else {
                continue;
            };

            if Arc::ptr_eq(&child, parent) {
                return error!(ELOOP);
            }
            child_file.check_eloop(parent, depth_left - 1)?;
        }

        Ok(())
    }

    /// Asynchronously wait on certain events happening on a FileHandle.
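    ///
    /// A minimal sketch of the add-then-wait flow this backs, mirroring the
    /// tests at the bottom of this file (`file` and `deadline` are assumed to
    /// already exist):
    ///
    /// ```ignore
    /// let epoll_file_handle = EpollFileObject::new_file(locked, current_task);
    /// let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
    /// epoll_file.add(
    ///     locked,
    ///     current_task,
    ///     &file,
    ///     &epoll_file_handle,
    ///     EpollEvent::new(FdEvents::POLLIN, 42),
    /// )?;
    /// let events = epoll_file.wait(locked, current_task, 10, deadline)?;
    /// ```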
    pub fn add<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        file: &FileHandle,
        epoll_file_handle: &FileHandle,
        epoll_event: EpollEvent,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        // Check if adding this file would cause a cycle at a max depth of 5.
        if let Some(epoll_to_add) = file.downcast_file::<EpollFileObject>() {
            // We need to check for `MAX_NESTED_DEPTH - 1` because adding `epoll_to_add` to self
            // would result in a total depth of one more.
            epoll_to_add.check_eloop(epoll_file_handle, MAX_NESTED_DEPTH - 1)?;
        }

        let mut state = self.state.lock();
        let key = file.id.as_epoll_key().into();
        match state.wait_objects.entry(key) {
            Entry::Occupied(_) => error!(EEXIST),
            Entry::Vacant(entry) => {
                let wait_object = entry.insert(WaitObject {
                    target: Arc::downgrade(file),
                    events: epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR,
                    data: epoll_event.data(),
                    wait_canceler: None,
                });
                self.wait_on_file(locked, current_task, key, wait_object)
            }
        }
    }

    /// Modify the events we are looking for on a FileHandle.
    pub fn modify<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        file: &FileHandle,
        epoll_event: EpollEvent,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let mut state = self.state.lock();
        let key = file.id.as_epoll_key();
        state.recheck_list.retain(|x| *x != key.into());
        let Some(wait_object) = state.wait_objects.get_mut(&key.into()) else {
            return error!(ENOENT);
        };
        if let Some(wait_canceler) = wait_object.wait_canceler.take() {
            wait_canceler.cancel();
        }
        let old_events = wait_object.events;
        wait_object.events = epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR;
        wait_object.data = epoll_event.data();
        // If the old subscription included EPOLLWAKEUP but the new one doesn't, we need to
        // take down the wake lease. This ensures that the system doesn't stay awake
        // unnecessarily when the event no longer requires it to be awake. Note that the old
        // events must be captured before they are overwritten above, or this check can
        // never be true.
        if old_events.contains(FdEvents::EPOLLWAKEUP)
            && !epoll_event.events().contains(FdEvents::EPOLLWAKEUP)
        {
            current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
                &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key)),
            );
        }
        self.wait_on_file(locked, current_task, key.into(), wait_object)
    }

    /// Cancel an asynchronous wait on an object. Events triggered before
    /// calling this will still be delivered.
    pub fn delete(&self, file: &FileObject) -> Result<(), Errno> {
        let mut state = self.state.lock();
        let key = file.id.as_epoll_key().into();
        if let Some(mut wait_object) = state.wait_objects.remove(&key) {
            if let Some(wait_canceler) = wait_object.wait_canceler.take() {
                wait_canceler.cancel();
            }
            state.recheck_list.retain(|x| *x != key);
            Ok(())
        } else {
            error!(ENOENT)
        }
    }

    /// Moves events from the epoll's trigger list into the parameter `pending_list`. This does
    /// not actually invoke the waiter, which is how items are added to the trigger list; the
    /// caller must do that beforehand if needed.
    ///
    /// If an event in the trigger list is stale, the event will be re-added to the waiter.
    ///
    /// An empty `pending_list` on return means there was nothing in the trigger list.
    fn process_triggered_events<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        pending_list: &mut Vec<ReadyItem>,
        max_events: usize,
    ) -> Result<(), Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let mut state = self.state.lock();
        // Move all the elements of the shared `trigger_list` into this intermediary
        // queue that we handle events from. This reduces the time spent holding the
        // `trigger_list` lock, which reduces contention with objects that this epoll
        // object has subscribed to for notifications.
        state.processing_list.append(&mut self.waitable_state.lock().trigger_list);
        while pending_list.len() < max_events && !state.processing_list.is_empty() {
            if let Some(pending) = state.processing_list.pop_front() {
                if let Some(wait) = state.wait_objects.get_mut(&pending.key) {
                    // The weak pointer to the FileObject target can be gone if the file was closed
                    // out from under us. If this happens it is not an error: ignore it and
                    // continue.
                    if let Some(target) = wait.target.upgrade() {
                        let ready = ReadyItem {
                            key: pending.key,
                            events: target.query_events(locked, current_task)?,
                        };
                        if ready.events.intersects(wait.events) {
                            pending_list.push(ready);
                        } else {
                            // Another thread already handled this event, wait for another one.
                            self.wait_on_file(locked, current_task, pending.key, wait)?;
                        }
                    }
                }
            }
        }
        Ok(())
    }

    /// Waits until an event exists in `pending_list` or until `wait_deadline` has
    /// passed.
    fn wait_until_pending_event<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        max_events: usize,
        mut wait_deadline: zx::MonotonicInstant,
    ) -> Result<Vec<ReadyItem>, Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let mut pending_list = Vec::new();

        loop {
            self.process_triggered_events(locked, current_task, &mut pending_list, max_events)?;

            if pending_list.len() == max_events {
                break; // Output list is full, nothing more we can do.
            }

            if !pending_list.is_empty() {
                // We now know we have at least one event to return. We shouldn't return
                // immediately, in case there are more events available, but the next loop should
                // wait with a 0 timeout to prevent further blocking.
                wait_deadline = zx::MonotonicInstant::ZERO;
            }

            // Loop back to check if there are more items in the Waiter's queue. Every wait_until()
            // call will process a single event. In order to drain as many events as we can that
            // are synchronously available, keep trying until it reports empty.
            //
            // The handlers in the waits cause items to be appended to trigger_list. See the closure
            // in `wait_on_file` to see how this happens.
            //
            // This wait may return EINTR for nonzero timeouts, which is not an error. We must
            // be careful not to lose events if this happens.
            //
            // The first time through this loop we'll use the timeout passed into this function,
            // so we can get EINTR. But since we haven't done anything or accumulated any results
            // yet, it's OK to return immediately and no information will be lost.
            match self.waiter.wait_until(locked, current_task, wait_deadline) {
                Err(err) if err == ETIMEDOUT => break,
                Err(err) if err == EINTR => {
                    // Terminating early will lose any events in the pending_list so that should
                    // only be for unrecoverable errors (not EINTR). The only time there should be a
                    // nonzero wait_deadline (and hence the ability to encounter EINTR) is when the
                    // pending list is empty.
                    debug_assert!(
                        pending_list.is_empty(),
                        "Got EINTR from wait of {}ns with {} items pending.",
                        wait_deadline.into_nanos(),
                        pending_list.len()
                    );
                    return Err(err);
                }
                // TODO check if this is supposed to actually fail!
                result => result?,
            }
        }

        Ok(pending_list)
    }

    /// Blocking wait on all waited upon events with a timeout.
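    ///
    /// Rearm semantics for returned events: `EPOLLONESHOT` entries are not
    /// rearmed (a subsequent epoll_ctl modification is required), `EPOLLET`
    /// entries are rearmed edge-triggered only, and level-triggered entries
    /// are rearmed and also placed on the `recheck_list` so they are re-polled
    /// on the next wait.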
    pub fn wait<L>(
        &self,
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        max_events: usize,
        deadline: zx::MonotonicInstant,
    ) -> Result<Vec<EpollEvent>, Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        {
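            // Re-poll the level-triggered items handed out by the previous wait;
            // see `recheck_list` for why this is necessary.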
            let mut state = self.state.lock();
            let recheck_list = std::mem::take(&mut state.recheck_list);
            for key in recheck_list {
                if let ReadyItemKey::Usize(key) = key {
                    current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
                        &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key)),
                    );
                }
                let wait_object = state.wait_objects.get_mut(&key).unwrap();
                self.do_recheck(locked, current_task, wait_object, key)?;
            }
        }

        let pending_list =
            self.wait_until_pending_event(locked, current_task, max_events, deadline)?;

        // Process the pending list and add processed ReadyItem
        // entries to the recheck_list for the next wait.
        let mut result = vec![];
        let mut state = self.state.lock();
        let state = &mut *state;
        for pending_event in pending_list.iter().unique_by(|e| e.key) {
            // The wait could have been deleted by here,
            // so ignore the None case.
            let Some(wait) = state.wait_objects.get_mut(&pending_event.key) else { continue };

            let reported_events = pending_event.events & wait.events;
            result.push(EpollEvent::new(reported_events, wait.data));

            // Files marked with `EPOLLONESHOT` should only notify
            // once and need to be rearmed manually with epoll_ctl_mod().
            if wait.events.contains(FdEvents::EPOLLONESHOT) {
                continue;
            }

            self.wait_on_file_edge_triggered(locked, current_task, pending_event.key, wait)?;

            if !wait.events.contains(FdEvents::EPOLLET) {
                state.recheck_list.push(pending_event.key);
            }

            // TODO: is this really only supposed to happen for level-triggered events?
            if !wait.events.contains(FdEvents::EPOLLET) {
                // Hold a wake lease for EPOLLWAKEUP events until the next
                // epoll_wait so the system stays awake while the event is pending.
                if wait.events.contains(FdEvents::EPOLLWAKEUP) {
                    if let ReadyItemKey::Usize(key) = pending_event.key {
                        current_task.kernel().suspend_resume_manager.activate_wakeup_source(
                            WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(
                                current_task,
                                key,
                            )),
                        );
                    }
                }
            }
        }

        Ok(result)
    }
}

impl FileOps for EpollFileObject {
    fileops_impl_nonseekable!();
    fileops_impl_noop_sync!();
    fileops_impl_dataless!();

    fn wait_async(
        &self,
        _locked: &mut Locked<FileOpsCore>,
        _file: &FileObject,
        _current_task: &CurrentTask,
        waiter: &Waiter,
        events: FdEvents,
        handler: EventHandler,
    ) -> Option<WaitCanceler> {
        Some(self.waitable_state.lock().waiters.wait_async_fd_events(waiter, events, handler))
    }

    fn query_events(
        &self,
        locked: &mut Locked<FileOpsCore>,
        _file: &FileObject,
        current_task: &CurrentTask,
    ) -> Result<FdEvents, Errno> {
        let mut events = FdEvents::empty();
        let state = self.state.lock();
        if !state.processing_list.is_empty() || !self.waitable_state.lock().trigger_list.is_empty()
        {
            events |= FdEvents::POLLIN;
        } else {
            for key in &state.recheck_list {
                let wait_object = state.wait_objects.get(&key).unwrap();
                let Some(target) = wait_object.target() else { continue };
                if !(target.query_events(locked, current_task)? & wait_object.events).is_empty() {
                    events |= FdEvents::POLLIN;
                    break;
                }
            }
        }
        Ok(events)
    }

    fn close(
        self: Box<Self>,
        _locked: &mut Locked<FileOpsCore>,
        _file: &FileObjectState,
        current_task: &CurrentTask,
    ) {
        let guard = self.state.lock();
        for (key, _wait_object) in guard.wait_objects.iter() {
            if let ReadyItemKey::Usize(key) = key {
                current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
                    &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, *key)),
                );
            }
        }
    }
}

#[derive(Clone)]
pub struct EpollEventHandler {
    key: ReadyItemKey,
    waitable_state: Arc<Mutex<EpollWaitableState>>,
}

impl EpollEventHandler {
    pub fn handle(self, events: FdEvents) {
        let mut waitable_state = self.waitable_state.lock();
        waitable_state.trigger_list.push_back(ReadyItem { key: self.key, events });
        waitable_state.waiters.notify_fd_events(FdEvents::POLLIN);
    }
}
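// End-to-end event flow: a watched file invokes `EpollEventHandler::handle`,
// which pushes a `ReadyItem` onto `trigger_list` and notifies this epoll's
// own waiters with POLLIN; `wait` later drains `trigger_list` into
// `processing_list` via `process_triggered_events` and returns the events
// that still match the subscription.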

impl EpollFileObject {
    fn new_wait_handler(&self, key: ReadyItemKey) -> EventHandler {
        EventHandler::Epoll(EpollEventHandler {
            key,
            waitable_state: Arc::clone(&self.waitable_state),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::fs::fuchsia::create_fuchsia_pipe;
    use crate::task::Waiter;
    use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
    use crate::testing::spawn_kernel_and_run;
    use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
    use crate::vfs::eventfd::{EventFdType, new_eventfd};
    use crate::vfs::fs_registry::FsRegistry;
    use crate::vfs::pipe::{new_pipe, register_pipe_fs};
    use crate::vfs::socket::{SocketDomain, SocketType, UnixSocket};
    use starnix_lifecycle::AtomicUsizeCounter;
    use starnix_sync::Unlocked;
    use starnix_uapi::vfs::{EpollEvent, FdEvents};
    use syncio::Zxio;
    use zx::{self as zx, HandleBased};

    #[::fuchsia::test]
    async fn test_epoll_read_ready() {
        static WRITE_COUNT: AtomicUsizeCounter = AtomicUsizeCounter::new(0);
        const EVENT_DATA: u64 = 42;

        spawn_kernel_and_run(async |locked, current_task| {
            let kernel = current_task.kernel();
            register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());

            let (pipe_out, pipe_in) = new_pipe(locked, &current_task).unwrap();

            let test_string = "hello starnix".to_string();
            let test_len = test_string.len();

            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &pipe_out,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();

            let (sender, receiver) = std::sync::mpsc::channel();
            let value = test_string.clone();
            let closure = move |locked: &mut Locked<Unlocked>, task: &CurrentTask| {
                let bytes_written = pipe_in
                    .write(locked, &task, &mut VecInputBuffer::new(value.as_bytes()))
                    .unwrap();
                assert_eq!(bytes_written, test_len);
                WRITE_COUNT.add(bytes_written);
                sender.send(()).unwrap();
            };
            let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
            kernel.kthreads.spawner().spawn_from_request(req);
            let events =
                epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
            receiver.recv().unwrap();
            assert_eq!(1, events.len());
            let event = &events[0];
            assert!(event.events().contains(FdEvents::POLLIN));
            assert_eq!(event.data(), EVENT_DATA);

            let mut buffer = VecOutputBuffer::new(test_len);
            let bytes_read = pipe_out.read(locked, &current_task, &mut buffer).unwrap();
            assert_eq!(bytes_read, WRITE_COUNT.get());
            assert_eq!(bytes_read, test_len);
            assert_eq!(buffer.data(), test_string.as_bytes());
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_epoll_ready_then_wait() {
        const EVENT_DATA: u64 = 42;

        spawn_kernel_and_run(async |locked, current_task| {
            let kernel = current_task.kernel();
            register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());

            let (pipe_out, pipe_in) = new_pipe(locked, &current_task).unwrap();

            let test_string = "hello starnix".to_string();
            let test_bytes = test_string.as_bytes();
            let test_len = test_bytes.len();

            assert_eq!(
                pipe_in.write(locked, &current_task, &mut VecInputBuffer::new(test_bytes)).unwrap(),
                test_bytes.len()
            );

            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &pipe_out,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();

            let events =
                epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
            assert_eq!(1, events.len());
            let event = &events[0];
            assert!(event.events().contains(FdEvents::POLLIN));
            assert_eq!(event.data(), EVENT_DATA);

            let mut buffer = VecOutputBuffer::new(test_len);
            let bytes_read = pipe_out.read(locked, &current_task, &mut buffer).unwrap();
            assert_eq!(bytes_read, test_len);
            assert_eq!(buffer.data(), test_bytes);
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_epoll_ctl_cancel() {
        spawn_kernel_and_run(async |locked, current_task| {
            for do_cancel in [true, false] {
                let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
                let waiter = Waiter::new();

                let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
                let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
                const EVENT_DATA: u64 = 42;
                epoll_file
                    .add(
                        locked,
                        &current_task,
                        &event,
                        &epoll_file_handle,
                        EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                    )
                    .unwrap();

                if do_cancel {
                    epoll_file.delete(&event).unwrap();
                }

                let wait_canceler = event
                    .wait_async(
                        locked,
                        &current_task,
                        &waiter,
                        FdEvents::POLLIN,
                        EventHandler::None,
                    )
                    .expect("wait_async");
                if do_cancel {
                    wait_canceler.cancel();
                }

                let add_val = 1u64;

                assert_eq!(
                    event
                        .write(
                            locked,
                            &current_task,
                            &mut VecInputBuffer::new(&add_val.to_ne_bytes())
                        )
                        .unwrap(),
                    std::mem::size_of::<u64>()
                );

                let events =
                    epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO).unwrap();

                if do_cancel {
                    assert_eq!(0, events.len());
                } else {
                    assert_eq!(1, events.len());
                    let event = &events[0];
                    assert!(event.events().contains(FdEvents::POLLIN));
                    assert_eq!(event.data(), EVENT_DATA);
                }
            }
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_multiple_events() {
        spawn_kernel_and_run(async |locked, current_task| {
            let (client1, server1) = zx::Socket::create_stream();
            let (client2, server2) = zx::Socket::create_stream();
            let pipe1 = create_fuchsia_pipe(locked, &current_task, client1, OpenFlags::RDWR)
                .expect("create_fuchsia_pipe");
            let pipe2 = create_fuchsia_pipe(locked, &current_task, client2, OpenFlags::RDWR)
                .expect("create_fuchsia_pipe");
            let server1_zxio = Zxio::create(server1.into_handle()).expect("Zxio::create");
            let server2_zxio = Zxio::create(server2.into_handle()).expect("Zxio::create");

            let poll = |locked: &mut Locked<Unlocked>| {
                let epoll_object = EpollFileObject::new_file(locked, &current_task);
                let epoll_file = epoll_object.downcast_file::<EpollFileObject>().unwrap();
                epoll_file
                    .add(
                        locked,
                        &current_task,
                        &pipe1,
                        &epoll_object,
                        EpollEvent::new(FdEvents::POLLIN, 1),
                    )
                    .expect("epoll_file.add");
                epoll_file
                    .add(
                        locked,
                        &current_task,
                        &pipe2,
                        &epoll_object,
                        EpollEvent::new(FdEvents::POLLIN, 2),
                    )
                    .expect("epoll_file.add");
                epoll_file.wait(locked, &current_task, 2, zx::MonotonicInstant::ZERO).expect("wait")
            };

            let fds = poll(locked);
            assert!(fds.is_empty());

            assert_eq!(server1_zxio.write(&[0]).expect("write"), 1);

            let fds = poll(locked);
            assert_eq!(fds.len(), 1);
            assert_eq!(fds[0].events(), FdEvents::POLLIN);
            assert_eq!(fds[0].data(), 1);
            assert_eq!(
                pipe1.read(locked, &current_task, &mut VecOutputBuffer::new(64)).expect("read"),
                1
            );

            let fds = poll(locked);
            assert!(fds.is_empty());

            assert_eq!(server2_zxio.write(&[0]).expect("write"), 1);

            let fds = poll(locked);
            assert_eq!(fds.len(), 1);
            assert_eq!(fds[0].events(), FdEvents::POLLIN);
            assert_eq!(fds[0].data(), 2);
            assert_eq!(
                pipe2.read(locked, &current_task, &mut VecOutputBuffer::new(64)).expect("read"),
                1
            );

            let fds = poll(locked);
            assert!(fds.is_empty());
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_cancel_after_notify() {
        spawn_kernel_and_run(async |locked, current_task| {
            let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();

            // Add a thing
            const EVENT_DATA: u64 = 42;
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &event,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();

            // Make the thing send a notification, wait for it
            let add_val = 1u64;
            assert_eq!(
                event
                    .write(locked, &current_task, &mut VecInputBuffer::new(&add_val.to_ne_bytes()))
                    .unwrap(),
                std::mem::size_of::<u64>()
            );

            assert_eq!(
                epoll_file
                    .wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO)
                    .unwrap()
                    .len(),
                1
            );

            // Remove the thing
            epoll_file.delete(&event).unwrap();

            // Wait for new notifications
            assert_eq!(
                epoll_file
                    .wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO)
                    .unwrap()
                    .len(),
                0
            );
            // That shouldn't crash
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_add_then_modify() {
        spawn_kernel_and_run(async |locked, current_task| {
            let (socket1, _socket2) = UnixSocket::new_pair(
                locked,
                &current_task,
                SocketDomain::Unix,
                SocketType::Stream,
                OpenFlags::RDWR,
            )
            .expect("Failed to create socket pair.");

            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();

            const EVENT_DATA: u64 = 42;
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &socket1,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();
            assert_eq!(
                epoll_file
                    .wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO)
                    .unwrap()
                    .len(),
                0
            );

            let read_write_event = FdEvents::POLLIN | FdEvents::POLLOUT;
            epoll_file
                .modify(
                    locked,
                    &current_task,
                    &socket1,
                    EpollEvent::new(read_write_event, EVENT_DATA),
                )
                .unwrap();
            let triggered_events =
                epoll_file.wait(locked, &current_task, 10, zx::MonotonicInstant::ZERO).unwrap();
            assert_eq!(1, triggered_events.len());
            let event = &triggered_events[0];
            assert_eq!(event.events(), FdEvents::POLLOUT);
            assert_eq!(event.data(), EVENT_DATA);
        })
        .await;
    }

    #[::fuchsia::test]
    async fn test_waiter_removal() {
        spawn_kernel_and_run(async |locked, current_task| {
            let event = new_eventfd(locked, &current_task, 0, EventFdType::Counter, true);
            let epoll_file_handle = EpollFileObject::new_file(locked, &current_task);
            let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();

            const EVENT_DATA: u64 = 42;
            epoll_file
                .add(
                    locked,
                    &current_task,
                    &event,
                    &epoll_file_handle,
                    EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
                )
                .unwrap();

            std::mem::drop(event);

            assert!(epoll_file.waitable_state.lock().waiters.is_empty());
        })
        .await;
    }
}