Skip to main content

starnix_core/vfs/
aio.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::mm::{
6    DesiredAddress, IOVecPtr, MappingName, MappingOptions, MemoryAccessorExt, ProtectionFlags,
7    TaskMemoryAccessor,
8};
9use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
10use crate::task::{CurrentTask, SimpleWaiter, Task, WaitQueue};
11use crate::vfs::eventfd::EventFdFileObject;
12use crate::vfs::syscalls::IocbPtr;
13use crate::vfs::{
14    FdNumber, FileHandle, InputBuffer, OutputBuffer, UserBuffersInputBuffer,
15    UserBuffersOutputBuffer, VecInputBuffer, VecOutputBuffer, WeakFileHandle,
16    checked_add_offset_and_length,
17};
18use smallvec::smallvec;
19use starnix_logging::track_stub;
20use starnix_sync::{
21    AioEventsLock, AioPendingOperationsLock, InterruptibleEvent, LockDepMutex, Locked, Unlocked,
22};
23use starnix_syscalls::SyscallResult;
24use starnix_types::user_buffer::{UserBuffer, UserBuffers};
25use starnix_uapi::errors::{EINTR, ETIMEDOUT, Errno};
26use starnix_uapi::{
27    IOCB_CMD_PREAD, IOCB_CMD_PREADV, IOCB_CMD_PWRITE, IOCB_CMD_PWRITEV, IOCB_FLAG_RESFD,
28    aio_context_t, errno, error, io_event, iocb,
29};
30use std::collections::VecDeque;
31use std::sync::{Arc, Weak};
32use zerocopy::IntoBytes;
33
/// Length passed to `map_anonymous` for the mapping whose address identifies an AIO context.
/// The value is taken from aio.go in gVisor.
const AIO_RING_SIZE: usize = 32;
36
/// Kernel state-machine-based implementation of asynchronous I/O.
/// See https://man7.org/linux/man-pages/man7/aio.7.html#NOTES
///
/// This is a thin handle over the shared `AioContextInner`; dropping the handle stops the
/// context's operation queue.
pub struct AioContext {
    /// Shared state (pending operations and completed results). The worker closures hold
    /// their own clones of this `Arc`.
    inner: Arc<AioContextInner>,
}
42
43impl AioContext {
44    pub fn create(
45        current_task: &CurrentTask,
46        max_operations: usize,
47    ) -> Result<aio_context_t, Errno> {
48        let context = Arc::new(AioContext { inner: AioContextInner::new(max_operations) });
49        context.inner.spawn_worker_for(current_task, WorkerType::Read);
50        context.inner.spawn_worker_for(current_task, WorkerType::Write);
51        let context_addr = current_task.mm()?.map_anonymous(
52            DesiredAddress::Any,
53            AIO_RING_SIZE,
54            ProtectionFlags::READ | ProtectionFlags::WRITE,
55            MappingOptions::ANONYMOUS | MappingOptions::DONT_EXPAND,
56            MappingName::AioContext(context),
57        )?;
58        Ok(context_addr.ptr() as aio_context_t)
59    }
60
61    pub fn get_events(
62        &self,
63        current_task: &CurrentTask,
64        min_results: usize,
65        max_results: usize,
66        deadline: zx::MonotonicInstant,
67    ) -> Result<Vec<io_event>, Errno> {
68        self.inner.get_events(current_task, min_results, max_results, deadline)
69    }
70
71    pub fn submit(
72        self: &Arc<Self>,
73        current_task: &CurrentTask,
74        control_block: iocb,
75        iocb_addr: IocbPtr,
76    ) -> Result<(), Errno> {
77        self.inner.submit(current_task, control_block, iocb_addr)
78    }
79
80    pub fn cancel(
81        self: &Arc<Self>,
82        _current_task: &CurrentTask,
83        control_block: iocb,
84        iocb_addr: IocbPtr,
85    ) -> Result<(), Errno> {
86        self.inner.cancel(control_block, iocb_addr)
87    }
88}
89
90impl std::fmt::Debug for AioContext {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.debug_struct("AioContext").finish()
93    }
94}
95
96impl std::cmp::PartialEq for AioContext {
97    fn eq(&self, other: &AioContext) -> bool {
98        Arc::ptr_eq(&self.inner, &other.inner)
99    }
100}
101
102impl std::cmp::Eq for AioContext {}
103
104impl Drop for AioContext {
105    fn drop(&mut self) {
106        self.inner.stop();
107    }
108}
109
/// State shared between an `AioContext` handle and the workers spawned for it.
struct AioContextInner {
    /// Operations that have been submitted but not yet picked up by a worker.
    operations: OperationQueue,
    /// Completion events waiting to be collected through `get_events`.
    results: ResultQueue,
}
114
115impl AioContextInner {
116    fn new(max_operations: usize) -> Arc<Self> {
117        Arc::new(Self {
118            operations: OperationQueue::new(max_operations),
119            results: Default::default(),
120        })
121    }
122
123    fn stop(&self) {
124        self.operations.stop();
125    }
126
127    fn get_events(
128        &self,
129        current_task: &CurrentTask,
130        min_results: usize,
131        max_results: usize,
132        deadline: zx::MonotonicInstant,
133    ) -> Result<Vec<io_event>, Errno> {
134        let mut events = self.results.dequeue(max_results);
135        if events.len() >= min_results {
136            return Ok(events);
137        }
138        let event = InterruptibleEvent::new();
139        loop {
140            let (mut waiter, guard) = SimpleWaiter::new(&event);
141            self.results.waiters.wait_async_simple(&mut waiter);
142            events.extend(self.results.dequeue(max_results - events.len()));
143            if events.len() >= min_results {
144                return Ok(events);
145            }
146            match current_task.block_until(guard, deadline) {
147                Err(err) if err == ETIMEDOUT => {
148                    return Ok(events);
149                }
150                Err(err) if err == EINTR => {
151                    if events.is_empty() {
152                        Err(err)
153                    } else {
154                        return Ok(events);
155                    }
156                }
157                result => result,
158            }?;
159        }
160    }
161
162    fn submit(
163        self: &Arc<Self>,
164        current_task: &CurrentTask,
165        control_block: iocb,
166        iocb_addr: IocbPtr,
167    ) -> Result<(), Errno> {
168        let op = IoOperation::new(current_task, control_block, iocb_addr)?;
169        self.operations.enqueue(op)
170    }
171
172    fn cancel(self: &Arc<Self>, control_block: iocb, iocb_addr: IocbPtr) -> Result<(), Errno> {
173        let op_type: OpType = (control_block.aio_lio_opcode as u32).try_into()?;
174        self.operations.remove(op_type.worker_type(), iocb_addr)
175    }
176
177    fn spawn_worker_for(self: &Arc<Self>, current_task: &CurrentTask, worker_type: WorkerType) {
178        let creds = current_task.current_creds().clone();
179        let inner = self.clone();
180        let closure = move |locked: &mut Locked<Unlocked>, current_task: &CurrentTask| {
181            current_task.override_creds(creds, || {
182                inner.perform_next_action(locked, current_task, worker_type)
183            })
184        };
185        let req = SpawnRequestBuilder::new()
186            .with_debug_name("aio-worker")
187            .with_sync_closure(closure)
188            .build();
189        current_task.kernel().kthreads.spawner().spawn_from_request(req);
190    }
191
192    fn perform_next_action(
193        &self,
194        locked: &mut Locked<Unlocked>,
195        current_task: &CurrentTask,
196        worker_type: WorkerType,
197    ) {
198        while let Ok(IoAction::Op(op)) =
199            self.operations.block_until_dequeue(current_task, worker_type)
200        {
201            let Some(result) = op.execute(locked, current_task) else {
202                return;
203            };
204            self.results.enqueue(op.complete(result));
205
206            if let Some(eventfd) = op.eventfd {
207                if let Some(eventfd) = eventfd.upgrade() {
208                    let mut input_buffer = VecInputBuffer::new(1u64.as_bytes());
209                    let _ = eventfd.write(locked, current_task, &mut input_buffer);
210                }
211            }
212        }
213    }
214}
215
/// Identifies which of a context's workers, and which pending queue, handles an operation.
#[derive(Debug, Clone, Copy)]
enum WorkerType {
    /// Handles `OpType::PRead` and `OpType::PReadV`.
    Read,
    /// Handles `OpType::PWrite` and `OpType::PWriteV`.
    Write,
}
221
/// The asynchronous operations this implementation can run.
#[derive(Debug, Clone, Copy)]
enum OpType {
    /// `IOCB_CMD_PREAD`: read into the single buffer described by the control block.
    PRead,
    /// `IOCB_CMD_PREADV`: read into the iovec array referenced by the control block.
    PReadV,
    // TODO: IOCB_CMD_FSYNC
    // TODO: IOCB_CMD_FDSYNC
    // TODO: IOCB_CMD_POLL
    // TODO: IOCB_CMD_NOOP
    /// `IOCB_CMD_PWRITE`: write from the single buffer described by the control block.
    PWrite,
    /// `IOCB_CMD_PWRITEV`: write from the iovec array referenced by the control block.
    PWriteV,
}
233
234impl OpType {
235    fn worker_type(self) -> WorkerType {
236        match self {
237            OpType::PRead | OpType::PReadV => WorkerType::Read,
238            OpType::PWrite | OpType::PWriteV => WorkerType::Write,
239        }
240    }
241}
242
243impl TryFrom<u32> for OpType {
244    type Error = Errno;
245
246    fn try_from(opcode: u32) -> Result<Self, Self::Error> {
247        match opcode {
248            IOCB_CMD_PREAD => Ok(Self::PRead),
249            IOCB_CMD_PREADV => Ok(Self::PReadV),
250            IOCB_CMD_PWRITE => Ok(Self::PWrite),
251            IOCB_CMD_PWRITEV => Ok(Self::PWriteV),
252            _ => {
253                track_stub!(TODO("https://fxbug.dev/297433877"), "io_submit opcode", opcode);
254                return error!(ENOSYS);
255            }
256        }
257    }
258}
/// A validated asynchronous operation waiting to be run by a worker.
struct IoOperation {
    /// Which read/write variant to perform.
    op_type: OpType,
    /// Target file, held weakly; the operation is skipped if it can no longer be upgraded.
    file: WeakFileHandle,
    /// Submitting task, held weakly; upgraded at execution time to reach the user buffers.
    task: Weak<Task>,
    /// User buffers to read into or write from, as capped at submission time.
    buffers: UserBuffers,
    /// File offset at which the read or write is performed.
    offset: usize,
    /// The control block's `aio_data`, reported back as `data` in the completion event.
    id: u64,
    /// Address of the control block; reported as `obj` in the completion event and used to
    /// find the operation when it is cancelled.
    iocb_addr: IocbPtr,
    /// Eventfd to write to after completion, present when `IOCB_FLAG_RESFD` was set.
    eventfd: Option<WeakFileHandle>,
}
269
270impl IoOperation {
271    fn new(
272        current_task: &CurrentTask,
273        control_block: iocb,
274        iocb_addr: IocbPtr,
275    ) -> Result<Self, Errno> {
276        if control_block.aio_reserved2 != 0 {
277            return error!(EINVAL);
278        }
279        let file = current_task.get_file(FdNumber::from_raw(control_block.aio_fildes as i32))?;
280        let op_type = (control_block.aio_lio_opcode as u32).try_into()?;
281        let offset = control_block.aio_offset.try_into().map_err(|_| errno!(EINVAL))?;
282        let flags = control_block.aio_flags;
283
284        match op_type {
285            OpType::PRead | OpType::PReadV => {
286                if !file.can_read() {
287                    return error!(EBADF);
288                }
289            }
290            OpType::PWrite | OpType::PWriteV => {
291                if !file.can_write() {
292                    return error!(EBADF);
293                }
294            }
295        }
296        let mut buffers = match op_type {
297            OpType::PRead | OpType::PWrite => smallvec![UserBuffer {
298                address: control_block.aio_buf.into(),
299                length: control_block.aio_nbytes as usize,
300            }],
301            OpType::PReadV | OpType::PWriteV => {
302                let iovec_addr = IOVecPtr::new(current_task, control_block.aio_buf);
303                let count: i32 = control_block.aio_nbytes.try_into().map_err(|_| errno!(EINVAL))?;
304                current_task.read_iovec(iovec_addr, count.into())?
305            }
306        };
307
308        // Validate the user buffers and offset synchronously.
309        let buffer_length = UserBuffer::cap_buffers_to_max_rw_count(
310            current_task.maximum_valid_address().ok_or_else(|| errno!(EINVAL))?,
311            &mut buffers,
312        )?;
313        checked_add_offset_and_length(offset, buffer_length)?;
314
315        let eventfd = if flags & IOCB_FLAG_RESFD != 0 {
316            let eventfd = current_task
317                .running_state()
318                .files
319                .get(FdNumber::from_raw(control_block.aio_resfd as i32))?;
320            if eventfd.downcast_file::<EventFdFileObject>().is_none() {
321                return error!(EINVAL);
322            }
323            Some(Arc::downgrade(&eventfd))
324        } else {
325            None
326        };
327
328        Ok(IoOperation {
329            op_type,
330            file: Arc::downgrade(&file),
331            task: Arc::downgrade(&current_task.task),
332            buffers,
333            offset,
334            id: control_block.aio_data,
335            iocb_addr,
336            eventfd,
337        })
338    }
339
340    fn execute(
341        &self,
342        locked: &mut Locked<Unlocked>,
343        current_task: &CurrentTask,
344    ) -> Option<Result<SyscallResult, Errno>> {
345        let Some(file) = self.file.upgrade() else {
346            // The FileHandle can close while async IO operations are ongoing.
347            // Ignore this operation when this happens.
348            return None;
349        };
350
351        let result = match self.op_type {
352            OpType::PRead | OpType::PReadV => {
353                self.do_read(locked, current_task, file).map(Into::into)
354            }
355            OpType::PWrite | OpType::PWriteV => {
356                self.do_write(locked, current_task, file).map(Into::into)
357            }
358        };
359        Some(result)
360    }
361
362    fn complete(&self, result: Result<SyscallResult, Errno>) -> io_event {
363        let res = match result {
364            Ok(return_value) => return_value.value() as i64,
365            Err(errno) => errno.return_value() as i64,
366        };
367
368        io_event { data: self.id, obj: self.iocb_addr.addr().into(), res, ..Default::default() }
369    }
370
371    fn do_read(
372        &self,
373        locked: &mut Locked<Unlocked>,
374        current_task: &CurrentTask,
375        file: FileHandle,
376    ) -> Result<usize, Errno> {
377        let mut output_buffer = {
378            let task = self.task.upgrade().ok_or_else(|| errno!(EFAULT))?;
379            let sink = UserBuffersOutputBuffer::syscall_new(&task, self.buffers.clone())?;
380            VecOutputBuffer::new(sink.available())
381        };
382
383        file.read_at(locked, current_task, self.offset, &mut output_buffer)?;
384
385        let task = self.task.upgrade().ok_or_else(|| errno!(EFAULT))?;
386        let mut sink = UserBuffersOutputBuffer::syscall_new(&task, self.buffers.clone())?;
387        sink.write(&output_buffer.data())
388    }
389
390    fn do_write(
391        &self,
392        locked: &mut Locked<Unlocked>,
393        current_task: &CurrentTask,
394        file: FileHandle,
395    ) -> Result<usize, Errno> {
396        let mut input_buffer = {
397            let task = self.task.upgrade().ok_or_else(|| errno!(EFAULT))?;
398            let mut source = UserBuffersInputBuffer::syscall_new(&task, self.buffers.clone())?;
399            VecInputBuffer::new(&source.read_all()?)
400        };
401
402        file.write_at(locked, current_task, self.offset, &mut input_buffer)
403    }
404}
405
/// What a worker should do next after consulting the operation queue.
enum IoAction {
    /// Run this operation.
    Op(IoOperation),
    /// The queue has been stopped; the worker loop ends.
    Stop,
}
410
/// Queue state guarded by the `OperationQueue` lock.
#[derive(Default)]
struct PendingOperations {
    /// Set once by `OperationQueue::stop`; afterwards submissions and cancellations are
    /// rejected and workers are told to stop.
    is_stopped: bool,
    // We currently queue the read and write operations to separate threads.
    // That behavior is incorrect, but it keeps our clients working well enough while we work on
    // getting the correct parallelism.
    /// Pending operations for the read worker, in submission order.
    read_ops: VecDeque<IoOperation>,
    /// Pending operations for the write worker, in submission order.
    write_ops: VecDeque<IoOperation>,
}
420
421impl PendingOperations {
422    fn ops_for(&mut self, worker_type: WorkerType) -> &mut VecDeque<IoOperation> {
423        match worker_type {
424            WorkerType::Read => &mut self.read_ops,
425            WorkerType::Write => &mut self.write_ops,
426        }
427    }
428
429    fn ops_len(&self) -> usize {
430        self.read_ops.len() + self.write_ops.len()
431    }
432}
433
/// Bounded queue of submitted operations, with one wait queue per worker type.
struct OperationQueue {
    /// Upper bound on the combined number of pending read and write operations.
    max_operations: usize,
    /// The pending operations and the stop flag.
    pending: LockDepMutex<PendingOperations, AioPendingOperationsLock>,
    /// Notified when a read operation is queued or the queue is stopped.
    read_waiters: WaitQueue,
    /// Notified when a write operation is queued or the queue is stopped.
    write_waiters: WaitQueue,
}
440
441impl OperationQueue {
442    fn new(max_operations: usize) -> Self {
443        Self {
444            max_operations,
445            pending: LockDepMutex::new(Default::default()),
446            read_waiters: Default::default(),
447            write_waiters: Default::default(),
448        }
449    }
450
451    fn waiters_for(&self, worker_type: WorkerType) -> &WaitQueue {
452        match worker_type {
453            WorkerType::Read => &self.read_waiters,
454            WorkerType::Write => &self.write_waiters,
455        }
456    }
457
458    fn enqueue(&self, op: IoOperation) -> Result<(), Errno> {
459        let worker_type = op.op_type.worker_type();
460        {
461            let mut pending = self.pending.lock();
462            if pending.is_stopped {
463                return error!(EINVAL);
464            }
465            if pending.ops_len() >= self.max_operations {
466                return error!(EAGAIN);
467            }
468            pending.ops_for(worker_type).push_back(op);
469        }
470        self.waiters_for(worker_type).notify_unordered_count(1);
471        Ok(())
472    }
473
474    fn stop(&self) {
475        let mut pending = self.pending.lock();
476        pending.is_stopped = true;
477        pending.read_ops.clear();
478        pending.write_ops.clear();
479        self.read_waiters.notify_all();
480        self.write_waiters.notify_all();
481    }
482
483    fn dequeue(&self, worker_type: WorkerType) -> Option<IoAction> {
484        let mut pending = self.pending.lock();
485        if pending.is_stopped {
486            return Some(IoAction::Stop);
487        }
488        pending.ops_for(worker_type).pop_front().map(IoAction::Op)
489    }
490
491    fn remove(&self, worker_type: WorkerType, iocb_addr: IocbPtr) -> Result<(), Errno> {
492        {
493            let mut pending = self.pending.lock();
494            if pending.is_stopped {
495                return error!(EINVAL);
496            }
497            // TODO: Use pop_front_if when available.
498            if let Some(idx) = pending
499                .ops_for(worker_type)
500                .iter()
501                .position(|value| value.iocb_addr.addr() == iocb_addr.addr())
502            {
503                pending.ops_for(worker_type).remove(idx);
504            } else {
505                return error!(EAGAIN);
506            }
507        }
508        Ok(())
509    }
510
511    fn block_until_dequeue(
512        &self,
513        current_task: &CurrentTask,
514        worker_type: WorkerType,
515    ) -> Result<IoAction, Errno> {
516        if let Some(action) = self.dequeue(worker_type) {
517            return Ok(action);
518        }
519        loop {
520            let event = InterruptibleEvent::new();
521            let (mut waiter, guard) = SimpleWaiter::new(&event);
522            self.waiters_for(worker_type).wait_async_simple(&mut waiter);
523            if let Some(action) = self.dequeue(worker_type) {
524                return Ok(action);
525            }
526            current_task.block_until(guard, zx::MonotonicInstant::INFINITE)?;
527        }
528    }
529}
530
/// Completion events produced by the workers and consumed by `get_events`.
#[derive(Default)]
struct ResultQueue {
    /// Notified each time an event is queued.
    waiters: WaitQueue,
    /// Completed events, oldest first.
    events: LockDepMutex<VecDeque<io_event>, AioEventsLock>,
}
536
537impl ResultQueue {
538    fn enqueue(&self, event: io_event) {
539        self.events.lock().push_back(event);
540        self.waiters.notify_unordered_count(1);
541    }
542
543    fn dequeue(&self, limit: usize) -> Vec<io_event> {
544        let mut events = self.events.lock();
545        let len = std::cmp::min(events.len(), limit);
546        events.drain(..len).collect()
547    }
548}