1use crate::mm::{
6 DesiredAddress, IOVecPtr, MappingName, MappingOptions, MemoryAccessorExt, ProtectionFlags,
7 TaskMemoryAccessor,
8};
9use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
10use crate::task::{CurrentTask, SimpleWaiter, Task, WaitQueue};
11use crate::vfs::eventfd::EventFdFileObject;
12use crate::vfs::syscalls::IocbPtr;
13use crate::vfs::{
14 FdNumber, FileHandle, InputBuffer, OutputBuffer, UserBuffersInputBuffer,
15 UserBuffersOutputBuffer, VecInputBuffer, VecOutputBuffer, WeakFileHandle,
16 checked_add_offset_and_length,
17};
18use smallvec::smallvec;
19use starnix_logging::track_stub;
20use starnix_sync::{
21 AioEventsLock, AioPendingOperationsLock, InterruptibleEvent, LockDepMutex, Locked, Unlocked,
22};
23use starnix_syscalls::SyscallResult;
24use starnix_types::user_buffer::{UserBuffer, UserBuffers};
25use starnix_uapi::errors::{EINTR, ETIMEDOUT, Errno};
26use starnix_uapi::{
27 IOCB_CMD_PREAD, IOCB_CMD_PREADV, IOCB_CMD_PWRITE, IOCB_CMD_PWRITEV, IOCB_FLAG_RESFD,
28 aio_context_t, errno, error, io_event, iocb,
29};
30use std::collections::VecDeque;
31use std::sync::{Arc, Weak};
32use zerocopy::IntoBytes;
33
/// Length handed to `map_anonymous` for the mapping that backs each AIO
/// context (see `AioContext::create`).
// NOTE(review): the origin of the value 32 is not visible in this file; confirm before changing it.
const AIO_RING_SIZE: usize = 32;
36
/// Handle to one asynchronous-I/O context.
///
/// `create` moves the handle into the name of an anonymous mapping
/// (`MappingName::AioContext`) and returns that mapping's address as the
/// `aio_context_t`. Dropping the handle stops the context's operation queue.
pub struct AioContext {
    /// State shared with the worker closures spawned by `create`.
    inner: Arc<AioContextInner>,
}
42
43impl AioContext {
44 pub fn create(
45 current_task: &CurrentTask,
46 max_operations: usize,
47 ) -> Result<aio_context_t, Errno> {
48 let context = Arc::new(AioContext { inner: AioContextInner::new(max_operations) });
49 context.inner.spawn_worker_for(current_task, WorkerType::Read);
50 context.inner.spawn_worker_for(current_task, WorkerType::Write);
51 let context_addr = current_task.mm()?.map_anonymous(
52 DesiredAddress::Any,
53 AIO_RING_SIZE,
54 ProtectionFlags::READ | ProtectionFlags::WRITE,
55 MappingOptions::ANONYMOUS | MappingOptions::DONT_EXPAND,
56 MappingName::AioContext(context),
57 )?;
58 Ok(context_addr.ptr() as aio_context_t)
59 }
60
61 pub fn get_events(
62 &self,
63 current_task: &CurrentTask,
64 min_results: usize,
65 max_results: usize,
66 deadline: zx::MonotonicInstant,
67 ) -> Result<Vec<io_event>, Errno> {
68 self.inner.get_events(current_task, min_results, max_results, deadline)
69 }
70
71 pub fn submit(
72 self: &Arc<Self>,
73 current_task: &CurrentTask,
74 control_block: iocb,
75 iocb_addr: IocbPtr,
76 ) -> Result<(), Errno> {
77 self.inner.submit(current_task, control_block, iocb_addr)
78 }
79
80 pub fn cancel(
81 self: &Arc<Self>,
82 _current_task: &CurrentTask,
83 control_block: iocb,
84 iocb_addr: IocbPtr,
85 ) -> Result<(), Errno> {
86 self.inner.cancel(control_block, iocb_addr)
87 }
88}
89
90impl std::fmt::Debug for AioContext {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.debug_struct("AioContext").finish()
93 }
94}
95
96impl std::cmp::PartialEq for AioContext {
97 fn eq(&self, other: &AioContext) -> bool {
98 Arc::ptr_eq(&self.inner, &other.inner)
99 }
100}
101
102impl std::cmp::Eq for AioContext {}
103
104impl Drop for AioContext {
105 fn drop(&mut self) {
106 self.inner.stop();
107 }
108}
109
/// State shared between an `AioContext` handle and its worker closures.
struct AioContextInner {
    /// Operations that were submitted and are waiting for a worker.
    operations: OperationQueue,
    /// Completion events waiting to be collected through `get_events`.
    results: ResultQueue,
}
114
115impl AioContextInner {
116 fn new(max_operations: usize) -> Arc<Self> {
117 Arc::new(Self {
118 operations: OperationQueue::new(max_operations),
119 results: Default::default(),
120 })
121 }
122
123 fn stop(&self) {
124 self.operations.stop();
125 }
126
127 fn get_events(
128 &self,
129 current_task: &CurrentTask,
130 min_results: usize,
131 max_results: usize,
132 deadline: zx::MonotonicInstant,
133 ) -> Result<Vec<io_event>, Errno> {
134 let mut events = self.results.dequeue(max_results);
135 if events.len() >= min_results {
136 return Ok(events);
137 }
138 let event = InterruptibleEvent::new();
139 loop {
140 let (mut waiter, guard) = SimpleWaiter::new(&event);
141 self.results.waiters.wait_async_simple(&mut waiter);
142 events.extend(self.results.dequeue(max_results - events.len()));
143 if events.len() >= min_results {
144 return Ok(events);
145 }
146 match current_task.block_until(guard, deadline) {
147 Err(err) if err == ETIMEDOUT => {
148 return Ok(events);
149 }
150 Err(err) if err == EINTR => {
151 if events.is_empty() {
152 Err(err)
153 } else {
154 return Ok(events);
155 }
156 }
157 result => result,
158 }?;
159 }
160 }
161
162 fn submit(
163 self: &Arc<Self>,
164 current_task: &CurrentTask,
165 control_block: iocb,
166 iocb_addr: IocbPtr,
167 ) -> Result<(), Errno> {
168 let op = IoOperation::new(current_task, control_block, iocb_addr)?;
169 self.operations.enqueue(op)
170 }
171
172 fn cancel(self: &Arc<Self>, control_block: iocb, iocb_addr: IocbPtr) -> Result<(), Errno> {
173 let op_type: OpType = (control_block.aio_lio_opcode as u32).try_into()?;
174 self.operations.remove(op_type.worker_type(), iocb_addr)
175 }
176
177 fn spawn_worker_for(self: &Arc<Self>, current_task: &CurrentTask, worker_type: WorkerType) {
178 let creds = current_task.current_creds().clone();
179 let inner = self.clone();
180 let closure = move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
181 current_task.override_creds(creds, || {
182 inner.perform_next_action(locked, current_task, worker_type)
183 })
184 };
185 let req = SpawnRequestBuilder::new()
186 .with_debug_name("aio-worker")
187 .with_sync_closure(closure)
188 .build();
189 current_task.kernel().kthreads.spawner().spawn_from_request(req);
190 }
191
192 fn perform_next_action(
193 &self,
194 locked: &mut Locked<Unlocked>,
195 current_task: &CurrentTask,
196 worker_type: WorkerType,
197 ) {
198 while let Ok(IoAction::Op(op)) =
199 self.operations.block_until_dequeue(current_task, worker_type)
200 {
201 let Some(result) = op.execute(locked, current_task) else {
202 return;
203 };
204 self.results.enqueue(op.complete(result));
205
206 if let Some(eventfd) = op.eventfd {
207 if let Some(eventfd) = eventfd.upgrade() {
208 let mut input_buffer = VecInputBuffer::new(1u64.as_bytes());
209 let _ = eventfd.write(locked, current_task, &mut input_buffer);
210 }
211 }
212 }
213 }
214}
215
/// Selects which of a context's two workers (and pending queues) handles an
/// operation.
#[derive(Debug, Clone, Copy)]
enum WorkerType {
    /// Serves `PRead` and `PReadV` operations.
    Read,
    /// Serves `PWrite` and `PWriteV` operations.
    Write,
}

