1use crate::power::WakeupSourceOrigin;
6use crate::task::{
7 CurrentTask, EventHandler, ReadyItem, ReadyItemKey, WaitCanceler, WaitQueue, Waiter,
8};
9use crate::vfs::{
10 Anon, FileHandle, FileObject, FileObjectState, FileOps, WeakFileHandle, fileops_impl_dataless,
11 fileops_impl_nonseekable, fileops_impl_noop_sync,
12};
13use itertools::Itertools;
14use starnix_logging::log_warn;
15use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex};
16use starnix_uapi::error;
17use starnix_uapi::errors::{EINTR, ETIMEDOUT, Errno};
18use starnix_uapi::open_flags::OpenFlags;
19use starnix_uapi::vfs::{EpollEvent, FdEvents};
20use std::collections::hash_map::Entry;
21use std::collections::{HashMap, VecDeque};
22use std::sync::Arc;
23
/// Nesting budget for epoll instances that watch other epoll instances.
///
/// `EpollFileObject::add` hands `MAX_NESTED_DEPTH - 1` to `check_eloop` when the file being
/// added is itself an epoll instance.
const MAX_NESTED_DEPTH: u32 = 5;
27
28struct WaitObject {
34 target: WeakFileHandle,
35 events: FdEvents,
36 data: u64,
37 wait_canceler: Option<WaitCanceler>,
38}
39
40impl WaitObject {
41 fn target(&self) -> Option<FileHandle> {
46 self.target.upgrade()
47 }
48}
49
/// Integer key identifying a watched file; it is also embedded in the wakeup-source name built
/// by `wakeup_source_name_for_epoll`.
pub type EpollKey = usize;
53
54pub fn wakeup_source_name_for_epoll(current_task: &CurrentTask, key: EpollKey) -> String {
55 format!("[{}] {}", current_task.command(), key)
56}
57
58#[derive(Default)]
61pub struct EpollFileObject {
62 waiter: Waiter,
63 state: Mutex<EpollState>,
65 waitable_state: Arc<Mutex<EpollWaitableState>>,
66}
67
68#[derive(Default)]
69struct EpollState {
70 wait_objects: HashMap<ReadyItemKey, WaitObject>,
73 processing_list: VecDeque<ReadyItem>,
81 recheck_list: Vec<ReadyItemKey>,
87}
88
89#[derive(Default)]
90struct EpollWaitableState {
91 trigger_list: VecDeque<ReadyItem>,
94 waiters: WaitQueue,
97}
98
99impl EpollFileObject {
100 pub fn new_file<L>(locked: &mut Locked<L>, current_task: &CurrentTask) -> FileHandle
102 where
103 L: LockEqualOrBefore<FileOpsCore>,
104 {
105 let epoll = Box::new(EpollFileObject::default());
106
107 #[cfg(any(test, debug_assertions))]
108 {
109 let _l1 = epoll.state.lock();
110 let _l2 = epoll.waitable_state.lock();
111 }
112
113 Anon::new_private_file(locked, current_task, epoll, OpenFlags::RDWR, "[eventpoll]")
114 }
115
116 fn wait_on_file<L>(
117 &self,
118 locked: &mut Locked<L>,
119 current_task: &CurrentTask,
120 key: ReadyItemKey,
121 wait_object: &mut WaitObject,
122 ) -> Result<(), Errno>
123 where
124 L: LockEqualOrBefore<FileOpsCore>,
125 {
126 self.wait_on_file_edge_triggered(locked, current_task, key, wait_object)?;
128
129 self.do_recheck(locked, current_task, wait_object, key)?;
130
131 Ok(())
132 }
133
134 fn do_recheck<L>(
135 &self,
136 locked: &mut Locked<L>,
137 current_task: &CurrentTask,
138 wait_object: &mut WaitObject,
139 key: ReadyItemKey,
140 ) -> Result<(), Errno>
141 where
142 L: LockEqualOrBefore<FileOpsCore>,
143 {
144 let Some(target) = wait_object.target() else { return Ok(()) };
145 let events = target.query_events(locked, current_task)?;
146 if !(events & wait_object.events).is_empty() {
147 self.waiter.wake_immediately(events, self.new_wait_handler(key));
148 if let Some(wait_canceler) = wait_object.wait_canceler.take() {
149 wait_canceler.cancel();
150 } else {
151 log_warn!("wait canceler should have been set by `wait_on_file_edge_triggered`");
152 }
153 }
154 Ok(())
155 }
156
157 fn wait_on_file_edge_triggered<L>(
158 &self,
159 locked: &mut Locked<L>,
160 current_task: &CurrentTask,
161 key: ReadyItemKey,
162 wait_object: &mut WaitObject,
163 ) -> Result<(), Errno>
164 where
165 L: LockEqualOrBefore<FileOpsCore>,
166 {
167 let Some(target) = wait_object.target() else {
168 return Ok(());
169 };
170
171 wait_object.wait_canceler = target.wait_async(
172 locked,
173 current_task,
174 &self.waiter,
175 wait_object.events,
176 self.new_wait_handler(key),
177 );
178 if wait_object.wait_canceler.is_none() {
179 return error!(EPERM);
180 }
181 Ok(())
182 }
183
184 fn check_eloop(&self, parent: &FileHandle, depth_left: u32) -> Result<(), Errno> {
187 if depth_left == 0 {
188 return error!(ELOOP);
189 }
190
191 let state = self.state.lock();
192 for nested_object in state.wait_objects.values() {
193 let Some(child) = nested_object.target() else {
194 continue;
195 };
196 let Some(child_file) = child.downcast_file::<EpollFileObject>() else {
197 continue;
198 };
199
200 if Arc::ptr_eq(&child, parent) {
201 return error!(ELOOP);
202 }
203 child_file.check_eloop(parent, depth_left - 1)?;
204 }
205
206 Ok(())
207 }
208
209 pub fn add<L>(
211 &self,
212 locked: &mut Locked<L>,
213 current_task: &CurrentTask,
214 file: &FileHandle,
215 epoll_file_handle: &FileHandle,
216 epoll_event: EpollEvent,
217 ) -> Result<(), Errno>
218 where
219 L: LockEqualOrBefore<FileOpsCore>,
220 {
221 if let Some(epoll_to_add) = file.downcast_file::<EpollFileObject>() {
223 epoll_to_add.check_eloop(epoll_file_handle, MAX_NESTED_DEPTH - 1)?;
226 }
227
228 let mut state = self.state.lock();
229 let key = file.id.as_epoll_key().into();
230 match state.wait_objects.entry(key) {
231 Entry::Occupied(_) => error!(EEXIST),
232 Entry::Vacant(entry) => {
233 let wait_object = entry.insert(WaitObject {
234 target: Arc::downgrade(file),
235 events: epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR,
236 data: epoll_event.data(),
237 wait_canceler: None,
238 });
239 self.wait_on_file(locked, current_task, key, wait_object)
240 }
241 }
242 }
243
244 pub fn modify<L>(
246 &self,
247 locked: &mut Locked<L>,
248 current_task: &CurrentTask,
249 file: &FileHandle,
250 epoll_event: EpollEvent,
251 ) -> Result<(), Errno>
252 where
253 L: LockEqualOrBefore<FileOpsCore>,
254 {
255 let mut state = self.state.lock();
256 let key = file.id.as_epoll_key();
257 state.recheck_list.retain(|x| *x != key.into());
258 let Some(wait_object) = state.wait_objects.get_mut(&key.into()) else {
259 return error!(ENOENT);
260 };
261 if let Some(wait_canceler) = wait_object.wait_canceler.take() {
262 wait_canceler.cancel();
263 }
264 wait_object.events = epoll_event.events() | FdEvents::POLLHUP | FdEvents::POLLERR;
265 wait_object.data = epoll_event.data();
266 if wait_object.events.contains(FdEvents::EPOLLWAKEUP)
270 && !epoll_event.events().contains(FdEvents::EPOLLWAKEUP)
271 {
272 current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
273 &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key)),
274 );
275 }
276 self.wait_on_file(locked, current_task, key.into(), wait_object)
277 }
278
279 pub fn delete(&self, current_task: &CurrentTask, file: &FileObject) -> Result<(), Errno> {
282 let mut state = self.state.lock();
283 let key = file.id.as_epoll_key().into();
284 let ReadyItemKey::Usize(key_usize) = key else {
285 return error!(EINVAL);
287 };
288 if let Some(mut wait_object) = state.wait_objects.remove(&key) {
289 if let Some(wait_canceler) = wait_object.wait_canceler.take() {
290 wait_canceler.cancel();
291 }
292 state.recheck_list.retain(|x| *x != key);
293 current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
295 &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key_usize)),
296 );
297 Ok(())
298 } else {
299 error!(ENOENT)
300 }
301 }
302
303 fn process_triggered_events<L>(
311 &self,
312 locked: &mut Locked<L>,
313 current_task: &CurrentTask,
314 pending_list: &mut Vec<ReadyItem>,
315 max_events: usize,
316 ) -> Result<(), Errno>
317 where
318 L: LockEqualOrBefore<FileOpsCore>,
319 {
320 let mut state = self.state.lock();
321 state.processing_list.append(&mut self.waitable_state.lock().trigger_list);
326 while pending_list.len() < max_events && !state.processing_list.is_empty() {
327 if let Some(pending) = state.processing_list.pop_front() {
328 if let Some(wait) = state.wait_objects.get_mut(&pending.key) {
329 if let Some(target) = wait.target.upgrade() {
333 let ready = ReadyItem {
334 key: pending.key,
335 events: target.query_events(locked, current_task)?,
336 };
337 if ready.events.intersects(wait.events) {
338 pending_list.push(ready);
339 } else {
340 self.wait_on_file(locked, current_task, pending.key, wait)?;
342 }
343 }
344 }
345 }
346 }
347 Ok(())
348 }
349
350 fn wait_until_pending_event<L>(
353 &self,
354 locked: &mut Locked<L>,
355 current_task: &CurrentTask,
356 max_events: usize,
357 mut wait_deadline: zx::MonotonicInstant,
358 ) -> Result<Vec<ReadyItem>, Errno>
359 where
360 L: LockEqualOrBefore<FileOpsCore>,
361 {
362 let mut pending_list = Vec::new();
363
364 loop {
365 self.process_triggered_events(locked, current_task, &mut pending_list, max_events)?;
366
367 if pending_list.len() == max_events {
368 break; }
370
371 if !pending_list.is_empty() {
372 wait_deadline = zx::MonotonicInstant::ZERO;
376 }
377
378 match self.waiter.wait_until(locked, current_task, wait_deadline) {
392 Err(err) if err == ETIMEDOUT => break,
393 Err(err) if err == EINTR => {
394 debug_assert!(
399 pending_list.is_empty(),
400 "Got EINTR from wait of {}ns with {} items pending.",
401 wait_deadline.into_nanos(),
402 pending_list.len()
403 );
404 return Err(err);
405 }
406 result => result?,
408 }
409 }
410
411 Ok(pending_list)
412 }
413
414 pub fn wait<L>(
416 &self,
417 locked: &mut Locked<L>,
418 current_task: &CurrentTask,
419 max_events: usize,
420 deadline: zx::MonotonicInstant,
421 ) -> Result<Vec<EpollEvent>, Errno>
422 where
423 L: LockEqualOrBefore<FileOpsCore>,
424 {
425 {
426 let mut state = self.state.lock();
427 let recheck_list = std::mem::take(&mut state.recheck_list);
428 for key in recheck_list {
429 if let ReadyItemKey::Usize(key) = key {
430 current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
431 &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, key)),
432 );
433 }
434 let wait_object = state.wait_objects.get_mut(&key).unwrap();
435 self.do_recheck(locked, current_task, wait_object, key)?;
436 }
437 }
438
439 let pending_list =
440 self.wait_until_pending_event(locked, current_task, max_events, deadline)?;
441
442 let mut result = vec![];
445 let mut state = self.state.lock();
446 let state = &mut *state;
447 for pending_event in pending_list.iter().unique_by(|e| e.key) {
448 let Some(wait) = state.wait_objects.get_mut(&pending_event.key) else { continue };
451
452 let reported_events = pending_event.events & wait.events;
453 result.push(EpollEvent::new(reported_events, wait.data));
454
455 if wait.events.contains(FdEvents::EPOLLONESHOT) {
458 continue;
459 }
460
461 self.wait_on_file_edge_triggered(locked, current_task, pending_event.key, wait)?;
462
463 if !wait.events.contains(FdEvents::EPOLLET) {
464 state.recheck_list.push(pending_event.key);
465 }
466
467 if !wait.events.contains(FdEvents::EPOLLET) {
469 if wait.events.contains(FdEvents::EPOLLWAKEUP) {
472 if let ReadyItemKey::Usize(key) = pending_event.key {
473 current_task.kernel().suspend_resume_manager.activate_wakeup_source(
474 WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(
475 current_task,
476 key,
477 )),
478 );
479 }
480 }
481 }
482 }
483
484 Ok(result)
485 }
486}
487
488impl FileOps for EpollFileObject {
489 fileops_impl_nonseekable!();
490 fileops_impl_noop_sync!();
491 fileops_impl_dataless!();
492
493 fn wait_async(
494 &self,
495 _locked: &mut Locked<FileOpsCore>,
496 _file: &FileObject,
497 _current_task: &CurrentTask,
498 waiter: &Waiter,
499 events: FdEvents,
500 handler: EventHandler,
501 ) -> Option<WaitCanceler> {
502 Some(self.waitable_state.lock().waiters.wait_async_fd_events(waiter, events, handler))
503 }
504
505 fn query_events(
506 &self,
507 locked: &mut Locked<FileOpsCore>,
508 _file: &FileObject,
509 current_task: &CurrentTask,
510 ) -> Result<FdEvents, Errno> {
511 let mut events = FdEvents::empty();
512 let state = self.state.lock();
513 if !state.processing_list.is_empty() || !self.waitable_state.lock().trigger_list.is_empty()
514 {
515 events |= FdEvents::POLLIN;
516 } else {
517 for key in &state.recheck_list {
518 let wait_object = state.wait_objects.get(&key).unwrap();
519 let Some(target) = wait_object.target() else { continue };
520 if !(target.query_events(locked, current_task)? & wait_object.events).is_empty() {
521 events |= FdEvents::POLLIN;
522 break;
523 }
524 }
525 }
526 Ok(events)
527 }
528
529 fn close(
530 self: Box<Self>,
531 _locked: &mut Locked<FileOpsCore>,
532 _file: &FileObjectState,
533 current_task: &CurrentTask,
534 ) {
535 let guard = self.state.lock();
536 for (key, _wait_object) in guard.wait_objects.iter() {
537 if let ReadyItemKey::Usize(key) = key {
538 current_task.kernel().suspend_resume_manager.deactivate_wakeup_source(
539 &WakeupSourceOrigin::Epoll(wakeup_source_name_for_epoll(current_task, *key)),
540 );
541 }
542 }
543 }
544}
545
546#[derive(Clone)]
547pub struct EpollEventHandler {
548 key: ReadyItemKey,
549 waitable_state: Arc<Mutex<EpollWaitableState>>,
550}
551
552impl EpollEventHandler {
553 pub fn handle(self, events: FdEvents) {
554 let mut waitable_state = self.waitable_state.lock();
555 waitable_state.trigger_list.push_back(ReadyItem { key: self.key, events });
556 waitable_state.waiters.notify_fd_events(FdEvents::POLLIN);
557 }
558}
559
560impl EpollFileObject {
561 fn new_wait_handler(&self, key: ReadyItemKey) -> EventHandler {
562 EventHandler::Epoll(EpollEventHandler {
563 key,
564 waitable_state: Arc::clone(&self.waitable_state),
565 })
566 }
567}
568
569#[cfg(test)]
570mod tests {
571 use super::*;
572 use crate::fs::fuchsia::create_fuchsia_pipe;
573 use crate::task::Waiter;
574 use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
575 use crate::testing::spawn_kernel_and_run;
576 use crate::vfs::buffers::{VecInputBuffer, VecOutputBuffer};
577 use crate::vfs::eventfd::{EventFdType, new_eventfd};
578 use crate::vfs::fs_registry::FsRegistry;
579 use crate::vfs::pipe::{new_pipe, register_pipe_fs};
580 use crate::vfs::socket::{SocketDomain, SocketType, UnixSocket};
581 use starnix_lifecycle::AtomicCounter;
582 use starnix_sync::Unlocked;
583 use starnix_uapi::vfs::{EpollEvent, FdEvents};
584 use syncio::Zxio;
585
586 #[::fuchsia::test]
587 async fn test_epoll_read_ready() {
588 static WRITE_COUNT: AtomicCounter<usize> = AtomicCounter::<usize>::new_const(0);
589 const EVENT_DATA: u64 = 42;
590
591 spawn_kernel_and_run(async |locked, current_task| {
592 let kernel = current_task.kernel();
593 register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());
594
595 let (pipe_out, pipe_in) = new_pipe(locked, ¤t_task).unwrap();
596
597 let test_string = "hello starnix".to_string();
598 let test_len = test_string.len();
599
600 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
601 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
602 epoll_file
603 .add(
604 locked,
605 ¤t_task,
606 &pipe_out,
607 &epoll_file_handle,
608 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
609 )
610 .unwrap();
611
612 let (sender, receiver) = std::sync::mpsc::channel();
613 let value = test_string.clone();
614 let closure = move |locked: &mut Locked<Unlocked>, task: &CurrentTask| {
615 let bytes_written = pipe_in
616 .write(locked, &task, &mut VecInputBuffer::new(value.as_bytes()))
617 .unwrap();
618 assert_eq!(bytes_written, test_len);
619 WRITE_COUNT.add(bytes_written);
620 sender.send(()).unwrap();
621 };
622 let req = SpawnRequestBuilder::new().with_sync_closure(closure).build();
623 kernel.kthreads.spawner().spawn_from_request(req);
624 let events =
625 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
626 receiver.recv().unwrap();
627 assert_eq!(1, events.len());
628 let event = &events[0];
629 assert!(event.events().contains(FdEvents::POLLIN));
630 assert_eq!(event.data(), EVENT_DATA);
631
632 let mut buffer = VecOutputBuffer::new(test_len);
633 let bytes_read = pipe_out.read(locked, ¤t_task, &mut buffer).unwrap();
634 assert_eq!(bytes_read, WRITE_COUNT.get());
635 assert_eq!(bytes_read, test_len);
636 assert_eq!(buffer.data(), test_string.as_bytes());
637 })
638 .await;
639 }
640
641 #[::fuchsia::test]
642 async fn test_epoll_ready_then_wait() {
643 const EVENT_DATA: u64 = 42;
644
645 spawn_kernel_and_run(async |locked, current_task| {
646 let kernel = current_task.kernel();
647 register_pipe_fs(kernel.expando.get::<FsRegistry>().as_ref());
648
649 let (pipe_out, pipe_in) = new_pipe(locked, ¤t_task).unwrap();
650
651 let test_string = "hello starnix".to_string();
652 let test_bytes = test_string.as_bytes();
653 let test_len = test_bytes.len();
654
655 assert_eq!(
656 pipe_in.write(locked, ¤t_task, &mut VecInputBuffer::new(test_bytes)).unwrap(),
657 test_bytes.len()
658 );
659
660 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
661 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
662 epoll_file
663 .add(
664 locked,
665 ¤t_task,
666 &pipe_out,
667 &epoll_file_handle,
668 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
669 )
670 .unwrap();
671
672 let events =
673 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::INFINITE).unwrap();
674 assert_eq!(1, events.len());
675 let event = &events[0];
676 assert!(event.events().contains(FdEvents::POLLIN));
677 assert_eq!(event.data(), EVENT_DATA);
678
679 let mut buffer = VecOutputBuffer::new(test_len);
680 let bytes_read = pipe_out.read(locked, ¤t_task, &mut buffer).unwrap();
681 assert_eq!(bytes_read, test_len);
682 assert_eq!(buffer.data(), test_bytes);
683 })
684 .await;
685 }
686
687 #[::fuchsia::test]
688 async fn test_epoll_ctl_cancel() {
689 spawn_kernel_and_run(async |locked, current_task| {
690 for do_cancel in [true, false] {
691 let event = new_eventfd(locked, ¤t_task, 0, EventFdType::Counter, true);
692 let waiter = Waiter::new();
693
694 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
695 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
696 const EVENT_DATA: u64 = 42;
697 epoll_file
698 .add(
699 locked,
700 ¤t_task,
701 &event,
702 &epoll_file_handle,
703 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
704 )
705 .unwrap();
706
707 if do_cancel {
708 epoll_file.delete(¤t_task, &event).unwrap();
709 }
710
711 let wait_canceler = event
712 .wait_async(
713 locked,
714 ¤t_task,
715 &waiter,
716 FdEvents::POLLIN,
717 EventHandler::None,
718 )
719 .expect("wait_async");
720 if do_cancel {
721 wait_canceler.cancel();
722 }
723
724 let add_val = 1u64;
725
726 assert_eq!(
727 event
728 .write(
729 locked,
730 ¤t_task,
731 &mut VecInputBuffer::new(&add_val.to_ne_bytes())
732 )
733 .unwrap(),
734 std::mem::size_of::<u64>()
735 );
736
737 let events =
738 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO).unwrap();
739
740 if do_cancel {
741 assert_eq!(0, events.len());
742 } else {
743 assert_eq!(1, events.len());
744 let event = &events[0];
745 assert!(event.events().contains(FdEvents::POLLIN));
746 assert_eq!(event.data(), EVENT_DATA);
747 }
748 }
749 })
750 .await;
751 }
752
753 #[::fuchsia::test]
754 async fn test_multiple_events() {
755 spawn_kernel_and_run(async |locked, current_task| {
756 let (client1, server1) = zx::Socket::create_stream();
757 let (client2, server2) = zx::Socket::create_stream();
758 let pipe1 = create_fuchsia_pipe(locked, ¤t_task, client1, OpenFlags::RDWR)
759 .expect("create_fuchsia_pipe");
760 let pipe2 = create_fuchsia_pipe(locked, ¤t_task, client2, OpenFlags::RDWR)
761 .expect("create_fuchsia_pipe");
762 let server1_zxio = Zxio::create(server1.into_handle()).expect("Zxio::create");
763 let server2_zxio = Zxio::create(server2.into_handle()).expect("Zxio::create");
764
765 let poll = |locked: &mut Locked<Unlocked>| {
766 let epoll_object = EpollFileObject::new_file(locked, ¤t_task);
767 let epoll_file = epoll_object.downcast_file::<EpollFileObject>().unwrap();
768 epoll_file
769 .add(
770 locked,
771 ¤t_task,
772 &pipe1,
773 &epoll_object,
774 EpollEvent::new(FdEvents::POLLIN, 1),
775 )
776 .expect("epoll_file.add");
777 epoll_file
778 .add(
779 locked,
780 ¤t_task,
781 &pipe2,
782 &epoll_object,
783 EpollEvent::new(FdEvents::POLLIN, 2),
784 )
785 .expect("epoll_file.add");
786 epoll_file.wait(locked, ¤t_task, 2, zx::MonotonicInstant::ZERO).expect("wait")
787 };
788
789 let fds = poll(locked);
790 assert!(fds.is_empty());
791
792 assert_eq!(server1_zxio.write(&[0]).expect("write"), 1);
793
794 let fds = poll(locked);
795 assert_eq!(fds.len(), 1);
796 assert_eq!(fds[0].events(), FdEvents::POLLIN);
797 assert_eq!(fds[0].data(), 1);
798 assert_eq!(
799 pipe1.read(locked, ¤t_task, &mut VecOutputBuffer::new(64)).expect("read"),
800 1
801 );
802
803 let fds = poll(locked);
804 assert!(fds.is_empty());
805
806 assert_eq!(server2_zxio.write(&[0]).expect("write"), 1);
807
808 let fds = poll(locked);
809 assert_eq!(fds.len(), 1);
810 assert_eq!(fds[0].events(), FdEvents::POLLIN);
811 assert_eq!(fds[0].data(), 2);
812 assert_eq!(
813 pipe2.read(locked, ¤t_task, &mut VecOutputBuffer::new(64)).expect("read"),
814 1
815 );
816
817 let fds = poll(locked);
818 assert!(fds.is_empty());
819 })
820 .await;
821 }
822
823 #[::fuchsia::test]
824 async fn test_cancel_after_notify() {
825 spawn_kernel_and_run(async |locked, current_task| {
826 let event = new_eventfd(locked, ¤t_task, 0, EventFdType::Counter, true);
827 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
828 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
829
830 const EVENT_DATA: u64 = 42;
832 epoll_file
833 .add(
834 locked,
835 ¤t_task,
836 &event,
837 &epoll_file_handle,
838 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
839 )
840 .unwrap();
841
842 let add_val = 1u64;
844 assert_eq!(
845 event
846 .write(locked, ¤t_task, &mut VecInputBuffer::new(&add_val.to_ne_bytes()))
847 .unwrap(),
848 std::mem::size_of::<u64>()
849 );
850
851 assert_eq!(
852 epoll_file
853 .wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO)
854 .unwrap()
855 .len(),
856 1
857 );
858
859 epoll_file.delete(¤t_task, &event).unwrap();
861
862 assert_eq!(
864 epoll_file
865 .wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO)
866 .unwrap()
867 .len(),
868 0
869 );
870 })
872 .await;
873 }
874
875 #[::fuchsia::test]
876 async fn test_add_then_modify() {
877 spawn_kernel_and_run(async |locked, current_task| {
878 let (socket1, _socket2) = UnixSocket::new_pair(
879 locked,
880 ¤t_task,
881 SocketDomain::Unix,
882 SocketType::Stream,
883 OpenFlags::RDWR,
884 )
885 .expect("Failed to create socket pair.");
886
887 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
888 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
889
890 const EVENT_DATA: u64 = 42;
891 epoll_file
892 .add(
893 locked,
894 ¤t_task,
895 &socket1,
896 &epoll_file_handle,
897 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
898 )
899 .unwrap();
900 assert_eq!(
901 epoll_file
902 .wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO)
903 .unwrap()
904 .len(),
905 0
906 );
907
908 let read_write_event = FdEvents::POLLIN | FdEvents::POLLOUT;
909 epoll_file
910 .modify(
911 locked,
912 ¤t_task,
913 &socket1,
914 EpollEvent::new(read_write_event, EVENT_DATA),
915 )
916 .unwrap();
917 let triggered_events =
918 epoll_file.wait(locked, ¤t_task, 10, zx::MonotonicInstant::ZERO).unwrap();
919 assert_eq!(1, triggered_events.len());
920 let event = &triggered_events[0];
921 assert_eq!(event.events(), FdEvents::POLLOUT);
922 assert_eq!(event.data(), EVENT_DATA);
923 })
924 .await;
925 }
926
927 #[::fuchsia::test]
928 async fn test_waiter_removal() {
929 spawn_kernel_and_run(async |locked, current_task| {
930 let event = new_eventfd(locked, ¤t_task, 0, EventFdType::Counter, true);
931 let epoll_file_handle = EpollFileObject::new_file(locked, ¤t_task);
932 let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap();
933
934 const EVENT_DATA: u64 = 42;
935 epoll_file
936 .add(
937 locked,
938 ¤t_task,
939 &event,
940 &epoll_file_handle,
941 EpollEvent::new(FdEvents::POLLIN, EVENT_DATA),
942 )
943 .unwrap();
944
945 std::mem::drop(event);
946
947 assert!(epoll_file.waitable_state.lock().waiters.is_empty());
948 })
949 .await;
950 }
951}