1use crate::power::WakeupSourceOrigin;
6use crate::task::{
7 CurrentTask, EventHandler, ReadyItem, ReadyItemKey, WaitCanceler, WaitQueue, Waiter,
8};
9use crate::vfs::{
10 Anon, FileHandle, FileObject, FileObjectState, FileOps, WeakFileHandle, fileops_impl_dataless,
11 fileops_impl_nonseekable, fileops_impl_noop_sync,
12};
13use itertools::Itertools;
14use starnix_logging::log_warn;
15use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex};
16use starnix_uapi::error;
17use starnix_uapi::errors::{EINTR, ETIMEDOUT, Errno};
18use starnix_uapi::open_flags::OpenFlags;
19use starnix_uapi::vfs::{EpollEvent, FdEvents};
20use std::collections::hash_map::Entry;
21use std::collections::{HashMap, VecDeque};
22use std::sync::Arc;
23
24const MAX_NESTED_DEPTH: u32 = 5;
27
28struct WaitObject {
34 target: WeakFileHandle,
35 events: FdEvents,
36 data: u64,
37 wait_canceler: Option<WaitCanceler>,
38}
39
40impl WaitObject {
41 fn target(&self) -> Option<FileHandle> {
46 self.target.upgrade()
47 }
48}
49
50pub type EpollKey = usize;
53
54pub fn wakeup_source_name_for_epoll(current_task: &CurrentTask, key: EpollKey) -> String {
55 format!("[{}] {}", current_task.command(), key)
56}
57
58#[derive(Default)]
61pub struct EpollFileObject {
62 waiter: Waiter,
63 state: Mutex<EpollState>,
65 waitable_state: Arc<Mutex<EpollWaitableState>>,
66}
67
68#[derive(Default)]
69struct EpollState {
70 wait_objects: HashMap<ReadyItemKey, WaitObject>,
73 processing_list: VecDeque<ReadyItem>,
81 recheck_list: Vec<ReadyItemKey>,
87}
88
89#[derive(Default)]
90struct EpollWaitableState {
91 trigger_list: VecDeque<ReadyItem>,
94 waiters: WaitQueue,
97}
98
99impl EpollFileObject {
100 pub fn new_file<L>(locked: &mut Locked<L>, current_task: &CurrentTask) -> FileHandle
102 where
103 L: LockEqualOrBefore<FileOpsCore>,
104 {
105 let epoll = Box::new(EpollFileObject::default());
106
107 #[cfg(any(test, debug_assertions))]
108 {
109 let _l1 = epoll.state.lock();
110 let _l2 = epoll.waitable_state.lock();
111 }
112
113 Anon::new_private_file(locked, current_task, epoll, OpenFlags::RDWR, "[eventpoll]")
114 }
115
116 fn wait_on_file<L>(
117 &self,
118 locked: &mut Locked<L>,
119 current_task: &CurrentTask,
120 key: ReadyItemKey,
121 wait_object: &mut WaitObject,
122 ) -> Result<(), Errno>
123 where
124 L: LockEqualOrBefore<FileOpsCore>,
125 {
126 self.wait_on_file_edge_triggered(locked, current_task, key, wait_object)?;
128
129 self.do_recheck(locked, current_task, wait_object, key)?;
130
131 Ok(())
132 }
133
134 fn do_recheck<L>(
135 &self,
136 locked: &mut Locked<L>,
137 current_task: &CurrentTask,
138 wait_object: &mut WaitObject,
139 key: ReadyItemKey,
140 ) -> Result<(), Errno>
141 where
142 L: LockEqualOrBefore<FileOpsCore>,
143 {
144 let Some(target) = wait_object.target() else { return Ok(()) };
145 let events = target.query_events(locked, current_task)?;
146 if !(events & wait_object.events).is_empty() {
147 self.waiter.wake_immediately(events, self.new_wait_handler(key));
148 if let Some(wait_canceler) = wait_object.wait_canceler.take() {
149 wait_canceler.cancel();
150 } else {
151 log_warn!("wait canceler should have been set by `wait_on_file_edge_triggered`");
152 }
153 }
154 Ok(())
155 }
156
157 fn wait_on_file_edge_triggered<L>(
158 &self,
159 locked: &mut Locked<L>,
160 current_task: &CurrentTask,
161 key: ReadyItemKey,
162 wait_object: &mut WaitObject,
163 ) -> Result<(), Errno>
164 where
165 L: LockEqualOrBefore<FileOpsCore>,
166 {
167 let Some(target) = wait_object.target() else {
168 return Ok(());
169 };
170
171 wait_object.wait_canceler = target.wait_async(
172 locked,
173 current_task,
174 &self.waiter,
175 wait_object.events,
176 self.new_wait_handler(key),
177 );
178 if wait_object.wait_canceler.is_none() {
179 return error!(EPERM);
180 }
181 Ok(())
182 }
183
184 fn check_eloop(&self, parent: &FileHandle, depth_left: u32) -> Result<(), Errno> {
187 if depth_left == 0 {
188 return error!(ELOOP);
189 }
190
191 let state = self.state.lock();
192 for nested_object in state.wait_objects.values() {
193 let Some(child) = nested_object.target() else {
194 continue;
195 };
196 let Some(child_file) = child.downcast_file::<EpollFileObject>() else {
197 continue;
198 };
199
200 if Arc::ptr_eq(&child, parent) {
201 return error!(ELOOP);
202 }
203 child_file.check_eloop(parent, depth_left - 1)?;
204 }
205
206 Ok(())
207 }
208
209 pub fn add<L>(
211 &self,
212 locked: &mut Locked<L>,
213 current_task: &CurrentTask,
214 file: &FileHandle,
215 epoll_file_handle: &FileHandle,
216 epoll_event: EpollEvent,
217 ) -> Result<(), Errno>
218 where
219 L: LockEqualOrBefore<FileOpsCore>,
220 {
221 if let Some(epoll_to_add) = file.downcast_file::<EpollFileObject>() {
223 epoll_to_add.check_eloop(epoll_file_handle, MAX_NESTED_DEPTH - 1)?;
226 }
227
228 let mut state = self.state.lock();
229 let key = file.id.as_epoll_key().into();
230 match state.wait_objects.entry(key) {
231 Entry::Occupied(_) => error!(EEXIST),
232 Entry::Vacant(entry) => {
233 let wait_object = entry.insert(WaitObject {
234 target: Arc::downgrade(file),
235 events: epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR,
236 data: epoll_event.data(),
237 wait_canceler: None,
238 });
239 self.wait_on_file(locked, current_task, key, wait_object)
240 }
241 }
242 }
243
244 pub fn modify<L>(
246 &self,
247 locked: &mut Locked<L>,
248 current_task: &CurrentTask,
249 file: &FileHandle,
250 epoll_event: EpollEvent,
251 ) -> Result<(), Errno>
252 where
253 L: LockEqualOrBefore<FileOpsCore>,
254 {
255 let mut state = self.state.lock();
256 let key = file.id.as_epoll_key();
257 state.recheck_list.retain(|x| *x != key.into());
258 let Some(wait_object) = state.wait_objects.get_mut(&key.into()) else {
259 return error!(ENOENT);
260 };
261 if let Some(wait_canceler) = wait_object.wait_canceler.take() {
262 wait_canceler.cancel();
263 }
264 wait_object.events = epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR;
265 wait_object.data = epoll_event.data();
266 if wait_object.events.contains(FdEvents::EPOLLWAKEUP)
270 && !epoll_event.events().contains(FdEvents::EPOLLWAKEUP)
271 {
272 current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
273 &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key)),
274 );
275 }
276 self.wait_on_file(locked, current_task, key.into(), wait_object)
277 }
278
279 pub fn delete(&self, current_task: &CurrentTask, file: &FileObject) -> Result<(), Errno> {
282 let mut state = self.state.lock();
283 let key = file.id.as_epoll_key().into();
284 let ReadyItemKey::Usize(key_usize) = key else {
285 return error!(EINVAL);
287 };
288 if let Some(mut wait_object) = state.wait_objects.remove(&key) {
289 if let Some(wait_canceler) = wait_object.wait_canceler.take() {
290 wait_canceler.cancel();
291 }
292 state.recheck_list.retain(|x| *x != key);
293 current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
295 &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key_usize)),
296 );
297 Ok(())
298 } else {
299 error!(ENOENT)
300 }
301 }
302
303 fn process_triggered_events<L>(
311 &self,
312 locked: &mut Locked<L>,
313 current_task: &CurrentTask,
314 pending_list: &mut Vec<ReadyItem>,
315 max_events: usize,
316 ) -> Result<(), Errno>
317 where
318 L: LockEqualOrBefore<FileOpsCore>,
319 {
320 let mut state = self.state.lock();
321 state.processing_list.append(&mut self.waitable_state.lock().trigger_list);
326 while pending_list.len() < max_events && !state.processing_list.is_empty() {
327 if let Some(pending) = state.processing_list.pop_front() {
328 if let Some(wait) = state.wait_objects.get_mut(&pending.key) {
329 if let Some(target) = wait.target.upgrade() {
333 let ready = ReadyItem {
334 key: pending.key,
335 events: target.query_events(locked, current_task)?,
336 };
337 if ready.events.intersects(wait.events) {
338 pending_list.push(ready);
339 } else {
340 self.wait_on_file(locked, current_task, pending.key, wait)?;
342 }
343 }
344 }
345 }
346 }
347 Ok(())
348 }
349
350 fn wait_until_pending_event<L>(
353 &self,
354 locked: &mut Locked<L>,
355 current_task: &CurrentTask,
356 max_events: usize,
357 mut wait_deadline: zx::MonotonicInstant,
358 ) -> Result<Vec<ReadyItem>, Errno>
359 where
360 L: LockEqualOrBefore<FileOpsCore>,
361 {
362 let mut pending_list = Vec::new();
363
364 loop {
365 self.process_triggered_events(locked, current_task, &mut pending_list, max_events)?;
366
367 if pending_list.len() == max_events {
368 break; }
370
371 if !pending_list.is_empty() {
372 wait_deadline = zx::MonotonicInstant::ZERO;
376 }
377
378 match self.waiter.wait_until(locked, current_task, wait_deadline) {
392 Err(err) if err == ETIMEDOUT => break,
393 Err(err) if err == EINTR => {
394 debug_assert!(
399 pending_list.is_empty(),
400 "Got EINTR from wait of {}ns with {} items pending.",
401 wait_deadline.into_nanos(),
402 pending_list.len()
403 );
404 return Err(err);
405 }
406 result => result?,
408 }
409 }
410
411 Ok(pending_list)
412 }
413
414 pub fn wait<L>(
416 &self,
417 locked: &mut Locked<L>,
418 current_task: &CurrentTask,
419 max_events: usize,
420 deadline: zx::MonotonicInstant,
421 ) -> Result<Vec<EpollEvent>, Errno>
422 where
423 L: LockEqualOrBefore<FileOpsCore>,
424 {
425 {
426 let mut state = self.state.lock();
427 let recheck_list = std::mem::take(&mut state.recheck_list);
428 for key in recheck_list {
429 if let ReadyItemKey::Usize(key) = key {
430 current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
431 &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key)),
432 );
433 }
434 let wait_object = state.wait_objects.get_mut(&key).unwrap();
435 self.do_recheck(locked, current_task, wait_object, key)?;
436 }
437 }
438
439 let pending_list =
440 self.wait_until_pending_event(locked, current_task, max_events, deadline)?;
441
442 let mut result = vec![];
445 let mut state = self.state.lock();
446 let state = &mut *state;
447 for pending_event in pending_list.iter().unique_by(|e| e.key) {
448 let Some(wait) = state.wait_objects.get_mut(&pending_event.key) else { continue };
451
452 let reported_events = pending_event.events & wait.events;
453 result.push(EpollEvent::new(reported_events, wait.data));
454
455 if wait.events.contains(FdEvents::EPOLLONESHOT) {
458 continue;
459 }
460
461 self.wait_on_file_edge_triggered(locked, current_task, pending_event.key, wait)?;
462
463 if !wait.events.contains(FdEvents::EPOLLET) {
464 state.recheck_list.push(pending_event.key);
465 }
466
467 if !wait.events.contains(FdEvents::EPOLLET) {
469 if wait.events.contains(FdEvents::EPOLLWAKEUP) {
472 if let ReadyItemKey::Usize(key) = pending_event.key {
473 current_task.kernel().suspend_resume_manager.activate_wakeup_source(
474 WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(
475 current_task,
476 key,
477 )),
478 );
479 }
480 }
481 }
482 }
483
484 Ok(result)
485 }
486}
487
488impl FileOps for EpollFileObject {
489 fileops_impl_nonseekable!();
490 fileops_impl_noop_sync!();
491 fileops_impl_dataless!();
492
493 fn wait_async(
494 &self,
495 _locked: &mut Locked<FileOpsCore>,
496 _file: &FileObject,
497 _current_task: &CurrentTask,
498 waiter: &Waiter,
499 events: FdEvents,
500 handler: EventHandler,
501 ) -> Option<WaitCanceler> {
502 Some(self.waitable_state.lock().waiters.wait_async_fd_events(waiter, events, handler))
503 }
504
505 fn query_events(
506 &self,
507 locked: &mut Locked<FileOpsCore>,
508 _file: &FileObject,
509 current_task: &CurrentTask,
510 ) -> Result<FdEvents, Errno> {
511 let mut events = FdEvents::empty();
512 let state = self.state.lock();
513 if !state.processing_list.is_empty() || !self.waitable_state.lock().trigger_list.is_empty()
514 {
515 events |= FdEvents::POLLIN;
516 } else {
517 for key in &state.recheck_list {
518 let wait_object = state.wait_objects.get(&key).unwrap();
519 let Some(target) = wait_object.target() else { continue };
520 if !(target.query_events(locked, current_task)? & wait_object.events).is_empty() {
521 events |= FdEvents::POLLIN;
522 break;
523 }
524 }
525 }
526 Ok(events)
527 }
528
529 fn close(
530 self: Box<Self>,
531 _locked: &mut Locked<FileOpsCore>,
532 _file: &FileObjectState,
533 current_task: &CurrentTask,
534 ) {
535 let guard = self.state.lock();
536 for (key, _wait_object) in guard.wait_objects.iter() {
537 if let ReadyItemKey::Usize(key) = key {
538 current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
539 &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, *key)),
540 );
541 }
542 }
543 }
544}
545
546#[derive(Clone)]
547pub struct EpollEventHandler {
548 key: ReadyItemKey,
549 waitable_state: Arc<Mutex<EpollWaitableState>>,
550}
551
552impl EpollEventHandler {
553 pub fn handle(self, events: FdEvents) {
554 let mut waitable_state = self.waitable_state.lock();
555 waitable_state.trigger_list.push_back(ReadyItem { key: self.key, events });
556 waitable_state.waiters.notify_fd_events(FdEvents::POLLIN);
557 }
558}
559
560impl EpollFileObject {
561 fn new_wait_handler(&self, key: ReadyItemKey) -> EventHandler {
562 EventHandler::Epoll(EpollEventHandler {
563 key,
564 waitable_state: Arc::clone(&self.waitable_state),
565 })
566 }
567}
568
569#[cfg(test)]
570mod tests {
571 use super::*;
572 use crate::fs::fuchsia::create_fuchsia_pipe;
573 use crate::task::Waiter;
574 use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
575 use crate::testing::spawn_kernel_and_run;
576 use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
577 use crate::vfs::eventfd::{EventFdType, new_eventfd};
578 use crate::vfs::fs_registry::FsRegistry;
579 use crate::vfs::pipe::{new_pipe, register_pipe_fs};
580 use crate::vfs::socket::{SocketDomain, SocketType, UnixSocket};
581 use starnix_lifecycle::AtomicUsizeCounter;
582 use starnix_sync::Unlocked;
583 use starnix_uapi::vfs::{EpollEvent, FdEvents};
584 use syncio::Zxio;
585 use zx::{
586 HandleBased, {self as zx},
587 };
588
589 #[::fuchsia::test]
590 async fn test_epoll_read_ready() {
591 static WRITE_COUNT: AtomicUsizeCounter = AtomicUsizeCounter::new(0);
592 const EVENT_DATA: u64 = 42;
593
594 spawn_kernel_and_run(async |locked, current_task| {
595 let kernel = current_task.kernel();
596 register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());
597
598 let (pipe_out, pipe_in) = new_pipe(locked, ¤t_task).unwrap();
599
600 let test_string = "hello starnix".to_string();
601 let test_len = test_string.len();
602
603 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
604 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
605 epoll_file
606 .add(
607 locked,
608 ¤t_task,
609 &pipe_out,
610 &epoll_file_handle,
611 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
612 )
613 .unwrap();
614
615 let (sender, receiver) = std::sync::mpsc::channel();
616 let value = test_string.clone();
617 let closure = move |locked: &mut Locked<Unlocked>, task: &CurrentTask| {
618 let bytes_written = pipe_in
619 .write(locked, &task, &mut VecInputBuffer::new(value.as_bytes()))
620 .unwrap();
621 assert_eq!(bytes_written, test_len);
622 WRITE_COUNT.add(bytes_written);
623 sender.send(()).unwrap();
624 };
625 let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
626 kernel.kthreads.spawner().spawn_from_request(req);
627 let events =
628 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
629 receiver.recv().unwrap();
630 assert_eq!(1, events.len());
631 let event = &events[0];
632 assert!(event.events().contains(FdEvents::POLLIN));
633 assert_eq!(event.data(), EVENT_DATA);
634
635 let mut buffer = VecOutputBuffer::new(test_len);
636 let bytes_read = pipe_out.read(locked, ¤t_task, &mut buffer).unwrap();
637 assert_eq!(bytes_read, WRITE_COUNT.get());
638 assert_eq!(bytes_read, test_len);
639 assert_eq!(buffer.data(), test_string.as_bytes());
640 })
641 .await;
642 }
643
644 #[::fuchsia::test]
645 async fn test_epoll_ready_then_wait() {
646 const EVENT_DATA: u64 = 42;
647
648 spawn_kernel_and_run(async |locked, current_task| {
649 let kernel = current_task.kernel();
650 register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());
651
652 let (pipe_out, pipe_in) = new_pipe(locked, ¤t_task).unwrap();
653
654 let test_string = "hello starnix".to_string();
655 let test_bytes = test_string.as_bytes();
656 let test_len = test_bytes.len();
657
658 assert_eq!(
659 pipe_in.write(locked, ¤t_task, &mut VecInputBuffer::new(test_bytes)).unwrap(),
660 test_bytes.len()
661 );
662
663 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
664 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
665 epoll_file
666 .add(
667 locked,
668 ¤t_task,
669 &pipe_out,
670 &epoll_file_handle,
671 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
672 )
673 .unwrap();
674
675 let events =
676 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
677 assert_eq!(1, events.len());
678 let event = &events[0];
679 assert!(event.events().contains(FdEvents::POLLIN));
680 assert_eq!(event.data(), EVENT_DATA);
681
682 let mut buffer = VecOutputBuffer::new(test_len);
683 let bytes_read = pipe_out.read(locked, ¤t_task, &mut buffer).unwrap();
684 assert_eq!(bytes_read, test_len);
685 assert_eq!(buffer.data(), test_bytes);
686 })
687 .await;
688 }
689
690 #[::fuchsia::test]
691 async fn test_epoll_ctl_cancel() {
692 spawn_kernel_and_run(async |locked, current_task| {
693 for do_cancel in [true, false] {
694 let event = new_eventfd(locked, ¤t_task, 0, EventFdType::Counter, true);
695 let waiter = Waiter::new();
696
697 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
698 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
699 const EVENT_DATA: u64 = 42;
700 epoll_file
701 .add(
702 locked,
703 ¤t_task,
704 &event,
705 &epoll_file_handle,
706 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
707 )
708 .unwrap();
709
710 if do_cancel {
711 epoll_file.delete(¤t_task, &event).unwrap();
712 }
713
714 let wait_canceler = event
715 .wait_async(
716 locked,
717 ¤t_task,
718 &waiter,
719 FdEvents::POLLIN,
720 EventHandler::None,
721 )
722 .expect("wait_async");
723 if do_cancel {
724 wait_canceler.cancel();
725 }
726
727 let add_val = 1u64;
728
729 assert_eq!(
730 event
731 .write(
732 locked,
733 ¤t_task,
734 &mut VecInputBuffer::new(&add_val.to_ne_bytes())
735 )
736 .unwrap(),
737 std::mem::size_of::<u64>()
738 );
739
740 let events =
741 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO).unwrap();
742
743 if do_cancel {
744 assert_eq!(0, events.len());
745 } else {
746 assert_eq!(1, events.len());
747 let event = &events[0];
748 assert!(event.events().contains(FdEvents::POLLIN));
749 assert_eq!(event.data(), EVENT_DATA);
750 }
751 }
752 })
753 .await;
754 }
755
756 #[::fuchsia::test]
757 async fn test_multiple_events() {
758 spawn_kernel_and_run(async |locked, current_task| {
759 let (client1, server1) = zx::Socket::create_stream();
760 let (client2, server2) = zx::Socket::create_stream();
761 let pipe1 = create_fuchsia_pipe(locked, ¤t_task, client1, OpenFlags::RDWR)
762 .expect("create_fuchsia_pipe");
763 let pipe2 = create_fuchsia_pipe(locked, ¤t_task, client2, OpenFlags::RDWR)
764 .expect("create_fuchsia_pipe");
765 let server1_zxio = Zxio::create(server1.into_handle()).expect("Zxio::create");
766 let server2_zxio = Zxio::create(server2.into_handle()).expect("Zxio::create");
767
768 let poll = |locked: &mut Locked<Unlocked>| {
769 let epoll_object = EpollFileObject::new_file(locked, ¤t_task);
770 let epoll_file = epoll_object.downcast_file::<EpollFileObject>().unwrap();
771 epoll_file
772 .add(
773 locked,
774 ¤t_task,
775 &pipe1,
776 &epoll_object,
777 EpollEvent::new(FdEvents::POLLIN, 1),
778 )
779 .expect("epoll_file.add");
780 epoll_file
781 .add(
782 locked,
783 ¤t_task,
784 &pipe2,
785 &epoll_object,
786 EpollEvent::new(FdEvents::POLLIN, 2),
787 )
788 .expect("epoll_file.add");
789 epoll_file.wait(locked, ¤t_task, 2, zx::MonotonicInstant::ZERO).expect("wait")
790 };
791
792 let fds = poll(locked);
793 assert!(fds.is_empty());
794
795 assert_eq!(server1_zxio.write(&[0]).expect("write"), 1);
796
797 let fds = poll(locked);
798 assert_eq!(fds.len(), 1);
799 assert_eq!(fds[0].events(), FdEvents::POLLIN);
800 assert_eq!(fds[0].data(), 1);
801 assert_eq!(
802 pipe1.read(locked, ¤t_task, &mut VecOutputBuffer::new(64)).expect("read"),
803 1
804 );
805
806 let fds = poll(locked);
807 assert!(fds.is_empty());
808
809 assert_eq!(server2_zxio.write(&[0]).expect("write"), 1);
810
811 let fds = poll(locked);
812 assert_eq!(fds.len(), 1);
813 assert_eq!(fds[0].events(), FdEvents::POLLIN);
814 assert_eq!(fds[0].data(), 2);
815 assert_eq!(
816 pipe2.read(locked, ¤t_task, &mut VecOutputBuffer::new(64)).expect("read"),
817 1
818 );
819
820 let fds = poll(locked);
821 assert!(fds.is_empty());
822 })
823 .await;
824 }
825
826 #[::fuchsia::test]
827 async fn test_cancel_after_notify() {
828 spawn_kernel_and_run(async |locked, current_task| {
829 let event = new_eventfd(locked, ¤t_task, 0, EventFdType::Counter, true);
830 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
831 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
832
833 const EVENT_DATA: u64 = 42;
835 epoll_file
836 .add(
837 locked,
838 ¤t_task,
839 &event,
840 &epoll_file_handle,
841 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
842 )
843 .unwrap();
844
845 let add_val = 1u64;
847 assert_eq!(
848 event
849 .write(locked, ¤t_task, &mut VecInputBuffer::new(&add_val.to_ne_bytes()))
850 .unwrap(),
851 std::mem::size_of::<u64>()
852 );
853
854 assert_eq!(
855 epoll_file
856 .wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO)
857 .unwrap()
858 .len(),
859 1
860 );
861
862 epoll_file.delete(¤t_task, &event).unwrap();
864
865 assert_eq!(
867 epoll_file
868 .wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO)
869 .unwrap()
870 .len(),
871 0
872 );
873 })
875 .await;
876 }
877
878 #[::fuchsia::test]
879 async fn test_add_then_modify() {
880 spawn_kernel_and_run(async |locked, current_task| {
881 let (socket1, _socket2) = UnixSocket::new_pair(
882 locked,
883 ¤t_task,
884 SocketDomain::Unix,
885 SocketType::Stream,
886 OpenFlags::RDWR,
887 )
888 .expect("Failed to create socket pair.");
889
890 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
891 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
892
893 const EVENT_DATA: u64 = 42;
894 epoll_file
895 .add(
896 locked,
897 ¤t_task,
898 &socket1,
899 &epoll_file_handle,
900 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
901 )
902 .unwrap();
903 assert_eq!(
904 epoll_file
905 .wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO)
906 .unwrap()
907 .len(),
908 0
909 );
910
911 let read_write_event = FdEvents::POLLIN | FdEvents::POLLOUT;
912 epoll_file
913 .modify(
914 locked,
915 ¤t_task,
916 &socket1,
917 EpollEvent::new(read_write_event, EVENT_DATA),
918 )
919 .unwrap();
920 let triggered_events =
921 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO).unwrap();
922 assert_eq!(1, triggered_events.len());
923 let event = &triggered_events[0];
924 assert_eq!(event.events(), FdEvents::POLLOUT);
925 assert_eq!(event.data(), EVENT_DATA);
926 })
927 .await;
928 }
929
930 #[::fuchsia::test]
931 async fn test_waiter_removal() {
932 spawn_kernel_and_run(async |locked, current_task| {
933 let event = new_eventfd(locked, ¤t_task, 0, EventFdType::Counter, true);
934 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
935 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
936
937 const EVENT_DATA: u64 = 42;
938 epoll_file
939 .add(
940 locked,
941 ¤t_task,
942 &event,
943 &epoll_file_handle,
944 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
945 )
946 .unwrap();
947
948 std::mem::drop(event);
949
950 assert!(epoll_file.waitable_state.lock().waiters.is_empty());
951 })
952 .await;
953 }
954}