Skip to main content

starnix_core/vfs/
pipe.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::mm::{
6    MemoryAccessorExt, NumberOfElementsRead, PAGE_SIZE, TaskMemoryAccessor, read_to_vec,
7};
8use crate::security;
9use crate::signals::{SignalInfo, send_standard_signal};
10use crate::task::{CurrentTask, EventHandler, WaitCallback, WaitCanceler, WaitQueue, Waiter};
11use crate::vfs::buffers::{
12    Buffer, InputBuffer, InputBufferCallback, MessageData, MessageQueue, OutputBuffer,
13    OutputBufferCallback, PeekBufferSegmentsCallback, PipeMessageData, UserBuffersOutputBuffer,
14};
15use crate::vfs::fs_registry::FsRegistry;
16use crate::vfs::{
17    CacheMode, FileHandle, FileObject, FileObjectState, FileOps, FileSystem, FileSystemHandle,
18    FileSystemOps, FileSystemOptions, FsNodeInfo, FsStr, SpecialNode, default_fcntl, default_ioctl,
19    fileops_impl_nonseekable, fileops_impl_noop_sync,
20};
21use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex, MutexGuard, Unlocked};
22use starnix_syscalls::{SUCCESS, SyscallArg, SyscallResult};
23use starnix_types::user_buffer::{UserBuffer, UserBuffers};
24use starnix_types::vfs::default_statfs;
25use starnix_uapi::auth::CAP_SYS_RESOURCE;
26use starnix_uapi::errors::Errno;
27use starnix_uapi::file_mode::mode;
28use starnix_uapi::open_flags::OpenFlags;
29use starnix_uapi::signals::SIGPIPE;
30use starnix_uapi::user_address::{UserAddress, UserRef};
31use starnix_uapi::vfs::FdEvents;
32use starnix_uapi::{
33    F_GETPIPE_SZ, F_SETPIPE_SZ, FIONREAD, PIPEFS_MAGIC, errno, error, statfs, uapi,
34};
35use std::cmp::Ordering;
36use std::sync::Arc;
37
/// Block size reported for pipe nodes: `new_pipe` stores it in the node's `blksize`.
const ATOMIC_IO_BYTES: u16 = 4096;

/// The maximum size of a pipe, independent of task capabilities and sysctl limits.
/// `F_SETPIPE_SZ` requests above this value fail with `EINVAL`.
const PIPE_MAX_SIZE: usize = 1 << 31;
42
/// Rounds `value` up to the nearest multiple of `increment`.
///
/// Unlike a bit-mask implementation, this is correct for any non-zero `increment` (not only
/// powers of two). It can only overflow when the rounded result itself does not fit in a
/// `usize`.
///
/// # Panics
///
/// Panics if `increment` is zero.
fn round_up(value: usize, increment: usize) -> usize {
    match value % increment {
        0 => value,
        remainder => value + (increment - remainder),
    }
}
46
/// Shared state of a pipe or FIFO: the buffered data plus bookkeeping about its endpoints.
#[derive(Debug)]
pub struct Pipe {
    /// Data written into the pipe and not yet read.
    messages: MessageQueue<PipeMessageData>,

    /// Waiters notified of `FdEvents` changes on this pipe.
    waiters: WaitQueue,

    /// The number of open readers.
    reader_count: usize,

    /// Whether the pipe has had a reader since it was created or last reset by `on_close`.
    had_reader: bool,

    /// The number of open writers.
    writer_count: usize,

    /// Whether the pipe has had a writer since it was created or last reset by `on_close`.
    had_writer: bool,
}