/// The kinds of asynchronous operation that can be submitted.
#[derive(Debug, Clone, Copy)]
enum OpType {
    /// Positional read into a single buffer.
    PRead,
    /// Positional read into a vector of buffers.
    PReadV,
    /// Positional write from a single buffer.
    PWrite,
    /// Positional write from a vector of buffers.
    PWriteV,
}

impl OpType {
    /// Returns the worker responsible for this kind of operation: reads go to
    /// the read worker, writes to the write worker.
    fn worker_type(self) -> WorkerType {
        match self {
            Self::PRead | Self::PReadV => WorkerType::Read,
            Self::PWrite | Self::PWriteV => WorkerType::Write,
        }
    }
}
242
243impl TryFrom<u32> for OpType {
244 type Error = Errno;
245
246 fn try_from(opcode: u32) -> Result<Self, Self::Error> {
247 match opcode {
248 IOCB_CMD_PREAD => Ok(Self::PRead),
249 IOCB_CMD_PREADV => Ok(Self::PReadV),
250 IOCB_CMD_PWRITE => Ok(Self::PWrite),
251 IOCB_CMD_PWRITEV => Ok(Self::PWriteV),
252 _ => {
253 track_stub!(TODO("https://fxbug.dev/297433877"), "io_submit opcode", opcode);
254 return error!(ENOSYS);
255 }
256 }
257 }
258}
/// A validated asynchronous read or write waiting to be executed by a worker.
struct IoOperation {
    /// Which kind of read or write to perform.
    op_type: OpType,
    /// Target file, held weakly; the operation is skipped if it is gone by
    /// the time a worker executes it.
    file: WeakFileHandle,
    /// Submitting task, held weakly; its memory is the source or destination
    /// of the user buffers.
    task: Weak<Task>,
    /// User buffers to read into or write from.
    buffers: UserBuffers,
    /// File offset at which the read or write takes place.
    offset: usize,
    /// Caller-supplied `aio_data`, echoed back in the completion event.
    id: u64,
    /// Address of the control block; reported in the completion event and
    /// used to find the operation when cancelling.
    iocb_addr: IocbPtr,
    /// Eventfd to notify on completion, present only when the control block
    /// set `IOCB_FLAG_RESFD`.
    eventfd: Option<WeakFileHandle>,
}
269
270impl IoOperation {
271 fn new(
272 current_task: &CurrentTask,
273 control_block: iocb,
274 iocb_addr: IocbPtr,
275 ) -> Result<Self, Errno> {
276 if control_block.aio_reserved2 != 0 {
277 return error!(EINVAL);
278 }
279 let file = current_task.get_file(FdNumber::from_raw(control_block.aio_fildes as i32))?;
280 let op_type = (control_block.aio_lio_opcode as u32).try_into()?;
281 let offset = control_block.aio_offset.try_into().map_err(|_| errno!(EINVAL))?;
282 let flags = control_block.aio_flags;
283
284 match op_type {
285 OpType::PRead | OpType::PReadV => {
286 if !file.can_read() {
287 return error!(EBADF);
288 }
289 }
290 OpType::PWrite | OpType::PWriteV => {
291 if !file.can_write() {
292 return error!(EBADF);
293 }
294 }
295 }
296 let mut buffers = match op_type {
297 OpType::PRead | OpType::PWrite => smallvec![UserBuffer {
298 address: control_block.aio_buf.into(),
299 length: control_block.aio_nbytes as usize,
300 }],
301 OpType::PReadV | OpType::PWriteV => {
302 let iovec_addr = IOVecPtr::new(current_task, control_block.aio_buf);
303 let count: i32 = control_block.aio_nbytes.try_into().map_err(|_| errno!(EINVAL))?;
304 current_task.read_iovec(iovec_addr, count.into())?
305 }
306 };
307
308 let buffer_length = UserBuffer::cap_buffers_to_max_rw_count(
310 current_task.maximum_valid_address().ok_or_else(|| errno!(EINVAL))?,
311 &mut buffers,
312 )?;
313 checked_add_offset_and_length(offset, buffer_length)?;
314
315 let eventfd = if flags & IOCB_FLAG_RESFD != 0 {
316 let eventfd = current_task
317 .running_state()
318 .files
319 .get(FdNumber::from_raw(control_block.aio_resfd as i32))?;
320 if eventfd.downcast_file::<EventFdFileObject>().is_none() {
321 return error!(EINVAL);
322 }
323 Some(Arc::downgrade(&eventfd))
324 } else {
325 None
326 };
327
328 Ok(IoOperation {
329 op_type,
330 file: Arc::downgrade(&file),
331 task: Arc::downgrade(¤t_task.task),
332 buffers,
333 offset,
334 id: control_block.aio_data,
335 iocb_addr,
336 eventfd,
337 })
338 }
339
340 fn execute(
341 &self,
342 locked: &mut Locked<Unlocked>,
343 current_task: &CurrentTask,
344 ) -> Option<Result<SyscallResult, Errno>> {
345 let Some(file) = self.file.upgrade() else {
346 return None;
349 };
350
351 let result = match self.op_type {
352 OpType::PRead | OpType::PReadV => {
353 self.do_read(locked, current_task, file).map(Into::into)
354 }
355 OpType::PWrite | OpType::PWriteV => {
356 self.do_write(locked, current_task, file).map(Into::into)
357 }
358 };
359 Some(result)
360 }
361
362 fn complete(&self, result: Result<SyscallResult, Errno>) -> io_event {
363 let res = match result {
364 Ok(return_value) => return_value.value() as i64,
365 Err(errno) => errno.return_value() as i64,
366 };
367
368 io_event { data: self.id, obj: self.iocb_addr.addr().into(), res, ..Default::default() }
369 }
370
371 fn do_read(
372 &self,
373 locked: &mut Locked<Unlocked>,
374 current_task: &CurrentTask,
375 file: FileHandle,
376 ) -> Result<usize, Errno> {
377 let mut output_buffer = {
378 let task = self.task.upgrade().ok_or_else(|| errno!(EFAULT))?;
379 let sink = UserBuffersOutputBuffer::syscall_new(&task, self.buffers.clone())?;
380 VecOutputBuffer::new(sink.available())
381 };
382
383 file.read_at(locked, current_task, self.offset, &mut output_buffer)?;
384
385 let task = self.task.upgrade().ok_or_else(|| errno!(EFAULT))?;
386 let mut sink = UserBuffersOutputBuffer::syscall_new(&task, self.buffers.clone())?;
387 sink.write(&output_buffer.data())
388 }
389
390 fn do_write(
391 &self,
392 locked: &mut Locked<Unlocked>,
393 current_task: &CurrentTask,
394 file: FileHandle,
395 ) -> Result<usize, Errno> {
396 let mut input_buffer = {
397 let task = self.task.upgrade().ok_or_else(|| errno!(EFAULT))?;
398 let mut source = UserBuffersInputBuffer::syscall_new(&task, self.buffers.clone())?;
399 VecInputBuffer::new(&source.read_all()?)
400 };
401
402 file.write_at(locked, current_task, self.offset, &mut input_buffer)
403 }
404}
405
/// What a worker should do next, as reported by the operation queue.
enum IoAction {
    /// Execute this operation.
    Op(IoOperation),
    /// The queue has been stopped.
    Stop,
}
410
/// Mutable state of an `OperationQueue`, guarded by its mutex.
#[derive(Default)]
struct PendingOperations {
    /// Set once the queue is stopped; submissions and cancellations are
    /// rejected from then on.
    is_stopped: bool,
    /// Pending operations for the read worker, oldest first.
    read_ops: VecDeque<IoOperation>,
    /// Pending operations for the write worker, oldest first.
    write_ops: VecDeque<IoOperation>,
}
420
421impl PendingOperations {
422 fn ops_for(&mut self, worker_type: WorkerType) -> &mut VecDeque<IoOperation> {
423 match worker_type {
424 WorkerType::Read => &mut self.read_ops,
425 WorkerType::Write => &mut self.write_ops,
426 }
427 }
428
429 fn ops_len(&self) -> usize {
430 self.read_ops.len() + self.write_ops.len()
431 }
432}
433
/// Bounded queue of submitted operations, split by worker type.
struct OperationQueue {
    /// Upper bound on the combined number of pending read and write
    /// operations; submissions beyond it fail with `EAGAIN`.
    max_operations: usize,
    /// The pending operations and the stopped flag.
    pending: LockDepMutex<PendingOperations, AioPendingOperationsLock>,
    /// Notified when a read operation is queued or the queue is stopped.
    read_waiters: WaitQueue,
    /// Notified when a write operation is queued or the queue is stopped.
    write_waiters: WaitQueue,
}
440
441impl OperationQueue {
442 fn new(max_operations: usize) -> Self {
443 Self {
444 max_operations,
445 pending: LockDepMutex::new(Default::default()),
446 read_waiters: Default::default(),
447 write_waiters: Default::default(),
448 }
449 }
450
451 fn waiters_for(&self, worker_type: WorkerType) -> &WaitQueue {
452 match worker_type {
453 WorkerType::Read => &self.read_waiters,
454 WorkerType::Write => &self.write_waiters,
455 }
456 }
457
458 fn enqueue(&self, op: IoOperation) -> Result<(), Errno> {
459 let worker_type = op.op_type.worker_type();
460 {
461 let mut pending = self.pending.lock();
462 if pending.is_stopped {
463 return error!(EINVAL);
464 }
465 if pending.ops_len() >= self.max_operations {
466 return error!(EAGAIN);
467 }
468 pending.ops_for(worker_type).push_back(op);
469 }
470 self.waiters_for(worker_type).notify_unordered_count(1);
471 Ok(())
472 }
473
474 fn stop(&self) {
475 let mut pending = self.pending.lock();
476 pending.is_stopped = true;
477 pending.read_ops.clear();
478 pending.write_ops.clear();
479 self.read_waiters.notify_all();
480 self.write_waiters.notify_all();
481 }
482
483 fn dequeue(&self, worker_type: WorkerType) -> Option<IoAction> {
484 let mut pending = self.pending.lock();
485 if pending.is_stopped {
486 return Some(IoAction::Stop);
487 }
488 pending.ops_for(worker_type).pop_front().map(IoAction::Op)
489 }
490
491 fn remove(&self, worker_type: WorkerType, iocb_addr: IocbPtr) -> Result<(), Errno> {
492 {
493 let mut pending = self.pending.lock();
494 if pending.is_stopped {
495 return error!(EINVAL);
496 }
497 if let Some(idx) = pending
499 .ops_for(worker_type)
500 .iter()
501 .position(|value| value.iocb_addr.addr() == iocb_addr.addr())
502 {
503 pending.ops_for(worker_type).remove(idx);
504 } else {
505 return error!(EAGAIN);
506 }
507 }
508 Ok(())
509 }
510
511 fn block_until_dequeue(
512 &self,
513 current_task: &CurrentTask,
514 worker_type: WorkerType,
515 ) -> Result<IoAction, Errno> {
516 if let Some(action) = self.dequeue(worker_type) {
517 return Ok(action);
518 }
519 loop {
520 let event = InterruptibleEvent::new();
521 let (mut waiter, guard) = SimpleWaiter::new(&event);
522 self.waiters_for(worker_type).wait_async_simple(&mut waiter);
523 if let Some(action) = self.dequeue(worker_type) {
524 return Ok(action);
525 }
526 current_task.block_until(guard, zx::MonotonicInstant::INFINITE)?;
527 }
528 }
529}
530
/// Queue of completion events produced by the workers.
#[derive(Default)]
struct ResultQueue {
    /// Notified each time an event is queued.
    waiters: WaitQueue,
    /// Completed events in completion order, oldest first.
    events: LockDepMutex<VecDeque<io_event>, AioEventsLock>,
}
536
537impl ResultQueue {
538 fn enqueue(&self, event: io_event) {
539 self.events.lock().push_back(event);
540 self.waiters.notify_unordered_count(1);
541 }
542
543 fn dequeue(&self, limit: usize) -> Vec<io_event> {
544 let mut events = self.events.lock();
545 let len = std::cmp::min(events.len(), limit);
546 events.drain(..len).collect()
547 }
548}