1use crate::power::WakeupSourceOrigin;
6use crate::task::{
7 CurrentTask, EventHandler, ReadyItem, ReadyItemKey, WaitCanceler, WaitQueue, Waiter,
8};
9use crate::vfs::{
10 Anon, FileHandle, FileObject, FileObjectState, FileOps, WeakFileHandle, fileops_impl_dataless,
11 fileops_impl_nonseekable, fileops_impl_noop_sync,
12};
13use itertools::Itertools;
14use starnix_logging::log_warn;
15use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex};
16use starnix_uapi::error;
17use starnix_uapi::errors::{EINTR, ETIMEDOUT, Errno};
18use starnix_uapi::open_flags::OpenFlags;
19use starnix_uapi::vfs::{EpollEvent, FdEvents};
20use std::collections::hash_map::Entry;
21use std::collections::{HashMap, VecDeque};
22use std::sync::Arc;
23
24const MAX_NESTED_DEPTH: u32 = 5;
27
28struct WaitObject {
34 target: WeakFileHandle,
35 events: FdEvents,
36 data: u64,
37 wait_canceler: Option<WaitCanceler>,
38}
39
40impl WaitObject {
41 fn target(&self) -> Option<FileHandle> {
46 self.target.upgrade()
47 }
48}
49
50pub type EpollKey = usize;
53
54pub fn wakeup_source_name_for_epoll(current_task: &CurrentTask, key: EpollKey) -> String {
55 format!("[{}] {}", current_task.command(), key)
56}
57
58#[derive(Default)]
61pub struct EpollFileObject {
62 waiter: Waiter,
63 state: Mutex<EpollState>,
65 waitable_state: Arc<Mutex<EpollWaitableState>>,
66}
67
68#[derive(Default)]
69struct EpollState {
70 wait_objects: HashMap<ReadyItemKey, WaitObject>,
73 processing_list: VecDeque<ReadyItem>,
81 recheck_list: Vec<ReadyItemKey>,
87}
88
89#[derive(Default)]
90struct EpollWaitableState {
91 trigger_list: VecDeque<ReadyItem>,
94 waiters: WaitQueue,
97}
98
99impl EpollFileObject {
100 pub fn new_file<L>(locked: &mut Locked<L>, current_task: &CurrentTask) -> FileHandle
102 where
103 L: LockEqualOrBefore<FileOpsCore>,
104 {
105 let epoll = Box::new(EpollFileObject::default());
106
107 #[cfg(any(test, debug_assertions))]
108 {
109 let _l1 = epoll.state.lock();
110 let _l2 = epoll.waitable_state.lock();
111 }
112
113 Anon::new_private_file(locked, current_task, epoll, OpenFlags::RDWR, "[eventpoll]")
114 }
115
116 fn wait_on_file<L>(
117 &self,
118 locked: &mut Locked<L>,
119 current_task: &CurrentTask,
120 key: ReadyItemKey,
121 wait_object: &mut WaitObject,
122 ) -> Result<(), Errno>
123 where
124 L: LockEqualOrBefore<FileOpsCore>,
125 {
126 self.wait_on_file_edge_triggered(locked, current_task, key, wait_object)?;
128
129 self.do_recheck(locked, current_task, wait_object, key)?;
130
131 Ok(())
132 }
133
134 fn do_recheck<L>(
135 &self,
136 locked: &mut Locked<L>,
137 current_task: &CurrentTask,
138 wait_object: &mut WaitObject,
139 key: ReadyItemKey,
140 ) -> Result<(), Errno>
141 where
142 L: LockEqualOrBefore<FileOpsCore>,
143 {
144 let Some(target) = wait_object.target() else { return Ok(()) };
145 let events = target.query_events(locked, current_task)?;
146 if !(events & wait_object.events).is_empty() {
147 self.waiter.wake_immediately(events, self.new_wait_handler(key));
148 if let Some(wait_canceler) = wait_object.wait_canceler.take() {
149 wait_canceler.cancel();
150 } else {
151 log_warn!("wait canceler should have been set by `wait_on_file_edge_triggered`");
152 }
153 }
154 Ok(())
155 }
156
157 fn wait_on_file_edge_triggered<L>(
158 &self,
159 locked: &mut Locked<L>,
160 current_task: &CurrentTask,
161 key: ReadyItemKey,
162 wait_object: &mut WaitObject,
163 ) -> Result<(), Errno>
164 where
165 L: LockEqualOrBefore<FileOpsCore>,
166 {
167 let Some(target) = wait_object.target() else {
168 return Ok(());
169 };
170
171 wait_object.wait_canceler = target.wait_async(
172 locked,
173 current_task,
174 &self.waiter,
175 wait_object.events,
176 self.new_wait_handler(key),
177 );
178 if wait_object.wait_canceler.is_none() {
179 return error!(EPERM);
180 }
181 Ok(())
182 }
183
184 fn check_eloop(&self, parent: &FileHandle, depth_left: u32) -> Result<(), Errno> {
187 if depth_left == 0 {
188 return error!(ELOOP);
189 }
190
191 let state = self.state.lock();
192 for nested_object in state.wait_objects.values() {
193 let Some(child) = nested_object.target() else {
194 continue;
195 };
196 let Some(child_file) = child.downcast_file::<EpollFileObject>() else {
197 continue;
198 };
199
200 if Arc::ptr_eq(&child, parent) {
201 return error!(ELOOP);
202 }
203 child_file.check_eloop(parent, depth_left - 1)?;
204 }
205
206 Ok(())
207 }
208
209 pub fn add<L>(
211 &self,
212 locked: &mut Locked<L>,
213 current_task: &CurrentTask,
214 file: &FileHandle,
215 epoll_file_handle: &FileHandle,
216 epoll_event: EpollEvent,
217 ) -> Result<(), Errno>
218 where
219 L: LockEqualOrBefore<FileOpsCore>,
220 {
221 if let Some(epoll_to_add) = file.downcast_file::<EpollFileObject>() {
223 epoll_to_add.check_eloop(epoll_file_handle, MAX_NESTED_DEPTH - 1)?;
226 }
227
228 let mut state = self.state.lock();
229 let key = file.id.as_epoll_key().into();
230 match state.wait_objects.entry(key) {
231 Entry::Occupied(_) => error!(EEXIST),
232 Entry::Vacant(entry) => {
233 let wait_object = entry.insert(WaitObject {
234 target: Arc::downgrade(file),
235 events: epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR,
236 data: epoll_event.data(),
237 wait_canceler: None,
238 });
239 self.wait_on_file(locked, current_task, key, wait_object)
240 }
241 }
242 }
243
244 pub fn modify<L>(
246 &self,
247 locked: &mut Locked<L>,
248 current_task: &CurrentTask,
249 file: &FileHandle,
250 epoll_event: EpollEvent,
251 ) -> Result<(), Errno>
252 where
253 L: LockEqualOrBefore<FileOpsCore>,
254 {
255 let mut state = self.state.lock();
256 let key = file.id.as_epoll_key();
257 state.recheck_list.retain(|x| *x != key.into());
258 let Some(wait_object) = state.wait_objects.get_mut(&key.into()) else {
259 return error!(ENOENT);
260 };
261 if let Some(wait_canceler) = wait_object.wait_canceler.take() {
262 wait_canceler.cancel();
263 }
264 wait_object.events = epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR;
265 wait_object.data = epoll_event.data();
266 if wait_object.events.contains(FdEvents::EPOLLWAKEUP)
270 && !epoll_event.events().contains(FdEvents::EPOLLWAKEUP)
271 {
272 current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
273 &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key)),
274 );
275 }
276 self.wait_on_file(locked, current_task, key.into(), wait_object)
277 }
278
279 pub fn delete(&self, file: &FileObject) -> Result<(), Errno> {
282 let mut state = self.state.lock();
283 let key = file.id.as_epoll_key().into();
284 if let Some(mut wait_object) = state.wait_objects.remove(&key) {
285 if let Some(wait_canceler) = wait_object.wait_canceler.take() {
286 wait_canceler.cancel();
287 }
288 state.recheck_list.retain(|x| *x != key);
289 Ok(())
290 } else {
291 error!(ENOENT)
292 }
293 }
294
295 fn process_triggered_events<L>(
303 &self,
304 locked: &mut Locked<L>,
305 current_task: &CurrentTask,
306 pending_list: &mut Vec<ReadyItem>,
307 max_events: usize,
308 ) -> Result<(), Errno>
309 where
310 L: LockEqualOrBefore<FileOpsCore>,
311 {
312 let mut state = self.state.lock();
313 state.processing_list.append(&mut self.waitable_state.lock().trigger_list);
318 while pending_list.len() < max_events && !state.processing_list.is_empty() {
319 if let Some(pending) = state.processing_list.pop_front() {
320 if let Some(wait) = state.wait_objects.get_mut(&pending.key) {
321 if let Some(target) = wait.target.upgrade() {
325 let ready = ReadyItem {
326 key: pending.key,
327 events: target.query_events(locked, current_task)?,
328 };
329 if ready.events.intersects(wait.events) {
330 pending_list.push(ready);
331 } else {
332 self.wait_on_file(locked, current_task, pending.key, wait)?;
334 }
335 }
336 }
337 }
338 }
339 Ok(())
340 }
341
342 fn wait_until_pending_event<L>(
345 &self,
346 locked: &mut Locked<L>,
347 current_task: &CurrentTask,
348 max_events: usize,
349 mut wait_deadline: zx::MonotonicInstant,
350 ) -> Result<Vec<ReadyItem>, Errno>
351 where
352 L: LockEqualOrBefore<FileOpsCore>,
353 {
354 let mut pending_list = Vec::new();
355
356 loop {
357 self.process_triggered_events(locked, current_task, &mut pending_list, max_events)?;
358
359 if pending_list.len() == max_events {
360 break; }
362
363 if !pending_list.is_empty() {
364 wait_deadline = zx::MonotonicInstant::ZERO;
368 }
369
370 match self.waiter.wait_until(locked, current_task, wait_deadline) {
384 Err(err) if err == ETIMEDOUT => break,
385 Err(err) if err == EINTR => {
386 debug_assert!(
391 pending_list.is_empty(),
392 "Got EINTR from wait of {}ns with {} items pending.",
393 wait_deadline.into_nanos(),
394 pending_list.len()
395 );
396 return Err(err);
397 }
398 result => result?,
400 }
401 }
402
403 Ok(pending_list)
404 }
405
406 pub fn wait<L>(
408 &self,
409 locked: &mut Locked<L>,
410 current_task: &CurrentTask,
411 max_events: usize,
412 deadline: zx::MonotonicInstant,
413 ) -> Result<Vec<EpollEvent>, Errno>
414 where
415 L: LockEqualOrBefore<FileOpsCore>,
416 {
417 {
418 let mut state = self.state.lock();
419 let recheck_list = std::mem::take(&mut state.recheck_list);
420 for key in recheck_list {
421 if let ReadyItemKey::Usize(key) = key {
422 current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
423 &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key)),
424 );
425 }
426 let wait_object = state.wait_objects.get_mut(&key).unwrap();
427 self.do_recheck(locked, current_task, wait_object, key)?;
428 }
429 }
430
431 let pending_list =
432 self.wait_until_pending_event(locked, current_task, max_events, deadline)?;
433
434 let mut result = vec![];
437 let mut state = self.state.lock();
438 let state = &mut *state;
439 for pending_event in pending_list.iter().unique_by(|e| e.key) {
440 let Some(wait) = state.wait_objects.get_mut(&pending_event.key) else { continue };
443
444 let reported_events = pending_event.events & wait.events;
445 result.push(EpollEvent::new(reported_events, wait.data));
446
447 if wait.events.contains(FdEvents::EPOLLONESHOT) {
450 continue;
451 }
452
453 self.wait_on_file_edge_triggered(locked, current_task, pending_event.key, wait)?;
454
455 if !wait.events.contains(FdEvents::EPOLLET) {
456 state.recheck_list.push(pending_event.key);
457 }
458
459 if !wait.events.contains(FdEvents::EPOLLET) {
461 if wait.events.contains(FdEvents::EPOLLWAKEUP) {
464 if let ReadyItemKey::Usize(key) = pending_event.key {
465 current_task.kernel().suspend_resume_manager.activate_wakeup_source(
466 WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(
467 current_task,
468 key,
469 )),
470 );
471 }
472 }
473 }
474 }
475
476 Ok(result)
477 }
478}
479
480impl FileOps for EpollFileObject {
481 fileops_impl_nonseekable!();
482 fileops_impl_noop_sync!();
483 fileops_impl_dataless!();
484
485 fn wait_async(
486 &self,
487 _locked: &mut Locked<FileOpsCore>,
488 _file: &FileObject,
489 _current_task: &CurrentTask,
490 waiter: &Waiter,
491 events: FdEvents,
492 handler: EventHandler,
493 ) -> Option<WaitCanceler> {
494 Some(self.waitable_state.lock().waiters.wait_async_fd_events(waiter, events, handler))
495 }
496
497 fn query_events(
498 &self,
499 locked: &mut Locked<FileOpsCore>,
500 _file: &FileObject,
501 current_task: &CurrentTask,
502 ) -> Result<FdEvents, Errno> {
503 let mut events = FdEvents::empty();
504 let state = self.state.lock();
505 if !state.processing_list.is_empty() || !self.waitable_state.lock().trigger_list.is_empty()
506 {
507 events |= FdEvents::POLLIN;
508 } else {
509 for key in &state.recheck_list {
510 let wait_object = state.wait_objects.get(&key).unwrap();
511 let Some(target) = wait_object.target() else { continue };
512 if !(target.query_events(locked, current_task)? & wait_object.events).is_empty() {
513 events |= FdEvents::POLLIN;
514 break;
515 }
516 }
517 }
518 Ok(events)
519 }
520
521 fn close(
522 self: Box<Self>,
523 _locked: &mut Locked<FileOpsCore>,
524 _file: &FileObjectState,
525 current_task: &CurrentTask,
526 ) {
527 let guard = self.state.lock();
528 for (key, _wait_object) in guard.wait_objects.iter() {
529 if let ReadyItemKey::Usize(key) = key {
530 current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
531 &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, *key)),
532 );
533 }
534 }
535 }
536}
537
538#[derive(Clone)]
539pub struct EpollEventHandler {
540 key: ReadyItemKey,
541 waitable_state: Arc<Mutex<EpollWaitableState>>,
542}
543
544impl EpollEventHandler {
545 pub fn handle(self, events: FdEvents) {
546 let mut waitable_state = self.waitable_state.lock();
547 waitable_state.trigger_list.push_back(ReadyItem { key: self.key, events });
548 waitable_state.waiters.notify_fd_events(FdEvents::POLLIN);
549 }
550}
551
552impl EpollFileObject {
553 fn new_wait_handler(&self, key: ReadyItemKey) -> EventHandler {
554 EventHandler::Epoll(EpollEventHandler {
555 key,
556 waitable_state: Arc::clone(&self.waitable_state),
557 })
558 }
559}
560
561#[cfg(test)]
562mod tests {
563 use super::*;
564 use crate::fs::fuchsia::create_fuchsia_pipe;
565 use crate::task::Waiter;
566 use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
567 use crate::testing::spawn_kernel_and_run;
568 use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
569 use crate::vfs::eventfd::{EventFdType, new_eventfd};
570 use crate::vfs::fs_registry::FsRegistry;
571 use crate::vfs::pipe::{new_pipe, register_pipe_fs};
572 use crate::vfs::socket::{SocketDomain, SocketType, UnixSocket};
573 use starnix_lifecycle::AtomicUsizeCounter;
574 use starnix_sync::Unlocked;
575 use starnix_uapi::vfs::{EpollEvent, FdEvents};
576 use syncio::Zxio;
577 use zx::{
578 HandleBased, {self as zx},
579 };
580
581 #[::fuchsia::test]
582 async fn test_epoll_read_ready() {
583 static WRITE_COUNT: AtomicUsizeCounter = AtomicUsizeCounter::new(0);
584 const EVENT_DATA: u64 = 42;
585
586 spawn_kernel_and_run(async |locked, current_task| {
587 let kernel = current_task.kernel();
588 register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());
589
590 let (pipe_out, pipe_in) = new_pipe(locked, ¤t_task).unwrap();
591
592 let test_string = "hello starnix".to_string();
593 let test_len = test_string.len();
594
595 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
596 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
597 epoll_file
598 .add(
599 locked,
600 ¤t_task,
601 &pipe_out,
602 &epoll_file_handle,
603 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
604 )
605 .unwrap();
606
607 let (sender, receiver) = std::sync::mpsc::channel();
608 let value = test_string.clone();
609 let closure = move |locked: &mut Locked<Unlocked>, task: &CurrentTask| {
610 let bytes_written = pipe_in
611 .write(locked, &task, &mut VecInputBuffer::new(value.as_bytes()))
612 .unwrap();
613 assert_eq!(bytes_written, test_len);
614 WRITE_COUNT.add(bytes_written);
615 sender.send(()).unwrap();
616 };
617 let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
618 kernel.kthreads.spawner().spawn_from_request(req);
619 let events =
620 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
621 receiver.recv().unwrap();
622 assert_eq!(1, events.len());
623 let event = &events[0];
624 assert!(event.events().contains(FdEvents::POLLIN));
625 assert_eq!(event.data(), EVENT_DATA);
626
627 let mut buffer = VecOutputBuffer::new(test_len);
628 let bytes_read = pipe_out.read(locked, ¤t_task, &mut buffer).unwrap();
629 assert_eq!(bytes_read, WRITE_COUNT.get());
630 assert_eq!(bytes_read, test_len);
631 assert_eq!(buffer.data(), test_string.as_bytes());
632 })
633 .await;
634 }
635
636 #[::fuchsia::test]
637 async fn test_epoll_ready_then_wait() {
638 const EVENT_DATA: u64 = 42;
639
640 spawn_kernel_and_run(async |locked, current_task| {
641 let kernel = current_task.kernel();
642 register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());
643
644 let (pipe_out, pipe_in) = new_pipe(locked, ¤t_task).unwrap();
645
646 let test_string = "hello starnix".to_string();
647 let test_bytes = test_string.as_bytes();
648 let test_len = test_bytes.len();
649
650 assert_eq!(
651 pipe_in.write(locked, ¤t_task, &mut VecInputBuffer::new(test_bytes)).unwrap(),
652 test_bytes.len()
653 );
654
655 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
656 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
657 epoll_file
658 .add(
659 locked,
660 ¤t_task,
661 &pipe_out,
662 &epoll_file_handle,
663 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
664 )
665 .unwrap();
666
667 let events =
668 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
669 assert_eq!(1, events.len());
670 let event = &events[0];
671 assert!(event.events().contains(FdEvents::POLLIN));
672 assert_eq!(event.data(), EVENT_DATA);
673
674 let mut buffer = VecOutputBuffer::new(test_len);
675 let bytes_read = pipe_out.read(locked, ¤t_task, &mut buffer).unwrap();
676 assert_eq!(bytes_read, test_len);
677 assert_eq!(buffer.data(), test_bytes);
678 })
679 .await;
680 }
681
682 #[::fuchsia::test]
683 async fn test_epoll_ctl_cancel() {
684 spawn_kernel_and_run(async |locked, current_task| {
685 for do_cancel in [true, false] {
686 let event = new_eventfd(locked, ¤t_task, 0, EventFdType::Counter, true);
687 let waiter = Waiter::new();
688
689 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
690 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
691 const EVENT_DATA: u64 = 42;
692 epoll_file
693 .add(
694 locked,
695 ¤t_task,
696 &event,
697 &epoll_file_handle,
698 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
699 )
700 .unwrap();
701
702 if do_cancel {
703 epoll_file.delete(&event).unwrap();
704 }
705
706 let wait_canceler = event
707 .wait_async(
708 locked,
709 ¤t_task,
710 &waiter,
711 FdEvents::POLLIN,
712 EventHandler::None,
713 )
714 .expect("wait_async");
715 if do_cancel {
716 wait_canceler.cancel();
717 }
718
719 let add_val = 1u64;
720
721 assert_eq!(
722 event
723 .write(
724 locked,
725 ¤t_task,
726 &mut VecInputBuffer::new(&add_val.to_ne_bytes())
727 )
728 .unwrap(),
729 std::mem::size_of::<u64>()
730 );
731
732 let events =
733 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO).unwrap();
734
735 if do_cancel {
736 assert_eq!(0, events.len());
737 } else {
738 assert_eq!(1, events.len());
739 let event = &events[0];
740 assert!(event.events().contains(FdEvents::POLLIN));
741 assert_eq!(event.data(), EVENT_DATA);
742 }
743 }
744 })
745 .await;
746 }
747
748 #[::fuchsia::test]
749 async fn test_multiple_events() {
750 spawn_kernel_and_run(async |locked, current_task| {
751 let (client1, server1) = zx::Socket::create_stream();
752 let (client2, server2) = zx::Socket::create_stream();
753 let pipe1 = create_fuchsia_pipe(locked, ¤t_task, client1, OpenFlags::RDWR)
754 .expect("create_fuchsia_pipe");
755 let pipe2 = create_fuchsia_pipe(locked, ¤t_task, client2, OpenFlags::RDWR)
756 .expect("create_fuchsia_pipe");
757 let server1_zxio = Zxio::create(server1.into_handle()).expect("Zxio::create");
758 let server2_zxio = Zxio::create(server2.into_handle()).expect("Zxio::create");
759
760 let poll = |locked: &mut Locked<Unlocked>| {
761 let epoll_object = EpollFileObject::new_file(locked, ¤t_task);
762 let epoll_file = epoll_object.downcast_file::<EpollFileObject>().unwrap();
763 epoll_file
764 .add(
765 locked,
766 ¤t_task,
767 &pipe1,
768 &epoll_object,
769 EpollEvent::new(FdEvents::POLLIN, 1),
770 )
771 .expect("epoll_file.add");
772 epoll_file
773 .add(
774 locked,
775 ¤t_task,
776 &pipe2,
777 &epoll_object,
778 EpollEvent::new(FdEvents::POLLIN, 2),
779 )
780 .expect("epoll_file.add");
781 epoll_file.wait(locked, ¤t_task, 2, zx::MonotonicInstant::ZERO).expect("wait")
782 };
783
784 let fds = poll(locked);
785 assert!(fds.is_empty());
786
787 assert_eq!(server1_zxio.write(&[0]).expect("write"), 1);
788
789 let fds = poll(locked);
790 assert_eq!(fds.len(), 1);
791 assert_eq!(fds[0].events(), FdEvents::POLLIN);
792 assert_eq!(fds[0].data(), 1);
793 assert_eq!(
794 pipe1.read(locked, ¤t_task, &mut VecOutputBuffer::new(64)).expect("read"),
795 1
796 );
797
798 let fds = poll(locked);
799 assert!(fds.is_empty());
800
801 assert_eq!(server2_zxio.write(&[0]).expect("write"), 1);
802
803 let fds = poll(locked);
804 assert_eq!(fds.len(), 1);
805 assert_eq!(fds[0].events(), FdEvents::POLLIN);
806 assert_eq!(fds[0].data(), 2);
807 assert_eq!(
808 pipe2.read(locked, ¤t_task, &mut VecOutputBuffer::new(64)).expect("read"),
809 1
810 );
811
812 let fds = poll(locked);
813 assert!(fds.is_empty());
814 })
815 .await;
816 }
817
818 #[::fuchsia::test]
819 async fn test_cancel_after_notify() {
820 spawn_kernel_and_run(async |locked, current_task| {
821 let event = new_eventfd(locked, ¤t_task, 0, EventFdType::Counter, true);
822 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
823 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
824
825 const EVENT_DATA: u64 = 42;
827 epoll_file
828 .add(
829 locked,
830 ¤t_task,
831 &event,
832 &epoll_file_handle,
833 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
834 )
835 .unwrap();
836
837 let add_val = 1u64;
839 assert_eq!(
840 event
841 .write(locked, ¤t_task, &mut VecInputBuffer::new(&add_val.to_ne_bytes()))
842 .unwrap(),
843 std::mem::size_of::<u64>()
844 );
845
846 assert_eq!(
847 epoll_file
848 .wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO)
849 .unwrap()
850 .len(),
851 1
852 );
853
854 epoll_file.delete(&event).unwrap();
856
857 assert_eq!(
859 epoll_file
860 .wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO)
861 .unwrap()
862 .len(),
863 0
864 );
865 })
867 .await;
868 }
869
870 #[::fuchsia::test]
871 async fn test_add_then_modify() {
872 spawn_kernel_and_run(async |locked, current_task| {
873 let (socket1, _socket2) = UnixSocket::new_pair(
874 locked,
875 ¤t_task,
876 SocketDomain::Unix,
877 SocketType::Stream,
878 OpenFlags::RDWR,
879 )
880 .expect("Failed to create socket pair.");
881
882 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
883 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
884
885 const EVENT_DATA: u64 = 42;
886 epoll_file
887 .add(
888 locked,
889 ¤t_task,
890 &socket1,
891 &epoll_file_handle,
892 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
893 )
894 .unwrap();
895 assert_eq!(
896 epoll_file
897 .wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO)
898 .unwrap()
899 .len(),
900 0
901 );
902
903 let read_write_event = FdEvents::POLLIN | FdEvents::POLLOUT;
904 epoll_file
905 .modify(
906 locked,
907 ¤t_task,
908 &socket1,
909 EpollEvent::new(read_write_event, EVENT_DATA),
910 )
911 .unwrap();
912 let triggered_events =
913 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO).unwrap();
914 assert_eq!(1, triggered_events.len());
915 let event = &triggered_events[0];
916 assert_eq!(event.events(), FdEvents::POLLOUT);
917 assert_eq!(event.data(), EVENT_DATA);
918 })
919 .await;
920 }
921
922 #[::fuchsia::test]
923 async fn test_waiter_removal() {
924 spawn_kernel_and_run(async |locked, current_task| {
925 let event = new_eventfd(locked, ¤t_task, 0, EventFdType::Counter, true);
926 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
927 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
928
929 const EVENT_DATA: u64 = 42;
930 epoll_file
931 .add(
932 locked,
933 ¤t_task,
934 &event,
935 &epoll_file_handle,
936 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
937 )
938 .unwrap();
939
940 std::mem::drop(event);
941
942 assert!(epoll_file.waitable_state.lock().waiters.is_empty());
943 })
944 .await;
945 }
946}