/// Reference-counted, lock-protected handle to a `Pipe`, shared by all of its endpoints.
pub type PipeHandle = Arc<Mutex<Pipe>>;
67
68impl Pipe {
69    pub fn new(default_pipe_capacity: usize) -> PipeHandle {
70        Arc::new(Mutex::new(Pipe {
71            messages: MessageQueue::new(default_pipe_capacity),
72            waiters: WaitQueue::default(),
73            reader_count: 0,
74            had_reader: false,
75            writer_count: 0,
76            had_writer: false,
77        }))
78    }
79
80    pub fn open(
81        locked: &mut Locked<Unlocked>,
82        current_task: &CurrentTask,
83        pipe: &Arc<Mutex<Self>>,
84        flags: OpenFlags,
85    ) -> Result<Box<dyn FileOps>, Errno> {
86        let mut events = FdEvents::empty();
87        let mut pipe_locked = pipe.lock();
88        let mut must_wait_events = FdEvents::empty();
89        if flags.can_read() {
90            if !pipe_locked.had_reader {
91                events |= FdEvents::POLLOUT;
92            }
93            pipe_locked.add_reader();
94            if !flags.contains(OpenFlags::NONBLOCK) && !flags.can_write() && !pipe_locked.had_writer
95            {
96                must_wait_events |= FdEvents::POLLIN;
97            }
98        }
99        if flags.can_write() {
100            // https://man7.org/linux/man-pages/man2/open.2.html says:
101            //
102            //  ENXIO  O_NONBLOCK | O_WRONLY is set, the named file is a FIFO,
103            //         and no process has the FIFO open for reading.
104            if flags.contains(OpenFlags::NONBLOCK) && pipe_locked.reader_count == 0 {
105                assert!(!flags.can_read()); // Otherwise we would have called add_reader() above.
106                return error!(ENXIO);
107            }
108            if !pipe_locked.had_writer {
109                events |= FdEvents::POLLIN;
110            }
111            pipe_locked.add_writer();
112            if !flags.contains(OpenFlags::NONBLOCK) && !pipe_locked.had_reader {
113                must_wait_events |= FdEvents::POLLOUT;
114            }
115        }
116        if events != FdEvents::empty() {
117            pipe_locked.waiters.notify_fd_events(events);
118        }
119        let ops = PipeFileObject { pipe: Arc::clone(pipe) };
120        if must_wait_events == FdEvents::empty() {
121            return Ok(Box::new(ops));
122        }
123
124        // Ensures that the new PipeFileObject is closed if is it dropped before being returned.
125        let ops = scopeguard::guard(ops, |ops| {
126            ops.on_close(flags);
127        });
128
129        // Wait for the pipe to be connected.
130        let waiter = Waiter::new();
131        loop {
132            pipe_locked.waiters.wait_async_fd_events(
133                &waiter,
134                must_wait_events,
135                WaitCallback::none(),
136            );
137            std::mem::drop(pipe_locked);
138            match waiter.wait(locked, current_task) {
139                Err(e) => {
140                    return Err(e);
141                }
142                _ => {}
143            }
144            pipe_locked = pipe.lock();
145            if pipe_locked.had_writer && pipe_locked.had_reader {
146                return Ok(Box::new(scopeguard::ScopeGuard::into_inner(ops)));
147            }
148        }
149    }
150
151    /// Increments the reader count for this pipe by 1.
152    pub fn add_reader(&mut self) {
153        self.reader_count += 1;
154        self.had_reader = true;
155    }
156
157    /// Increments the writer count for this pipe by 1.
158    pub fn add_writer(&mut self) {
159        self.writer_count += 1;
160        self.had_writer = true;
161    }
162
163    /// Called whenever a fd to the pipe is closed. Reset the pipe state if there is not more
164    /// reader or writer.
165    pub fn on_close(&mut self) {
166        if self.reader_count == 0 && self.writer_count == 0 {
167            self.had_reader = false;
168            self.had_writer = false;
169            self.messages = MessageQueue::new(self.messages.capacity());
170            self.waiters = WaitQueue::default();
171        }
172    }
173
174    fn is_empty(&self) -> bool {
175        self.messages.is_empty()
176    }
177
178    fn capacity(&self) -> usize {
179        self.messages.capacity()
180    }
181
182    fn set_capacity(
183        &mut self,
184        current_task: &CurrentTask,
185        mut requested_capacity: usize,
186    ) -> Result<(), Errno> {
187        if requested_capacity > PIPE_MAX_SIZE {
188            return error!(EINVAL);
189        }
190        if requested_capacity
191            > current_task
192                .kernel()
193                .system_limits
194                .pipe_max_size
195                .load(std::sync::atomic::Ordering::Relaxed)
196        {
197            security::check_task_capable(current_task, CAP_SYS_RESOURCE)?;
198        }
199        let page_size = *PAGE_SIZE as usize;
200        if requested_capacity < page_size {
201            requested_capacity = page_size;
202        }
203        requested_capacity = round_up(requested_capacity, page_size);
204        self.messages.set_capacity(requested_capacity)
205    }
206
207    fn is_readable(&self) -> bool {
208        !self.is_empty() || (self.writer_count == 0 && self.had_writer)
209    }
210
211    /// Returns whether the pipe can accommodate at least part of a message of length `data_size`.
212    fn is_writable(&self, data_size: usize) -> bool {
213        let available_capacity = self.messages.available_capacity();
214        // POSIX requires that a write smaller than PIPE_BUF be atomic, but requires no
215        // atomicity for writes larger than this.
216        self.had_reader
217            && (available_capacity >= data_size
218                || (available_capacity > 0 && data_size > uapi::PIPE_BUF as usize))
219    }
220
221    pub fn read(&mut self, data: &mut dyn OutputBuffer) -> Result<usize, Errno> {
222        // If there isn't any data to read from the pipe, then the behavior
223        // depends on whether there are any open writers. If there is an
224        // open writer, then we return EAGAIN, to signal that the callers
225        // should wait for the writer to write something into the pipe.
226        // Otherwise, we'll fall through the rest of this function and
227        // return that we have read zero bytes, which will let the caller
228        // know that they're done reading the pipe.
229
230        if !self.is_readable() {
231            return error!(EAGAIN);
232        }
233
234        self.messages.read_stream(data).map(|info| info.bytes_read)
235    }
236
237    pub fn write(
238        &mut self,
239        locked: &mut Locked<FileOpsCore>,
240        current_task: &CurrentTask,
241        data: &mut dyn InputBuffer,
242    ) -> Result<usize, Errno> {
243        if !self.had_reader {
244            return error!(EAGAIN);
245        }
246
247        if self.reader_count == 0 {
248            send_standard_signal(locked, current_task, SignalInfo::kernel(SIGPIPE));
249            return error!(EPIPE);
250        }
251
252        if !self.is_writable(data.available()) {
253            return error!(EAGAIN);
254        }
255
256        self.messages.write_stream(data, None, &mut vec![])
257    }
258
259    fn query_events(&self, flags: OpenFlags) -> FdEvents {
260        let mut events = FdEvents::empty();
261
262        if flags.can_read() && self.is_readable() {
263            let writer_closed = self.writer_count == 0 && self.had_writer;
264            let has_data = !self.is_empty();
265            if writer_closed {
266                events |= FdEvents::POLLHUP;
267            }
268            if !writer_closed || has_data {
269                events |= FdEvents::POLLIN;
270            }
271        }
272
273        if flags.can_write() && self.is_writable(1) {
274            if self.reader_count == 0 && self.had_reader {
275                events |= FdEvents::POLLERR;
276            }
277
278            events |= FdEvents::POLLOUT;
279        }
280
281        events
282    }
283
284    fn fcntl(
285        &mut self,
286        _file: &FileObject,
287        current_task: &CurrentTask,
288        cmd: u32,
289        arg: u64,
290    ) -> Result<SyscallResult, Errno> {
291        match cmd {
292            F_GETPIPE_SZ => Ok(self.capacity().into()),
293            F_SETPIPE_SZ => {
294                self.set_capacity(current_task, arg as usize)?;
295                Ok(self.capacity().into())
296            }
297            _ => default_fcntl(cmd),
298        }
299    }
300
301    fn ioctl(
302        &self,
303        file: &FileObject,
304        locked: &mut Locked<Unlocked>,
305        current_task: &CurrentTask,
306        request: u32,
307        arg: SyscallArg,
308    ) -> Result<SyscallResult, Errno> {
309        let user_addr = UserAddress::from(arg);
310        match request {
311            FIONREAD => {
312                let addr = UserRef::<i32>::new(user_addr);
313                let value: i32 = self.messages.len().try_into().map_err(|_| errno!(EINVAL))?;
314                current_task.write_object(addr, &value)?;
315                Ok(SUCCESS)
316            }
317            _ => default_ioctl(file, locked, current_task, request, arg),
318        }
319    }
320
321    fn notify_fd_events(&self, events: FdEvents) {
322        self.waiters.notify_fd_events(events);
323    }
324
325    /// Splice from the `from` pipe to the `to` pipe.
326    pub fn splice(from: &mut Pipe, to: &mut Pipe, len: usize) -> Result<usize, Errno> {
327        if len == 0 {
328            return Ok(0);
329        }
330        let to_was_empty = to.is_empty();
331        let mut bytes_transferred = 0;
332        loop {
333            let limit = std::cmp::min(len - bytes_transferred, to.messages.available_capacity());
334            if limit == 0 {
335                // We no longer want to transfer any bytes.
336                break;
337            }
338            let Some(mut message) = from.messages.read_message() else {
339                // The `from` pipe is empty.
340                break;
341            };
342            if let Some(data) = MessageData::split_off(&mut message.data, limit) {
343                // Some data is left in the message. Push it back.
344                assert!(data.len() > 0);
345                from.messages.write_front(data.into());
346            }
347            bytes_transferred += message.len();
348            to.messages.write_message(message);
349        }
350        if bytes_transferred > 0 {
351            if from.is_empty() {
352                from.notify_fd_events(FdEvents::POLLOUT);
353            }
354            if to_was_empty {
355                to.notify_fd_events(FdEvents::POLLIN);
356            }
357        }
358        return Ok(bytes_transferred);
359    }
360
361    /// Tee from the `from` pipe to the `to` pipe.
362    pub fn tee(from: &mut Pipe, to: &mut Pipe, len: usize) -> Result<usize, Errno> {
363        if len == 0 {
364            return Ok(0);
365        }
366        let to_was_empty = to.is_empty();
367        let mut bytes_transferred = 0;
368        for message in from.messages.peek_queue().iter() {
369            let limit = std::cmp::min(len - bytes_transferred, to.messages.available_capacity());
370            if limit == 0 {
371                break;
372            }
373            let message = message.clone_at_most(limit);
374            bytes_transferred += message.len();
375            to.messages.write_message(message);
376        }
377        if bytes_transferred > 0 && to_was_empty {
378            to.notify_fd_events(FdEvents::POLLIN);
379        }
380        return Ok(bytes_transferred);
381    }
382}
383
384/// Creates a new pipe between the two returned FileObjects.
385///
386/// The first FileObject is the read endpoint of the pipe. The second is the
387/// write endpoint of the pipe. This order matches the order expected by
388/// sys_pipe2().
389pub fn new_pipe(
390    locked: &mut Locked<Unlocked>,
391    current_task: &CurrentTask,
392) -> Result<(FileHandle, FileHandle), Errno> {
393    let fs = current_task
394        .kernel()
395        .expando
396        .get::<FsRegistry>()
397        .create(locked, current_task, "pipefs".into(), FileSystemOptions::default())
398        .ok_or_else(|| errno!(EINVAL))??;
399    let mut info = FsNodeInfo::new(mode!(IFIFO, 0o600), current_task.current_fscred());
400    info.blksize = ATOMIC_IO_BYTES.into();
401    let node = fs.create_node_and_allocate_node_id(SpecialNode, info);
402    let pipe = node.fifo(current_task);
403    {
404        let mut state = pipe.lock();
405        state.add_reader();
406        state.add_writer();
407    }
408
409    // Creating the readable `FileObject` takes care of initializing the `node` security label.
410    let read_ops = PipeFileObject { pipe: Arc::clone(pipe) };
411    let read_file = FileObject::new_anonymous(
412        locked,
413        current_task,
414        Box::new(read_ops),
415        Arc::clone(&node),
416        OpenFlags::RDONLY,
417    );
418
419    // Create the writable `FileObject` using the readable object's `name` to reduce overhead.
420    let write_ops = PipeFileObject { pipe: Arc::clone(pipe) };
421    let write_file = FileObject::new(
422        locked,
423        current_task,
424        Box::new(write_ops),
425        read_file.name.to_passive(),
426        OpenFlags::WRONLY,
427    )?;
428
429    Ok((read_file, write_file))
430}
431
432struct PipeFs;
433impl FileSystemOps for PipeFs {
434    fn statfs(
435        &self,
436        _locked: &mut Locked<FileOpsCore>,
437        _fs: &FileSystem,
438        _current_task: &CurrentTask,
439    ) -> Result<statfs, Errno> {
440        Ok(default_statfs(PIPEFS_MAGIC))
441    }
442    fn name(&self) -> &'static FsStr {
443        "pipefs".into()
444    }
445}
446
447fn pipe_fs(
448    locked: &mut Locked<Unlocked>,
449    current_task: &CurrentTask,
450    _options: FileSystemOptions,
451) -> Result<FileSystemHandle, Errno> {
452    struct PipeFsHandle(FileSystemHandle);
453
454    let kernel = current_task.kernel();
455    Ok(kernel
456        .expando
457        .get_or_init(|| {
458            PipeFsHandle(
459                FileSystem::new(
460                    locked,
461                    kernel,
462                    CacheMode::Uncached,
463                    PipeFs,
464                    FileSystemOptions::default(),
465                )
466                .expect("pipefs constructed with valid options"),
467            )
468        })
469        .0
470        .clone())
471}
472
/// Registers the `pipefs` filesystem type with `fs_registry`, backed by `pipe_fs`, so that it
/// can later be created by name (as `new_pipe` does).
pub fn register_pipe_fs(fs_registry: &FsRegistry) {
    fs_registry.register("pipefs".into(), pipe_fs);
}
476
/// `FileOps` for one endpoint of a pipe. The endpoint may be a reader, a writer, or both,
/// depending on the open flags of its file.
pub struct PipeFileObject {
    /// The pipe state shared by every endpoint of this pipe.
    pipe: Arc<Mutex<Pipe>>,
}
480
481impl FileOps for PipeFileObject {
482    fileops_impl_nonseekable!();
483    fileops_impl_noop_sync!();
484
485    fn close(
486        self: Box<Self>,
487        _locked: &mut Locked<FileOpsCore>,
488        file: &FileObjectState,
489        _current_task: &CurrentTask,
490    ) {
491        self.on_close(file.flags());
492    }
493
494    fn read(
495        &self,
496        locked: &mut Locked<FileOpsCore>,
497        file: &FileObject,
498        current_task: &CurrentTask,
499        offset: usize,
500        data: &mut dyn OutputBuffer,
501    ) -> Result<usize, Errno> {
502        debug_assert!(offset == 0);
503        file.blocking_op(locked, current_task, FdEvents::POLLIN | FdEvents::POLLHUP, None, |_| {
504            let mut pipe = self.pipe.lock();
505            let actual = pipe.read(data)?;
506            if actual > 0 && pipe.is_empty() {
507                pipe.notify_fd_events(FdEvents::POLLOUT);
508            }
509            Ok(actual)
510        })
511    }
512
513    fn write(
514        &self,
515        locked: &mut Locked<FileOpsCore>,
516        file: &FileObject,
517        current_task: &CurrentTask,
518        offset: usize,
519        data: &mut dyn InputBuffer,
520    ) -> Result<usize, Errno> {
521        debug_assert!(offset == 0);
522        debug_assert!(data.bytes_read() == 0);
523
524        let result = file.blocking_op(locked, current_task, FdEvents::POLLOUT, None, |locked| {
525            let mut pipe = self.pipe.lock();
526            let was_empty = pipe.is_empty();
527            let offset_before = data.bytes_read();
528            let bytes_written = pipe.write(locked, current_task, data)?;
529            debug_assert!(data.bytes_read() - offset_before == bytes_written);
530            if bytes_written > 0 && was_empty {
531                pipe.notify_fd_events(FdEvents::POLLIN);
532            }
533            if data.available() > 0 {
534                return error!(EAGAIN);
535            }
536            Ok(())
537        });
538
539        let bytes_written = data.bytes_read();
540        if bytes_written == 0 {
541            // We can only return an error if no data was actually sent. If partial data was
542            // sent, swallow the error and return how much was sent.
543            result?;
544        }
545        Ok(bytes_written)
546    }
547
548    fn wait_async(
549        &self,
550        _locked: &mut Locked<FileOpsCore>,
551        file: &FileObject,
552        _current_task: &CurrentTask,
553        waiter: &Waiter,
554        mut events: FdEvents,
555        handler: EventHandler,
556    ) -> Option<WaitCanceler> {
557        let flags = file.flags();
558        if !flags.can_read() {
559            events.remove(FdEvents::POLLIN);
560        }
561        if !flags.can_write() {
562            events.remove(FdEvents::POLLOUT);
563        }
564        Some(self.pipe.lock().waiters.wait_async_fd_events(waiter, events, handler))
565    }
566
567    fn query_events(
568        &self,
569        _locked: &mut Locked<FileOpsCore>,
570        file: &FileObject,
571        _current_task: &CurrentTask,
572    ) -> Result<FdEvents, Errno> {
573        Ok(self.pipe.lock().query_events(file.flags()))
574    }
575
576    fn fcntl(
577        &self,
578        file: &FileObject,
579        current_task: &CurrentTask,
580        cmd: u32,
581        arg: u64,
582    ) -> Result<SyscallResult, Errno> {
583        self.pipe.lock().fcntl(file, current_task, cmd, arg)
584    }
585
586    fn ioctl(
587        &self,
588        locked: &mut Locked<Unlocked>,
589        file: &FileObject,
590        current_task: &CurrentTask,
591        request: u32,
592        arg: SyscallArg,
593    ) -> Result<SyscallResult, Errno> {
594        self.pipe.lock().ioctl(file, locked, current_task, request, arg)
595    }
596}
597
598/// An OutputBuffer that will write the data to `pipe`.
599#[derive(Debug)]
600struct SpliceOutputBuffer<'a> {
601    pipe: &'a mut Pipe,
602    len: usize,
603    available: usize,
604}
605
606impl<'a> Buffer for SpliceOutputBuffer<'a> {
607    fn segments_count(&self) -> Result<usize, Errno> {
608        error!(ENOTSUP)
609    }
610
611    fn peek_each_segment(
612        &mut self,
613        _callback: &mut PeekBufferSegmentsCallback<'_>,
614    ) -> Result<(), Errno> {
615        error!(ENOTSUP)
616    }
617}
618
619impl<'a> OutputBuffer for SpliceOutputBuffer<'a> {
620    fn write_each(&mut self, callback: &mut OutputBufferCallback<'_>) -> Result<usize, Errno> {
621        // SAFETY: `callback` returns the number of bytes read on success.
622        let bytes = unsafe {
623            read_to_vec::<u8, _>(self.available, |buf| callback(buf).map(NumberOfElementsRead))
624        }?;
625        let bytes_len = bytes.len();
626        if bytes_len > 0 {
627            let was_empty = self.pipe.is_empty();
628            self.pipe.messages.write_message(PipeMessageData::from(bytes).into());
629            if was_empty {
630                self.pipe.notify_fd_events(FdEvents::POLLIN);
631            }
632            self.available -= bytes_len;
633        }
634        Ok(bytes_len)
635    }
636
637    fn available(&self) -> usize {
638        self.available
639    }
640
641    fn bytes_written(&self) -> usize {
642        self.len - self.available
643    }
644
645    fn zero(&mut self) -> Result<usize, Errno> {
646        let bytes = vec![0; self.available];
647        let len = bytes.len();
648        if len > 0 {
649            let was_empty = self.pipe.is_empty();
650            self.pipe.messages.write_message(PipeMessageData::from(bytes).into());
651            if was_empty {
652                self.pipe.notify_fd_events(FdEvents::POLLIN);
653            }
654            self.available -= len;
655        }
656        Ok(len)
657    }
658
659    unsafe fn advance(&mut self, _length: usize) -> Result<(), Errno> {
660        error!(ENOTSUP)
661    }
662}
663
664/// An InputBuffer that will read the data from `pipe`.
665#[derive(Debug)]
666struct SpliceInputBuffer<'a> {
667    pipe: &'a mut Pipe,
668    len: usize,
669    available: usize,
670}
671
672impl<'a> Buffer for SpliceInputBuffer<'a> {
673    fn segments_count(&self) -> Result<usize, Errno> {
674        Ok(self.pipe.messages.len())
675    }
676
677    fn peek_each_segment(
678        &mut self,
679        callback: &mut PeekBufferSegmentsCallback<'_>,
680    ) -> Result<(), Errno> {
681        let mut available = self.available;
682        for message in self.pipe.messages.messages() {
683            let to_read = std::cmp::min(available, message.len());
684            callback(&UserBuffer {
685                address: UserAddress::from(message.data.ptr()? as u64),
686                length: to_read,
687            });
688            available -= to_read;
689        }
690        Ok(())
691    }
692}
693
694impl<'a> InputBuffer for SpliceInputBuffer<'a> {
695    fn peek_each(&mut self, callback: &mut InputBufferCallback<'_>) -> Result<usize, Errno> {
696        let mut read = 0;
697        let mut available = self.available;
698        for message in self.pipe.messages.messages() {
699            let to_read = std::cmp::min(available, message.len());
700            let result = message.data.with_bytes(|bytes| callback(&bytes[0..to_read]))?;
701            if result > to_read {
702                return error!(EINVAL);
703            }
704            read += result;
705            available -= result;
706            if result != to_read {
707                break;
708            }
709        }
710        Ok(read)
711    }
712
713    fn available(&self) -> usize {
714        self.available
715    }
716
717    fn bytes_read(&self) -> usize {
718        self.len - self.available
719    }
720
721    fn drain(&mut self) -> usize {
722        let result = self.available;
723        self.available = 0;
724        result
725    }
726
727    fn advance(&mut self, mut length: usize) -> Result<(), Errno> {
728        if length == 0 {
729            return Ok(());
730        }
731        if length > self.available {
732            return error!(EINVAL);
733        }
734        self.available -= length;
735        while let Some(mut message) = self.pipe.messages.read_message() {
736            if let Some(data) = MessageData::split_off(&mut message.data, length) {
737                // Some data is left in the message. Push it back.
738                self.pipe.messages.write_front(data.into());
739            }
740            length -= message.len();
741            if length == 0 {
742                if self.pipe.is_empty() {
743                    self.pipe.notify_fd_events(FdEvents::POLLOUT);
744                }
745                return Ok(());
746            }
747        }
748        panic!();
749    }
750}
751
752impl PipeFileObject {
753    /// Called whenever a fd to a pipe is closed.
754    fn on_close(&self, flags: OpenFlags) {
755        let mut events = FdEvents::empty();
756        let mut pipe = self.pipe.lock();
757        if flags.can_read() {
758            assert!(pipe.reader_count > 0);
759            pipe.reader_count -= 1;
760            if pipe.reader_count == 0 {
761                events |= FdEvents::POLLOUT | FdEvents::POLLERR;
762            }
763        }
764        if flags.can_write() {
765            assert!(pipe.writer_count > 0);
766            pipe.writer_count -= 1;
767            if pipe.writer_count == 0 {
768                if pipe.reader_count > 0 {
769                    events |= FdEvents::POLLHUP;
770                }
771                if !pipe.is_empty() {
772                    events |= FdEvents::POLLIN;
773                }
774            }
775        }
776        if events != FdEvents::empty() {
777            pipe.waiters.notify_fd_events(events);
778        }
779        pipe.on_close();
780    }
781
782    /// Returns the result of `pregen` and a lock on pipe, once `condition` returns true, ensuring
783    /// `pregen` is run before the pipe is locked.
784    ///
785    /// This will wait on `events` if the file is opened in blocking mode. If the file is opened in
786    /// not blocking mode and `condition` is not realized, this will return EAGAIN.
787    fn wait_for_condition<'a, L, F, G, V>(
788        &'a self,
789        locked: &mut Locked<L>,
790        current_task: &CurrentTask,
791        file: &FileHandle,
792        condition: F,
793        pregen: G,
794        events: FdEvents,
795    ) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
796    where
797        L: LockEqualOrBefore<FileOpsCore>,
798        F: Fn(&Pipe) -> bool,
799        G: Fn(&mut Locked<L>) -> Result<V, Errno>,
800    {
801        file.blocking_op(locked, current_task, events, None, |locked| {
802            let other = pregen(locked)?;
803            let pipe = self.pipe.lock();
804            if condition(&pipe) { Ok((other, pipe)) } else { error!(EAGAIN) }
805        })
806    }
807
808    /// Lock the pipe for reading, after having run `pregen`.
809    fn lock_pipe_for_reading_with<'a, L, G, V>(
810        &'a self,
811        locked: &mut Locked<L>,
812        current_task: &CurrentTask,
813        file: &FileHandle,
814        pregen: G,
815        non_blocking: bool,
816    ) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
817    where
818        L: LockEqualOrBefore<FileOpsCore>,
819        G: Fn(&mut Locked<L>) -> Result<V, Errno>,
820    {
821        if non_blocking {
822            let other = pregen(locked)?;
823            let pipe = self.pipe.lock();
824            if !pipe.is_readable() {
825                return error!(EAGAIN);
826            }
827            Ok((other, pipe))
828        } else {
829            self.wait_for_condition(
830                locked,
831                current_task,
832                file,
833                |pipe| pipe.is_readable(),
834                pregen,
835                FdEvents::POLLIN | FdEvents::POLLHUP,
836            )
837        }
838    }
839
840    fn lock_pipe_for_reading<'a, L>(
841        &'a self,
842        locked: &mut Locked<L>,
843        current_task: &CurrentTask,
844        file: &FileHandle,
845        non_blocking: bool,
846    ) -> Result<MutexGuard<'a, Pipe>, Errno>
847    where
848        L: LockEqualOrBefore<FileOpsCore>,
849    {
850        self.lock_pipe_for_reading_with(locked, current_task, file, |_| Ok(()), non_blocking)
851            .map(|(_, l)| l)
852    }
853
854    /// Lock the pipe for writing, after having run `pregen`.
855    fn lock_pipe_for_writing_with<'a, L, G, V>(
856        &'a self,
857        locked: &mut Locked<L>,
858        current_task: &CurrentTask,
859        file: &FileHandle,
860        pregen: G,
861        non_blocking: bool,
862        len: usize,
863    ) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
864    where
865        L: LockEqualOrBefore<FileOpsCore>,
866        G: Fn(&mut Locked<L>) -> Result<V, Errno>,
867    {
868        if non_blocking {
869            let other = pregen(locked)?;
870            let pipe = self.pipe.lock();
871            if !pipe.is_writable(len) {
872                return error!(EAGAIN);
873            }
874            Ok((other, pipe))
875        } else {
876            self.wait_for_condition(
877                locked,
878                current_task,
879                file,
880                |pipe| pipe.is_writable(len),
881                pregen,
882                FdEvents::POLLOUT,
883            )
884        }
885    }
886
887    fn lock_pipe_for_writing<'a, L>(
888        &'a self,
889        locked: &mut Locked<L>,
890        current_task: &CurrentTask,
891        file: &FileHandle,
892        non_blocking: bool,
893        len: usize,
894    ) -> Result<MutexGuard<'a, Pipe>, Errno>
895    where
896        L: LockEqualOrBefore<FileOpsCore>,
897    {
898        self.lock_pipe_for_writing_with(locked, current_task, file, |_| Ok(()), non_blocking, len)
899            .map(|(_, l)| l)
900    }
901
902    /// Splice from the given file handle to this pipe.
903    ///
904    /// The given file handle must not be a pipe. If you wish to splice between two pipes, use
905    /// `lock_pipes` and `Pipe::splice`.
906    pub fn splice_from<L>(
907        &self,
908        locked: &mut Locked<L>,
909        current_task: &CurrentTask,
910        self_file: &FileHandle,
911        from: &FileHandle,
912        maybe_offset: Option<usize>,
913        len: usize,
914        non_blocking: bool,
915    ) -> Result<usize, Errno>
916    where
917        L: LockEqualOrBefore<FileOpsCore>,
918    {
919        // If both ends are pipes, use `lock_pipes` and `Pipe::splice`.
920        assert!(from.downcast_file::<PipeFileObject>().is_none());
921
922        let mut pipe =
923            self.lock_pipe_for_writing(locked, current_task, self_file, non_blocking, len)?;
924        let len = std::cmp::min(len, pipe.messages.available_capacity());
925        let mut buffer = SpliceOutputBuffer { pipe: &mut pipe, len, available: len };
926        if let Some(offset) = maybe_offset {
927            from.read_at(locked, current_task, offset, &mut buffer)
928        } else {
929            from.read(locked, current_task, &mut buffer)
930        }
931    }
932
933    /// Splice from this pipe to the given file handle.
934    ///
935    /// The given file handle must not be a pipe. If you wish to splice between two pipes, use
936    /// `lock_pipes` and `Pipe::splice`.
937    pub fn splice_to<L>(
938        &self,
939        locked: &mut Locked<L>,
940        current_task: &CurrentTask,
941        self_file: &FileHandle,
942        to: &FileHandle,
943        maybe_offset: Option<usize>,
944        len: usize,
945        non_blocking: bool,
946    ) -> Result<usize, Errno>
947    where
948        L: LockEqualOrBefore<FileOpsCore>,
949    {
950        // If both ends are pipes, use `lock_pipes` and `Pipe::splice`.
951        assert!(to.downcast_file::<PipeFileObject>().is_none());
952
953        let mut pipe = self.lock_pipe_for_reading(locked, current_task, self_file, non_blocking)?;
954        let len = std::cmp::min(len, pipe.messages.len());
955        let mut buffer = SpliceInputBuffer { pipe: &mut pipe, len, available: len };
956        if let Some(offset) = maybe_offset {
957            to.write_at(locked, current_task, offset, &mut buffer)
958        } else {
959            to.write(locked, current_task, &mut buffer)
960        }
961    }
962
963    /// Share the mappings backing the given input buffer into the pipe.
964    ///
965    /// Returns the number of bytes enqueued.
966    pub fn vmsplice_from<L>(
967        &self,
968        locked: &mut Locked<L>,
969        current_task: &CurrentTask,
970        self_file: &FileHandle,
971        mut iovec: UserBuffers,
972        non_blocking: bool,
973    ) -> Result<usize, Errno>
974    where
975        L: LockEqualOrBefore<FileOpsCore>,
976    {
977        let locked = locked.cast_locked::<FileOpsCore>();
978        let locked = locked;
979        let available = UserBuffer::cap_buffers_to_max_rw_count(
980            current_task.maximum_valid_address().ok_or_else(|| errno!(EINVAL))?,
981            &mut iovec,
982        )?;
983        let mappings = current_task.mm()?.get_mappings_for_vmsplice(&iovec)?;
984
985        let mut pipe =
986            self.lock_pipe_for_writing(locked, current_task, self_file, non_blocking, available)?;
987
988        if pipe.reader_count == 0 {
989            send_standard_signal(locked, current_task, SignalInfo::kernel(SIGPIPE));
990            return error!(EPIPE);
991        }
992
993        let was_empty = pipe.is_empty();
994        let mut remaining = std::cmp::min(available, pipe.messages.available_capacity());
995
996        let mut bytes_transferred = 0;
997        for mut mapping in mappings.into_iter() {
998            mapping.truncate(remaining);
999            let actual = mapping.len();
1000
1001            pipe.messages.write_message(PipeMessageData::Vmspliced(mapping).into());
1002            remaining -= actual;
1003            bytes_transferred += actual;
1004
1005            if remaining == 0 {
1006                break;
1007            }
1008        }
1009        if bytes_transferred > 0 && was_empty {
1010            pipe.notify_fd_events(FdEvents::POLLIN);
1011        }
1012        Ok(bytes_transferred)
1013    }
1014
1015    /// Copy data from the pipe to the given output buffer.
1016    ///
1017    /// Returns the number of bytes transferred.
1018    pub fn vmsplice_to<L>(
1019        &self,
1020        locked: &mut Locked<L>,
1021        current_task: &CurrentTask,
1022        self_file: &FileHandle,
1023        iovec: UserBuffers,
1024        non_blocking: bool,
1025    ) -> Result<usize, Errno>
1026    where
1027        L: LockEqualOrBefore<FileOpsCore>,
1028    {
1029        let mut pipe = self.lock_pipe_for_reading(locked, current_task, self_file, non_blocking)?;
1030
1031        let mut data = UserBuffersOutputBuffer::unified_new(current_task, iovec)?;
1032        let len = std::cmp::min(data.available(), pipe.messages.len());
1033        let mut buffer = SpliceInputBuffer { pipe: &mut pipe, len, available: len };
1034        data.write_buffer(&mut buffer)
1035    }
1036
1037    /// Obtain the pipe objects from the given file handles, if they are both pipes.
1038    ///
1039    /// Returns EINVAL if one (or both) of the given file handles is not a pipe.
1040    ///
1041    /// Obtains the locks on the pipes in the correct order to avoid deadlocks.
1042    pub fn lock_pipes<'a, 'b, L>(
1043        locked: &mut Locked<L>,
1044        current_task: &CurrentTask,
1045        file_in: &'a FileHandle,
1046        file_out: &'b FileHandle,
1047        len: usize,
1048        non_blocking: bool,
1049    ) -> Result<PipeOperands<'a, 'b>, Errno>
1050    where
1051        L: LockEqualOrBefore<FileOpsCore>,
1052    {
1053        let pipe_in = file_in.downcast_file::<PipeFileObject>().ok_or_else(|| errno!(EINVAL))?;
1054        let pipe_out = file_out.downcast_file::<PipeFileObject>().ok_or_else(|| errno!(EINVAL))?;
1055
1056        let node_cmp =
1057            Arc::as_ptr(&file_in.name.entry.node).cmp(&Arc::as_ptr(&file_out.name.entry.node));
1058
1059        match node_cmp {
1060            Ordering::Equal => error!(EINVAL),
1061            Ordering::Less => {
1062                let (write, read) = pipe_in.lock_pipe_for_reading_with(
1063                    locked,
1064                    current_task,
1065                    file_in,
1066                    |locked| {
1067                        pipe_out.lock_pipe_for_writing(
1068                            locked,
1069                            current_task,
1070                            file_out,
1071                            non_blocking,
1072                            len,
1073                        )
1074                    },
1075                    non_blocking,
1076                )?;
1077                Ok(PipeOperands { read, write })
1078            }
1079            Ordering::Greater => {
1080                let (read, write) = pipe_out.lock_pipe_for_writing_with(
1081                    locked,
1082                    current_task,
1083                    file_out,
1084                    |locked| {
1085                        pipe_in.lock_pipe_for_reading(locked, current_task, file_in, non_blocking)
1086                    },
1087                    non_blocking,
1088                    len,
1089                )?;
1090                Ok(PipeOperands { read, write })
1091            }
1092        }
1093    }
1094}
1095
/// The pair of locked pipes produced by `lock_pipes` for a pipe-to-pipe transfer.
///
/// Both pipe locks are held for as long as this value is alive. Rust drops struct fields in
/// declaration order, so `read` is unlocked before `write`; keep this field order in mind
/// before rearranging.
pub struct PipeOperands<'a, 'b> {
    /// Guard on the pipe behind `lock_pipes`' `file_in`, locked for reading.
    pub read: MutexGuard<'a, Pipe>,
    /// Guard on the pipe behind `lock_pipes`' `file_out`, locked for writing.
    pub write: MutexGuard<'b, Pipe>,
}