1use crate::task::{
6 CurrentTask, EventHandler, ReadyItem, ReadyItemKey, WaitCanceler, WaitQueue, Waiter,
7};
8use crate::vfs::{
9 Anon, FileHandle, FileObject, FileObjectState, FileOps, WeakFileHandle, fileops_impl_dataless,
10 fileops_impl_nonseekable, fileops_impl_noop_sync,
11};
12use itertools::Itertools;
13use starnix_logging::log_warn;
14use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex};
15use starnix_uapi::error;
16use starnix_uapi::errors::{EINTR, ETIMEDOUT, Errno};
17use starnix_uapi::open_flags::OpenFlags;
18use starnix_uapi::vfs::{EpollEvent, FdEvents};
19use std::collections::hash_map::Entry;
20use std::collections::{HashMap, VecDeque};
21use std::sync::Arc;
22
/// Depth budget for epoll instances that watch other epoll instances. `add` starts its
/// loop/depth check with `MAX_NESTED_DEPTH - 1` levels remaining.
const MAX_NESTED_DEPTH: u32 = 5;
26
/// Bookkeeping for a single file registered with an epoll instance.
struct WaitObject {
    /// Weak reference to the watched file; it cannot be upgraded once the file is gone.
    target: WeakFileHandle,
    /// Event mask being watched. `add` and `modify` always OR in `POLLHUP` and `POLLERR`.
    events: FdEvents,
    /// Opaque value copied from the registering `EpollEvent` and echoed back in reported events.
    data: u64,
    /// Canceler for the currently armed asynchronous wait on `target`, if any.
    wait_canceler: Option<WaitCanceler>,
}
38
39impl WaitObject {
40 fn target(&self) -> Option<FileHandle> {
45 self.target.upgrade()
46 }
47}
48
/// Integer key type used for epoll registrations.
/// NOTE(review): presumably the type produced by `as_epoll_key()` — confirm against its definition.
pub type EpollKey = usize;
52
/// File operations object backing an epoll instance; `new_file` wraps it in an anonymous
/// "[eventpoll]" file.
#[derive(Default)]
pub struct EpollFileObject {
    /// Waiter that `wait` blocks on and that watched files are asked to notify.
    waiter: Waiter,
    /// Registration table and per-wait bookkeeping.
    state: Mutex<EpollState>,
    /// State shared (via `Arc`) with the event handlers created by `new_wait_handler`.
    waitable_state: Arc<Mutex<EpollWaitableState>>,
}
62
/// Mutable bookkeeping guarded by `EpollFileObject::state`.
#[derive(Default)]
struct EpollState {
    /// Every registered file, indexed by its key.
    wait_objects: HashMap<ReadyItemKey, WaitObject>,
    /// FIFO of triggered items that were moved out of `trigger_list` and still have to be
    /// validated by `process_triggered_events`.
    processing_list: VecDeque<ReadyItem>,
    /// Keys of non-`EPOLLET` registrations reported by the previous `wait`; they are queried
    /// again at the start of the next `wait`.
    recheck_list: Vec<ReadyItemKey>,
}
83
/// State shared between the epoll object and the event handlers it hands out.
#[derive(Default)]
struct EpollWaitableState {
    /// FIFO of notifications pushed by `EpollEventHandler::handle` that have not been
    /// processed yet.
    trigger_list: VecDeque<ReadyItem>,
    /// Waiters registered on the epoll file itself through `FileOps::wait_async`.
    waiters: WaitQueue,
}
93
94impl EpollFileObject {
95 pub fn new_file<L>(locked: &mut Locked<L>, current_task: &CurrentTask) -> FileHandle
97 where
98 L: LockEqualOrBefore<FileOpsCore>,
99 {
100 let epoll = Box::new(EpollFileObject::default());
101
102 #[cfg(any(test, debug_assertions))]
103 {
104 let _l1 = epoll.state.lock();
105 let _l2 = epoll.waitable_state.lock();
106 }
107
108 Anon::new_private_file(locked, current_task, epoll, OpenFlags::RDWR, "[eventpoll]")
109 }
110
111 fn wait_on_file<L>(
112 &self,
113 locked: &mut Locked<L>,
114 current_task: &CurrentTask,
115 key: ReadyItemKey,
116 wait_object: &mut WaitObject,
117 ) -> Result<(), Errno>
118 where
119 L: LockEqualOrBefore<FileOpsCore>,
120 {
121 self.wait_on_file_edge_triggered(locked, current_task, key, wait_object)?;
123
124 self.do_recheck(locked, current_task, wait_object, key)?;
125
126 Ok(())
127 }
128
129 fn do_recheck<L>(
130 &self,
131 locked: &mut Locked<L>,
132 current_task: &CurrentTask,
133 wait_object: &mut WaitObject,
134 key: ReadyItemKey,
135 ) -> Result<(), Errno>
136 where
137 L: LockEqualOrBefore<FileOpsCore>,
138 {
139 let Some(target) = wait_object.target() else { return Ok(()) };
140 let events = target.query_events(locked, current_task)?;
141 if !(events & wait_object.events).is_empty() {
142 self.waiter.wake_immediately(events, self.new_wait_handler(key));
143 if let Some(wait_canceler) = wait_object.wait_canceler.take() {
144 wait_canceler.cancel();
145 } else {
146 log_warn!("wait canceler should have been set by `wait_on_file_edge_triggered`");
147 }
148 }
149 Ok(())
150 }
151
152 fn wait_on_file_edge_triggered<L>(
153 &self,
154 locked: &mut Locked<L>,
155 current_task: &CurrentTask,
156 key: ReadyItemKey,
157 wait_object: &mut WaitObject,
158 ) -> Result<(), Errno>
159 where
160 L: LockEqualOrBefore<FileOpsCore>,
161 {
162 let Some(target) = wait_object.target() else {
163 return Ok(());
164 };
165
166 wait_object.wait_canceler = target.wait_async(
167 locked,
168 current_task,
169 &self.waiter,
170 wait_object.events,
171 self.new_wait_handler(key),
172 );
173 if wait_object.wait_canceler.is_none() {
174 return error!(EPERM);
175 }
176 Ok(())
177 }
178
179 fn check_eloop(&self, parent: &FileHandle, depth_left: u32) -> Result<(), Errno> {
182 if depth_left == 0 {
183 return error!(ELOOP);
184 }
185
186 let state = self.state.lock();
187 for nested_object in state.wait_objects.values() {
188 let Some(child) = nested_object.target() else {
189 continue;
190 };
191 let Some(child_file) = child.downcast_file::<EpollFileObject>() else {
192 continue;
193 };
194
195 if Arc::ptr_eq(&child, parent) {
196 return error!(ELOOP);
197 }
198 child_file.check_eloop(parent, depth_left - 1)?;
199 }
200
201 Ok(())
202 }
203
204 pub fn add<L>(
206 &self,
207 locked: &mut Locked<L>,
208 current_task: &CurrentTask,
209 file: &FileHandle,
210 epoll_file_handle: &FileHandle,
211 epoll_event: EpollEvent,
212 ) -> Result<(), Errno>
213 where
214 L: LockEqualOrBefore<FileOpsCore>,
215 {
216 if let Some(epoll_to_add) = file.downcast_file::<EpollFileObject>() {
218 epoll_to_add.check_eloop(epoll_file_handle, MAX_NESTED_DEPTH - 1)?;
221 }
222
223 let mut state = self.state.lock();
224 let key = file.id.as_epoll_key().into();
225 match state.wait_objects.entry(key) {
226 Entry::Occupied(_) => error!(EEXIST),
227 Entry::Vacant(entry) => {
228 let wait_object = entry.insert(WaitObject {
229 target: Arc::downgrade(file),
230 events: epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR,
231 data: epoll_event.data(),
232 wait_canceler: None,
233 });
234 self.wait_on_file(locked, current_task, key, wait_object)
235 }
236 }
237 }
238
239 pub fn modify<L>(
241 &self,
242 locked: &mut Locked<L>,
243 current_task: &CurrentTask,
244 file: &FileHandle,
245 epoll_event: EpollEvent,
246 ) -> Result<(), Errno>
247 where
248 L: LockEqualOrBefore<FileOpsCore>,
249 {
250 let mut state = self.state.lock();
251 let key = file.id.as_epoll_key();
252 state.recheck_list.retain(|x| *x != key.into());
253 let Some(wait_object) = state.wait_objects.get_mut(&key.into()) else {
254 return error!(ENOENT);
255 };
256 if let Some(wait_canceler) = wait_object.wait_canceler.take() {
257 wait_canceler.cancel();
258 }
259 wait_object.events = epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR;
260 wait_object.data = epoll_event.data();
261 if wait_object.events.contains(FdEvents::EPOLLWAKEUP)
265 && !epoll_event.events().contains(FdEvents::EPOLLWAKEUP)
266 {
267 current_task.kernel().suspend_resume_manager.remove_epoll(key);
268 }
269 self.wait_on_file(locked, current_task, key.into(), wait_object)
270 }
271
272 pub fn delete(&self, file: &FileObject) -> Result<(), Errno> {
275 let mut state = self.state.lock();
276 let key = file.id.as_epoll_key().into();
277 if let Some(mut wait_object) = state.wait_objects.remove(&key) {
278 if let Some(wait_canceler) = wait_object.wait_canceler.take() {
279 wait_canceler.cancel();
280 }
281 state.recheck_list.retain(|x| *x != key);
282 Ok(())
283 } else {
284 error!(ENOENT)
285 }
286 }
287
288 fn process_triggered_events<L>(
296 &self,
297 locked: &mut Locked<L>,
298 current_task: &CurrentTask,
299 pending_list: &mut Vec<ReadyItem>,
300 max_events: usize,
301 ) -> Result<(), Errno>
302 where
303 L: LockEqualOrBefore<FileOpsCore>,
304 {
305 let mut state = self.state.lock();
306 state.processing_list.append(&mut self.waitable_state.lock().trigger_list);
311 while pending_list.len() < max_events && !state.processing_list.is_empty() {
312 if let Some(pending) = state.processing_list.pop_front() {
313 if let Some(wait) = state.wait_objects.get_mut(&pending.key) {
314 if let Some(target) = wait.target.upgrade() {
318 let ready = ReadyItem {
319 key: pending.key,
320 events: target.query_events(locked, current_task)?,
321 };
322 if ready.events.intersects(wait.events) {
323 pending_list.push(ready);
324 } else {
325 self.wait_on_file(locked, current_task, pending.key, wait)?;
327 }
328 }
329 }
330 }
331 }
332 Ok(())
333 }
334
335 fn wait_until_pending_event<L>(
338 &self,
339 locked: &mut Locked<L>,
340 current_task: &CurrentTask,
341 max_events: usize,
342 mut wait_deadline: zx::MonotonicInstant,
343 ) -> Result<Vec<ReadyItem>, Errno>
344 where
345 L: LockEqualOrBefore<FileOpsCore>,
346 {
347 let mut pending_list = Vec::new();
348
349 loop {
350 self.process_triggered_events(locked, current_task, &mut pending_list, max_events)?;
351
352 if pending_list.len() == max_events {
353 break; }
355
356 if !pending_list.is_empty() {
357 wait_deadline = zx::MonotonicInstant::ZERO;
361 }
362
363 match self.waiter.wait_until(locked, current_task, wait_deadline) {
377 Err(err) if err == ETIMEDOUT => break,
378 Err(err) if err == EINTR => {
379 debug_assert!(
384 pending_list.is_empty(),
385 "Got EINTR from wait of {}ns with {} items pending.",
386 wait_deadline.into_nanos(),
387 pending_list.len()
388 );
389 return Err(err);
390 }
391 result => result?,
393 }
394 }
395
396 Ok(pending_list)
397 }
398
399 pub fn wait<L>(
401 &self,
402 locked: &mut Locked<L>,
403 current_task: &CurrentTask,
404 max_events: usize,
405 deadline: zx::MonotonicInstant,
406 ) -> Result<Vec<EpollEvent>, Errno>
407 where
408 L: LockEqualOrBefore<FileOpsCore>,
409 {
410 {
411 let mut state = self.state.lock();
412 let recheck_list = std::mem::take(&mut state.recheck_list);
413 for key in recheck_list {
414 if let ReadyItemKey::Usize(key) = key {
415 current_task.kernel().suspend_resume_manager.remove_epoll(key);
416 }
417 let wait_object = state.wait_objects.get_mut(&key).unwrap();
418 self.do_recheck(locked, current_task, wait_object, key)?;
419 }
420 }
421
422 let pending_list =
423 self.wait_until_pending_event(locked, current_task, max_events, deadline)?;
424
425 let mut result = vec![];
428 let mut state = self.state.lock();
429 let state = &mut *state;
430 for pending_event in pending_list.iter().unique_by(|e| e.key) {
431 let Some(wait) = state.wait_objects.get_mut(&pending_event.key) else { continue };
434
435 let reported_events = pending_event.events & wait.events;
436 result.push(EpollEvent::new(reported_events, wait.data));
437
438 if wait.events.contains(FdEvents::EPOLLONESHOT) {
441 continue;
442 }
443
444 self.wait_on_file_edge_triggered(locked, current_task, pending_event.key, wait)?;
445
446 if !wait.events.contains(FdEvents::EPOLLET) {
447 state.recheck_list.push(pending_event.key);
448 }
449
450 if !wait.events.contains(FdEvents::EPOLLET) {
452 if wait.events.contains(FdEvents::EPOLLWAKEUP) {
455 if let ReadyItemKey::Usize(key) = pending_event.key {
456 current_task.kernel().suspend_resume_manager.add_epoll(current_task, key)
457 }
458 }
459 }
460 }
461
462 Ok(result)
463 }
464
465 pub fn drop_lease(&self, current_task: &CurrentTask, file: &FileHandle) {
467 let mut guard = self.state.lock();
468 let key = file.id.as_epoll_key();
469 if let Entry::Occupied(_) = guard.wait_objects.entry(key.into()) {
470 current_task.kernel().suspend_resume_manager.remove_epoll(key);
471 }
472 }
473
474 pub fn activate_lease(
479 &self,
480 current_task: &CurrentTask,
481 file: &FileHandle,
482 _baton_lease: &zx::NullableHandle,
483 ) -> Result<(), Errno> {
484 let key = file.id.as_epoll_key();
485 let add_epoll = self
489 .state
490 .lock()
491 .wait_objects
492 .get(&key.into())
493 .map(|obj| obj.events.contains(FdEvents::EPOLLWAKEUP))
494 .unwrap_or(false);
495 if add_epoll {
496 current_task.kernel().suspend_resume_manager.add_epoll(current_task, key);
497 }
498 Ok(())
499 }
500}
501
502impl FileOps for EpollFileObject {
503 fileops_impl_nonseekable!();
504 fileops_impl_noop_sync!();
505 fileops_impl_dataless!();
506
507 fn wait_async(
508 &self,
509 _locked: &mut Locked<FileOpsCore>,
510 _file: &FileObject,
511 _current_task: &CurrentTask,
512 waiter: &Waiter,
513 events: FdEvents,
514 handler: EventHandler,
515 ) -> Option<WaitCanceler> {
516 Some(self.waitable_state.lock().waiters.wait_async_fd_events(waiter, events, handler))
517 }
518
519 fn query_events(
520 &self,
521 locked: &mut Locked<FileOpsCore>,
522 _file: &FileObject,
523 current_task: &CurrentTask,
524 ) -> Result<FdEvents, Errno> {
525 let mut events = FdEvents::empty();
526 let state = self.state.lock();
527 if !state.processing_list.is_empty() || !self.waitable_state.lock().trigger_list.is_empty()
528 {
529 events |= FdEvents::POLLIN;
530 } else {
531 for key in &state.recheck_list {
532 let wait_object = state.wait_objects.get(&key).unwrap();
533 let Some(target) = wait_object.target() else { continue };
534 if !(target.query_events(locked, current_task)? & wait_object.events).is_empty() {
535 events |= FdEvents::POLLIN;
536 break;
537 }
538 }
539 }
540 Ok(events)
541 }
542
543 fn close(
544 self: Box<Self>,
545 _locked: &mut Locked<FileOpsCore>,
546 _file: &FileObjectState,
547 current_task: &CurrentTask,
548 ) {
549 let guard = self.state.lock();
550 for (key, _wait_object) in guard.wait_objects.iter() {
551 if let ReadyItemKey::Usize(key) = key {
552 current_task.kernel().suspend_resume_manager.remove_epoll(*key);
553 }
554 }
555 }
556}
557
/// Event handler handed to watched files; when invoked it records a ready item for `key` in
/// the owning epoll instance's shared state.
#[derive(Clone)]
pub struct EpollEventHandler {
    /// Key of the registration this handler reports for.
    key: ReadyItemKey,
    /// Shared state of the epoll instance that created this handler.
    waitable_state: Arc<Mutex<EpollWaitableState>>,
}
563
564impl EpollEventHandler {
565 pub fn handle(self, events: FdEvents) {
566 let mut waitable_state = self.waitable_state.lock();
567 waitable_state.trigger_list.push_back(ReadyItem { key: self.key, events });
568 waitable_state.waiters.notify_fd_events(FdEvents::POLLIN);
569 }
570}
571
572impl EpollFileObject {
573 fn new_wait_handler(&self, key: ReadyItemKey) -> EventHandler {
574 EventHandler::Epoll(EpollEventHandler {
575 key,
576 waitable_state: Arc::clone(&self.waitable_state),
577 })
578 }
579}
580
581#[cfg(test)]
582mod tests {
583 use super::*;
584 use crate::fs::fuchsia::create_fuchsia_pipe;
585 use crate::task::Waiter;
586 use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
587 use crate::testing::spawn_kernel_and_run;
588 use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
589 use crate::vfs::eventfd::{EventFdType, new_eventfd};
590 use crate::vfs::fs_registry::FsRegistry;
591 use crate::vfs::pipe::{new_pipe, register_pipe_fs};
592 use crate::vfs::socket::{SocketDomain, SocketType, UnixSocket};
593 use starnix_lifecycle::AtomicUsizeCounter;
594 use starnix_sync::Unlocked;
595 use starnix_uapi::vfs::{EpollEvent, FdEvents};
596 use syncio::Zxio;
597 use zx::{
598 HandleBased, {self as zx},
599 };
600
601 #[::fuchsia::test]
602 async fn test_epoll_read_ready() {
603 static WRITE_COUNT: AtomicUsizeCounter = AtomicUsizeCounter::new(0);
604 const EVENT_DATA: u64 = 42;
605
606 spawn_kernel_and_run(async |locked, current_task| {
607 let kernel = current_task.kernel();
608 register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());
609
610 let (pipe_out, pipe_in) = new_pipe(locked, ¤t_task).unwrap();
611
612 let test_string = "hello starnix".to_string();
613 let test_len = test_string.len();
614
615 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
616 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
617 epoll_file
618 .add(
619 locked,
620 ¤t_task,
621 &pipe_out,
622 &epoll_file_handle,
623 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
624 )
625 .unwrap();
626
627 let (sender, receiver) = std::sync::mpsc::channel();
628 let value = test_string.clone();
629 let closure = move |locked: &mut Locked<Unlocked>, task: &CurrentTask| {
630 let bytes_written = pipe_in
631 .write(locked, &task, &mut VecInputBuffer::new(value.as_bytes()))
632 .unwrap();
633 assert_eq!(bytes_written, test_len);
634 WRITE_COUNT.add(bytes_written);
635 sender.send(()).unwrap();
636 };
637 let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
638 kernel.kthreads.spawner().spawn_from_request(req);
639 let events =
640 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
641 receiver.recv().unwrap();
642 assert_eq!(1, events.len());
643 let event = &events[0];
644 assert!(event.events().contains(FdEvents::POLLIN));
645 assert_eq!(event.data(), EVENT_DATA);
646
647 let mut buffer = VecOutputBuffer::new(test_len);
648 let bytes_read = pipe_out.read(locked, ¤t_task, &mut buffer).unwrap();
649 assert_eq!(bytes_read, WRITE_COUNT.get());
650 assert_eq!(bytes_read, test_len);
651 assert_eq!(buffer.data(), test_string.as_bytes());
652 })
653 .await;
654 }
655
656 #[::fuchsia::test]
657 async fn test_epoll_ready_then_wait() {
658 const EVENT_DATA: u64 = 42;
659
660 spawn_kernel_and_run(async |locked, current_task| {
661 let kernel = current_task.kernel();
662 register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());
663
664 let (pipe_out, pipe_in) = new_pipe(locked, ¤t_task).unwrap();
665
666 let test_string = "hello starnix".to_string();
667 let test_bytes = test_string.as_bytes();
668 let test_len = test_bytes.len();
669
670 assert_eq!(
671 pipe_in.write(locked, ¤t_task, &mut VecInputBuffer::new(test_bytes)).unwrap(),
672 test_bytes.len()
673 );
674
675 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
676 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
677 epoll_file
678 .add(
679 locked,
680 ¤t_task,
681 &pipe_out,
682 &epoll_file_handle,
683 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
684 )
685 .unwrap();
686
687 let events =
688 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
689 assert_eq!(1, events.len());
690 let event = &events[0];
691 assert!(event.events().contains(FdEvents::POLLIN));
692 assert_eq!(event.data(), EVENT_DATA);
693
694 let mut buffer = VecOutputBuffer::new(test_len);
695 let bytes_read = pipe_out.read(locked, ¤t_task, &mut buffer).unwrap();
696 assert_eq!(bytes_read, test_len);
697 assert_eq!(buffer.data(), test_bytes);
698 })
699 .await;
700 }
701
702 #[::fuchsia::test]
703 async fn test_epoll_ctl_cancel() {
704 spawn_kernel_and_run(async |locked, current_task| {
705 for do_cancel in [true, false] {
706 let event = new_eventfd(locked, ¤t_task, 0, EventFdType::Counter, true);
707 let waiter = Waiter::new();
708
709 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
710 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
711 const EVENT_DATA: u64 = 42;
712 epoll_file
713 .add(
714 locked,
715 ¤t_task,
716 &event,
717 &epoll_file_handle,
718 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
719 )
720 .unwrap();
721
722 if do_cancel {
723 epoll_file.delete(&event).unwrap();
724 }
725
726 let wait_canceler = event
727 .wait_async(
728 locked,
729 ¤t_task,
730 &waiter,
731 FdEvents::POLLIN,
732 EventHandler::None,
733 )
734 .expect("wait_async");
735 if do_cancel {
736 wait_canceler.cancel();
737 }
738
739 let add_val = 1u64;
740
741 assert_eq!(
742 event
743 .write(
744 locked,
745 ¤t_task,
746 &mut VecInputBuffer::new(&add_val.to_ne_bytes())
747 )
748 .unwrap(),
749 std::mem::size_of::<u64>()
750 );
751
752 let events =
753 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO).unwrap();
754
755 if do_cancel {
756 assert_eq!(0, events.len());
757 } else {
758 assert_eq!(1, events.len());
759 let event = &events[0];
760 assert!(event.events().contains(FdEvents::POLLIN));
761 assert_eq!(event.data(), EVENT_DATA);
762 }
763 }
764 })
765 .await;
766 }
767
768 #[::fuchsia::test]
769 async fn test_multiple_events() {
770 spawn_kernel_and_run(async |locked, current_task| {
771 let (client1, server1) = zx::Socket::create_stream();
772 let (client2, server2) = zx::Socket::create_stream();
773 let pipe1 = create_fuchsia_pipe(locked, ¤t_task, client1, OpenFlags::RDWR)
774 .expect("create_fuchsia_pipe");
775 let pipe2 = create_fuchsia_pipe(locked, ¤t_task, client2, OpenFlags::RDWR)
776 .expect("create_fuchsia_pipe");
777 let server1_zxio = Zxio::create(server1.into_handle()).expect("Zxio::create");
778 let server2_zxio = Zxio::create(server2.into_handle()).expect("Zxio::create");
779
780 let poll = |locked: &mut Locked<Unlocked>| {
781 let epoll_object = EpollFileObject::new_file(locked, ¤t_task);
782 let epoll_file = epoll_object.downcast_file::<EpollFileObject>().unwrap();
783 epoll_file
784 .add(
785 locked,
786 ¤t_task,
787 &pipe1,
788 &epoll_object,
789 EpollEvent::new(FdEvents::POLLIN, 1),
790 )
791 .expect("epoll_file.add");
792 epoll_file
793 .add(
794 locked,
795 ¤t_task,
796 &pipe2,
797 &epoll_object,
798 EpollEvent::new(FdEvents::POLLIN, 2),
799 )
800 .expect("epoll_file.add");
801 epoll_file.wait(locked, ¤t_task, 2, zx::MonotonicInstant::ZERO).expect("wait")
802 };
803
804 let fds = poll(locked);
805 assert!(fds.is_empty());
806
807 assert_eq!(server1_zxio.write(&[0]).expect("write"), 1);
808
809 let fds = poll(locked);
810 assert_eq!(fds.len(), 1);
811 assert_eq!(fds[0].events(), FdEvents::POLLIN);
812 assert_eq!(fds[0].data(), 1);
813 assert_eq!(
814 pipe1.read(locked, ¤t_task, &mut VecOutputBuffer::new(64)).expect("read"),
815 1
816 );
817
818 let fds = poll(locked);
819 assert!(fds.is_empty());
820
821 assert_eq!(server2_zxio.write(&[0]).expect("write"), 1);
822
823 let fds = poll(locked);
824 assert_eq!(fds.len(), 1);
825 assert_eq!(fds[0].events(), FdEvents::POLLIN);
826 assert_eq!(fds[0].data(), 2);
827 assert_eq!(
828 pipe2.read(locked, ¤t_task, &mut VecOutputBuffer::new(64)).expect("read"),
829 1
830 );
831
832 let fds = poll(locked);
833 assert!(fds.is_empty());
834 })
835 .await;
836 }
837
838 #[::fuchsia::test]
839 async fn test_cancel_after_notify() {
840 spawn_kernel_and_run(async |locked, current_task| {
841 let event = new_eventfd(locked, ¤t_task, 0, EventFdType::Counter, true);
842 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
843 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
844
845 const EVENT_DATA: u64 = 42;
847 epoll_file
848 .add(
849 locked,
850 ¤t_task,
851 &event,
852 &epoll_file_handle,
853 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
854 )
855 .unwrap();
856
857 let add_val = 1u64;
859 assert_eq!(
860 event
861 .write(locked, ¤t_task, &mut VecInputBuffer::new(&add_val.to_ne_bytes()))
862 .unwrap(),
863 std::mem::size_of::<u64>()
864 );
865
866 assert_eq!(
867 epoll_file
868 .wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO)
869 .unwrap()
870 .len(),
871 1
872 );
873
874 epoll_file.delete(&event).unwrap();
876
877 assert_eq!(
879 epoll_file
880 .wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO)
881 .unwrap()
882 .len(),
883 0
884 );
885 })
887 .await;
888 }
889
890 #[::fuchsia::test]
891 async fn test_add_then_modify() {
892 spawn_kernel_and_run(async |locked, current_task| {
893 let (socket1, _socket2) = UnixSocket::new_pair(
894 locked,
895 ¤t_task,
896 SocketDomain::Unix,
897 SocketType::Stream,
898 OpenFlags::RDWR,
899 )
900 .expect("Failed to create socket pair.");
901
902 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
903 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
904
905 const EVENT_DATA: u64 = 42;
906 epoll_file
907 .add(
908 locked,
909 ¤t_task,
910 &socket1,
911 &epoll_file_handle,
912 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
913 )
914 .unwrap();
915 assert_eq!(
916 epoll_file
917 .wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO)
918 .unwrap()
919 .len(),
920 0
921 );
922
923 let read_write_event = FdEvents::POLLIN | FdEvents::POLLOUT;
924 epoll_file
925 .modify(
926 locked,
927 ¤t_task,
928 &socket1,
929 EpollEvent::new(read_write_event, EVENT_DATA),
930 )
931 .unwrap();
932 let triggered_events =
933 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO).unwrap();
934 assert_eq!(1, triggered_events.len());
935 let event = &triggered_events[0];
936 assert_eq!(event.events(), FdEvents::POLLOUT);
937 assert_eq!(event.data(), EVENT_DATA);
938 })
939 .await;
940 }
941
942 #[::fuchsia::test]
943 async fn test_waiter_removal() {
944 spawn_kernel_and_run(async |locked, current_task| {
945 let event = new_eventfd(locked, ¤t_task, 0, EventFdType::Counter, true);
946 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
947 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
948
949 const EVENT_DATA: u64 = 42;
950 epoll_file
951 .add(
952 locked,
953 ¤t_task,
954 &event,
955 &epoll_file_handle,
956 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
957 )
958 .unwrap();
959
960 std::mem::drop(event);
961
962 assert!(epoll_file.waitable_state.lock().waiters.is_empty());
963 })
964 .await;
965 }
966}