starnix_core/vfs/
pipe.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::mm::{
6    MemoryAccessorExt, NumberOfElementsRead, PAGE_SIZE, TaskMemoryAccessor, read_to_vec,
7};
8use crate::security;
9use crate::signals::{SignalInfo, send_standard_signal};
10use crate::task::{CurrentTask, EventHandler, WaitCallback, WaitCanceler, WaitQueue, Waiter};
11use crate::vfs::buffers::{
12    Buffer, InputBuffer, InputBufferCallback, MessageData, MessageQueue, OutputBuffer,
13    OutputBufferCallback, PeekBufferSegmentsCallback, PipeMessageData, UserBuffersOutputBuffer,
14};
15use crate::vfs::fs_registry::FsRegistry;
16use crate::vfs::{
17    CacheMode, FileHandle, FileObject, FileObjectState, FileOps, FileSystem, FileSystemHandle,
18    FileSystemOps, FileSystemOptions, FsNodeInfo, FsStr, SpecialNode, default_fcntl, default_ioctl,
19    fileops_impl_nonseekable, fileops_impl_noop_sync,
20};
21use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex, MutexGuard, Unlocked};
22use starnix_syscalls::{SUCCESS, SyscallArg, SyscallResult};
23use starnix_types::user_buffer::{UserBuffer, UserBuffers};
24use starnix_types::vfs::default_statfs;
25use starnix_uapi::auth::CAP_SYS_RESOURCE;
26use starnix_uapi::errors::Errno;
27use starnix_uapi::file_mode::mode;
28use starnix_uapi::open_flags::OpenFlags;
29use starnix_uapi::signals::SIGPIPE;
30use starnix_uapi::user_address::{UserAddress, UserRef};
31use starnix_uapi::vfs::FdEvents;
32use starnix_uapi::{
33    F_GETPIPE_SZ, F_SETPIPE_SZ, FIONREAD, PIPEFS_MAGIC, errno, error, statfs, uapi,
34};
35use std::cmp::Ordering;
36use std::sync::Arc;
37
/// The block size (`blksize`) stored in the node info of pipes created by `new_pipe`.
const ATOMIC_IO_BYTES: u16 = 4096;

/// The maximum size of a pipe, independent of task capabilities and sysctl limits.
const PIPE_MAX_SIZE: usize = 1 << 31;
42
/// Rounds `value` up to the next multiple of `increment`.
///
/// The computation is a bit mask, so the result is only a multiple of `increment` when
/// `increment` is a power of two.
fn round_up(value: usize, increment: usize) -> usize {
    let mask = increment - 1;
    (value + mask) & !mask
}
46
/// The state of a pipe: the queued data, the waiters and the bookkeeping of its open ends.
#[derive(Debug)]
pub struct Pipe {
    /// The queue holding the data that has been written to the pipe and not read yet.
    messages: MessageQueue<PipeMessageData>,

    /// The wait queue that is notified with `FdEvents` when the state of the pipe changes.
    waiters: WaitQueue,

    /// The number of open readers.
    reader_count: usize,

    /// Whether the pipe has ever had a reader. Reset by `on_close` once the pipe has neither
    /// readers nor writers.
    had_reader: bool,

    /// The number of open writers.
    writer_count: usize,

    /// Whether the pipe has ever had a writer. Reset by `on_close` once the pipe has neither
    /// readers nor writers.
    had_writer: bool,
}
65
/// A reference-counted, mutex-protected `Pipe`.
pub type PipeHandle = Arc<Mutex<Pipe>>;
67
68impl Pipe {
69    pub fn new(default_pipe_capacity: usize) -> PipeHandle {
70        Arc::new(Mutex::new(Pipe {
71            messages: MessageQueue::new(default_pipe_capacity),
72            waiters: WaitQueue::default(),
73            reader_count: 0,
74            had_reader: false,
75            writer_count: 0,
76            had_writer: false,
77        }))
78    }
79
80    pub fn open(
81        locked: &mut Locked<Unlocked>,
82        current_task: &CurrentTask,
83        pipe: &Arc<Mutex<Self>>,
84        flags: OpenFlags,
85    ) -> Result<Box<dyn FileOps>, Errno> {
86        let mut events = FdEvents::empty();
87        let mut pipe_locked = pipe.lock();
88        let mut must_wait_events = FdEvents::empty();
89        if flags.can_read() {
90            if !pipe_locked.had_reader {
91                events |= FdEvents::POLLOUT;
92            }
93            pipe_locked.add_reader();
94            if !flags.contains(OpenFlags::NONBLOCK) && !flags.can_write() && !pipe_locked.had_writer
95            {
96                must_wait_events |= FdEvents::POLLIN;
97            }
98        }
99        if flags.can_write() {
100            // https://man7.org/linux/man-pages/man2/open.2.html says:
101            //
102            //  ENXIO  O_NONBLOCK | O_WRONLY is set, the named file is a FIFO,
103            //         and no process has the FIFO open for reading.
104            if flags.contains(OpenFlags::NONBLOCK) && pipe_locked.reader_count == 0 {
105                assert!(!flags.can_read()); // Otherwise we would have called add_reader() above.
106                return error!(ENXIO);
107            }
108            if !pipe_locked.had_writer {
109                events |= FdEvents::POLLIN;
110            }
111            pipe_locked.add_writer();
112            if !flags.contains(OpenFlags::NONBLOCK) && !pipe_locked.had_reader {
113                must_wait_events |= FdEvents::POLLOUT;
114            }
115        }
116        if events != FdEvents::empty() {
117            pipe_locked.waiters.notify_fd_events(events);
118        }
119        let ops = PipeFileObject { pipe: Arc::clone(pipe) };
120        if must_wait_events == FdEvents::empty() {
121            return Ok(Box::new(ops));
122        }
123
124        // Ensures that the new PipeFileObject is closed if is it dropped before being returned.
125        let ops = scopeguard::guard(ops, |ops| {
126            ops.on_close(flags);
127        });
128
129        // Wait for the pipe to be connected.
130        let waiter = Waiter::new();
131        loop {
132            pipe_locked.waiters.wait_async_fd_events(
133                &waiter,
134                must_wait_events,
135                WaitCallback::none(),
136            );
137            std::mem::drop(pipe_locked);
138            match waiter.wait(locked, current_task) {
139                Err(e) => {
140                    return Err(e);
141                }
142                _ => {}
143            }
144            pipe_locked = pipe.lock();
145            if pipe_locked.had_writer && pipe_locked.had_reader {
146                return Ok(Box::new(scopeguard::ScopeGuard::into_inner(ops)));
147            }
148        }
149    }
150
151    /// Increments the reader count for this pipe by 1.
152    pub fn add_reader(&mut self) {
153        self.reader_count += 1;
154        self.had_reader = true;
155    }
156
157    /// Increments the writer count for this pipe by 1.
158    pub fn add_writer(&mut self) {
159        self.writer_count += 1;
160        self.had_writer = true;
161    }
162
163    /// Called whenever a fd to the pipe is closed. Reset the pipe state if there is not more
164    /// reader or writer.
165    pub fn on_close(&mut self) {
166        if self.reader_count == 0 && self.writer_count == 0 {
167            self.had_reader = false;
168            self.had_writer = false;
169            self.messages = MessageQueue::new(self.messages.capacity());
170            self.waiters = WaitQueue::default();
171        }
172    }
173
    /// Returns whether the message queue of the pipe is empty.
    fn is_empty(&self) -> bool {
        self.messages.is_empty()
    }
177
    /// Returns the capacity of the message queue of the pipe.
    fn capacity(&self) -> usize {
        self.messages.capacity()
    }
181
182    fn set_capacity(
183        &mut self,
184        current_task: &CurrentTask,
185        mut requested_capacity: usize,
186    ) -> Result<(), Errno> {
187        if requested_capacity > PIPE_MAX_SIZE {
188            return error!(EINVAL);
189        }
190        if requested_capacity
191            > current_task
192                .kernel()
193                .system_limits
194                .pipe_max_size
195                .load(std::sync::atomic::Ordering::Relaxed)
196        {
197            security::check_task_capable(current_task, CAP_SYS_RESOURCE)?;
198        }
199        let page_size = *PAGE_SIZE as usize;
200        if requested_capacity < page_size {
201            requested_capacity = page_size;
202        }
203        requested_capacity = round_up(requested_capacity, page_size);
204        self.messages.set_capacity(requested_capacity)
205    }
206
207    fn is_readable(&self) -> bool {
208        !self.is_empty() || (self.writer_count == 0 && self.had_writer)
209    }
210
211    /// Returns whether the pipe can accommodate at least part of a message of length `data_size`.
212    fn is_writable(&self, data_size: usize) -> bool {
213        let available_capacity = self.messages.available_capacity();
214        // POSIX requires that a write smaller than PIPE_BUF be atomic, but requires no
215        // atomicity for writes larger than this.
216        self.had_reader
217            && (available_capacity >= data_size
218                || (available_capacity > 0 && data_size > uapi::PIPE_BUF as usize))
219    }
220
221    pub fn read(&mut self, data: &mut dyn OutputBuffer) -> Result<usize, Errno> {
222        // If there isn't any data to read from the pipe, then the behavior
223        // depends on whether there are any open writers. If there is an
224        // open writer, then we return EAGAIN, to signal that the callers
225        // should wait for the writer to write something into the pipe.
226        // Otherwise, we'll fall through the rest of this function and
227        // return that we have read zero bytes, which will let the caller
228        // know that they're done reading the pipe.
229
230        if !self.is_readable() {
231            return error!(EAGAIN);
232        }
233
234        self.messages.read_stream(data).map(|info| info.bytes_read)
235    }
236
237    pub fn write(
238        &mut self,
239        locked: &mut Locked<FileOpsCore>,
240        current_task: &CurrentTask,
241        data: &mut dyn InputBuffer,
242    ) -> Result<usize, Errno> {
243        if !self.had_reader {
244            return error!(EAGAIN);
245        }
246
247        if self.reader_count == 0 {
248            send_standard_signal(locked, current_task, SignalInfo::default(SIGPIPE));
249            return error!(EPIPE);
250        }
251
252        if !self.is_writable(data.available()) {
253            return error!(EAGAIN);
254        }
255
256        self.messages.write_stream(data, None, &mut vec![])
257    }
258
259    fn query_events(&self, flags: OpenFlags) -> FdEvents {
260        let mut events = FdEvents::empty();
261
262        if flags.can_read() && self.is_readable() {
263            let writer_closed = self.writer_count == 0 && self.had_writer;
264            let has_data = !self.is_empty();
265            if writer_closed {
266                events |= FdEvents::POLLHUP;
267            }
268            if !writer_closed || has_data {
269                events |= FdEvents::POLLIN;
270            }
271        }
272
273        if flags.can_write() && self.is_writable(1) {
274            if self.reader_count == 0 && self.had_reader {
275                events |= FdEvents::POLLERR;
276            }
277
278            events |= FdEvents::POLLOUT;
279        }
280
281        events
282    }
283
284    fn fcntl(
285        &mut self,
286        _file: &FileObject,
287        current_task: &CurrentTask,
288        cmd: u32,
289        arg: u64,
290    ) -> Result<SyscallResult, Errno> {
291        match cmd {
292            F_GETPIPE_SZ => Ok(self.capacity().into()),
293            F_SETPIPE_SZ => {
294                self.set_capacity(current_task, arg as usize)?;
295                Ok(self.capacity().into())
296            }
297            _ => default_fcntl(cmd),
298        }
299    }
300
301    fn ioctl(
302        &self,
303        file: &FileObject,
304        locked: &mut Locked<Unlocked>,
305        current_task: &CurrentTask,
306        request: u32,
307        arg: SyscallArg,
308    ) -> Result<SyscallResult, Errno> {
309        let user_addr = UserAddress::from(arg);
310        match request {
311            FIONREAD => {
312                let addr = UserRef::<i32>::new(user_addr);
313                let value: i32 = self.messages.len().try_into().map_err(|_| errno!(EINVAL))?;
314                current_task.write_object(addr, &value)?;
315                Ok(SUCCESS)
316            }
317            _ => default_ioctl(file, locked, current_task, request, arg),
318        }
319    }
320
    /// Notifies the waiters of the pipe of `events`.
    fn notify_fd_events(&self, events: FdEvents) {
        self.waiters.notify_fd_events(events);
    }
324
325    /// Splice from the `from` pipe to the `to` pipe.
326    pub fn splice(from: &mut Pipe, to: &mut Pipe, len: usize) -> Result<usize, Errno> {
327        if len == 0 {
328            return Ok(0);
329        }
330        let to_was_empty = to.is_empty();
331        let mut bytes_transferred = 0;
332        loop {
333            let limit = std::cmp::min(len - bytes_transferred, to.messages.available_capacity());
334            if limit == 0 {
335                // We no longer want to transfer any bytes.
336                break;
337            }
338            let Some(mut message) = from.messages.read_message() else {
339                // The `from` pipe is empty.
340                break;
341            };
342            if let Some(data) = MessageData::split_off(&mut message.data, limit) {
343                // Some data is left in the message. Push it back.
344                assert!(data.len() > 0);
345                from.messages.write_front(data.into());
346            }
347            bytes_transferred += message.len();
348            to.messages.write_message(message);
349        }
350        if bytes_transferred > 0 {
351            if from.is_empty() {
352                from.notify_fd_events(FdEvents::POLLOUT);
353            }
354            if to_was_empty {
355                to.notify_fd_events(FdEvents::POLLIN);
356            }
357        }
358        return Ok(bytes_transferred);
359    }
360
361    /// Tee from the `from` pipe to the `to` pipe.
362    pub fn tee(from: &mut Pipe, to: &mut Pipe, len: usize) -> Result<usize, Errno> {
363        if len == 0 {
364            return Ok(0);
365        }
366        let to_was_empty = to.is_empty();
367        let mut bytes_transferred = 0;
368        for message in from.messages.peek_queue().iter() {
369            let limit = std::cmp::min(len - bytes_transferred, to.messages.available_capacity());
370            if limit == 0 {
371                break;
372            }
373            let message = message.clone_at_most(limit);
374            bytes_transferred += message.len();
375            to.messages.write_message(message);
376        }
377        if bytes_transferred > 0 && to_was_empty {
378            to.notify_fd_events(FdEvents::POLLIN);
379        }
380        return Ok(bytes_transferred);
381    }
382}
383
384/// Creates a new pipe between the two returned FileObjects.
385///
386/// The first FileObject is the read endpoint of the pipe. The second is the
387/// write endpoint of the pipe. This order matches the order expected by
388/// sys_pipe2().
389pub fn new_pipe(
390    locked: &mut Locked<Unlocked>,
391    current_task: &CurrentTask,
392) -> Result<(FileHandle, FileHandle), Errno> {
393    let fs = current_task
394        .kernel()
395        .expando
396        .get::<FsRegistry>()
397        .create(locked, current_task, "pipefs".into(), FileSystemOptions::default())
398        .ok_or_else(|| errno!(EINVAL))??;
399    let mut info = FsNodeInfo::new(mode!(IFIFO, 0o600), current_task.current_fscred());
400    info.blksize = ATOMIC_IO_BYTES.into();
401    let node = fs.create_node_and_allocate_node_id(SpecialNode, info);
402    let pipe = node.fifo(current_task);
403    {
404        let mut state = pipe.lock();
405        state.add_reader();
406        state.add_writer();
407    }
408
409    let mut open = |flags: OpenFlags| {
410        let ops = PipeFileObject { pipe: Arc::clone(pipe) };
411        Ok(FileObject::new_anonymous(locked, current_task, Box::new(ops), Arc::clone(&node), flags))
412    };
413
414    Ok((open(OpenFlags::RDONLY)?, open(OpenFlags::WRONLY)?))
415}
416
/// The filesystem operations of the filesystem named "pipefs".
struct PipeFs;
impl FileSystemOps for PipeFs {
    /// Returns the default `statfs` for the `PIPEFS_MAGIC` filesystem type.
    fn statfs(
        &self,
        _locked: &mut Locked<FileOpsCore>,
        _fs: &FileSystem,
        _current_task: &CurrentTask,
    ) -> Result<statfs, Errno> {
        Ok(default_statfs(PIPEFS_MAGIC))
    }
    /// Returns the name of the filesystem, "pipefs".
    fn name(&self) -> &'static FsStr {
        "pipefs".into()
    }
}
431
432fn pipe_fs(
433    locked: &mut Locked<Unlocked>,
434    current_task: &CurrentTask,
435    _options: FileSystemOptions,
436) -> Result<FileSystemHandle, Errno> {
437    struct PipeFsHandle(FileSystemHandle);
438
439    let kernel = current_task.kernel();
440    Ok(kernel
441        .expando
442        .get_or_init(|| {
443            PipeFsHandle(
444                FileSystem::new(
445                    locked,
446                    kernel,
447                    CacheMode::Uncached,
448                    PipeFs,
449                    FileSystemOptions::default(),
450                )
451                .expect("pipefs constructed with valid options"),
452            )
453        })
454        .0
455        .clone())
456}
457
/// Registers `pipe_fs` in `fs_registry` under the name "pipefs".
pub fn register_pipe_fs(fs_registry: &FsRegistry) {
    fs_registry.register("pipefs".into(), pipe_fs);
}
461
/// The file operations of one open endpoint of a pipe.
pub struct PipeFileObject {
    /// The pipe this endpoint is attached to.
    pipe: Arc<Mutex<Pipe>>,
}
465
466impl FileOps for PipeFileObject {
467    fileops_impl_nonseekable!();
468    fileops_impl_noop_sync!();
469
470    fn close(
471        self: Box<Self>,
472        _locked: &mut Locked<FileOpsCore>,
473        file: &FileObjectState,
474        _current_task: &CurrentTask,
475    ) {
476        self.on_close(file.flags());
477    }
478
479    fn read(
480        &self,
481        locked: &mut Locked<FileOpsCore>,
482        file: &FileObject,
483        current_task: &CurrentTask,
484        offset: usize,
485        data: &mut dyn OutputBuffer,
486    ) -> Result<usize, Errno> {
487        debug_assert!(offset == 0);
488        file.blocking_op(locked, current_task, FdEvents::POLLIN | FdEvents::POLLHUP, None, |_| {
489            let mut pipe = self.pipe.lock();
490            let actual = pipe.read(data)?;
491            if actual > 0 && pipe.is_empty() {
492                pipe.notify_fd_events(FdEvents::POLLOUT);
493            }
494            Ok(actual)
495        })
496    }
497
498    fn write(
499        &self,
500        locked: &mut Locked<FileOpsCore>,
501        file: &FileObject,
502        current_task: &CurrentTask,
503        offset: usize,
504        data: &mut dyn InputBuffer,
505    ) -> Result<usize, Errno> {
506        debug_assert!(offset == 0);
507        debug_assert!(data.bytes_read() == 0);
508
509        let result = file.blocking_op(locked, current_task, FdEvents::POLLOUT, None, |locked| {
510            let mut pipe = self.pipe.lock();
511            let was_empty = pipe.is_empty();
512            let offset_before = data.bytes_read();
513            let bytes_written = pipe.write(locked, current_task, data)?;
514            debug_assert!(data.bytes_read() - offset_before == bytes_written);
515            if bytes_written > 0 && was_empty {
516                pipe.notify_fd_events(FdEvents::POLLIN);
517            }
518            if data.available() > 0 {
519                return error!(EAGAIN);
520            }
521            Ok(())
522        });
523
524        let bytes_written = data.bytes_read();
525        if bytes_written == 0 {
526            // We can only return an error if no data was actually sent. If partial data was
527            // sent, swallow the error and return how much was sent.
528            result?;
529        }
530        Ok(bytes_written)
531    }
532
533    fn wait_async(
534        &self,
535        _locked: &mut Locked<FileOpsCore>,
536        file: &FileObject,
537        _current_task: &CurrentTask,
538        waiter: &Waiter,
539        mut events: FdEvents,
540        handler: EventHandler,
541    ) -> Option<WaitCanceler> {
542        let flags = file.flags();
543        if !flags.can_read() {
544            events.remove(FdEvents::POLLIN);
545        }
546        if !flags.can_write() {
547            events.remove(FdEvents::POLLOUT);
548        }
549        Some(self.pipe.lock().waiters.wait_async_fd_events(waiter, events, handler))
550    }
551
552    fn query_events(
553        &self,
554        _locked: &mut Locked<FileOpsCore>,
555        file: &FileObject,
556        _current_task: &CurrentTask,
557    ) -> Result<FdEvents, Errno> {
558        Ok(self.pipe.lock().query_events(file.flags()))
559    }
560
561    fn fcntl(
562        &self,
563        file: &FileObject,
564        current_task: &CurrentTask,
565        cmd: u32,
566        arg: u64,
567    ) -> Result<SyscallResult, Errno> {
568        self.pipe.lock().fcntl(file, current_task, cmd, arg)
569    }
570
571    fn ioctl(
572        &self,
573        locked: &mut Locked<Unlocked>,
574        file: &FileObject,
575        current_task: &CurrentTask,
576        request: u32,
577        arg: SyscallArg,
578    ) -> Result<SyscallResult, Errno> {
579        self.pipe.lock().ioctl(file, locked, current_task, request, arg)
580    }
581}
582
/// An OutputBuffer that will write the data to `pipe`.
#[derive(Debug)]
struct SpliceOutputBuffer<'a> {
    /// The pipe receiving the data.
    pipe: &'a mut Pipe,
    /// The total number of bytes this buffer accepts; `bytes_written()` is `len - available`.
    len: usize,
    /// The number of bytes that can still be written to this buffer.
    available: usize,
}
590
impl<'a> Buffer for SpliceOutputBuffer<'a> {
    /// Not supported by this buffer: always fails with `ENOTSUP`.
    fn segments_count(&self) -> Result<usize, Errno> {
        error!(ENOTSUP)
    }

    /// Not supported by this buffer: always fails with `ENOTSUP` without calling the callback.
    fn peek_each_segment(
        &mut self,
        _callback: &mut PeekBufferSegmentsCallback<'_>,
    ) -> Result<(), Errno> {
        error!(ENOTSUP)
    }
}
603
604impl<'a> OutputBuffer for SpliceOutputBuffer<'a> {
605    fn write_each(&mut self, callback: &mut OutputBufferCallback<'_>) -> Result<usize, Errno> {
606        // SAFETY: `callback` returns the number of bytes read on success.
607        let bytes = unsafe {
608            read_to_vec::<u8, _>(self.available, |buf| callback(buf).map(NumberOfElementsRead))
609        }?;
610        let bytes_len = bytes.len();
611        if bytes_len > 0 {
612            let was_empty = self.pipe.is_empty();
613            self.pipe.messages.write_message(PipeMessageData::from(bytes).into());
614            if was_empty {
615                self.pipe.notify_fd_events(FdEvents::POLLIN);
616            }
617            self.available -= bytes_len;
618        }
619        Ok(bytes_len)
620    }
621
622    fn available(&self) -> usize {
623        self.available
624    }
625
626    fn bytes_written(&self) -> usize {
627        self.len - self.available
628    }
629
630    fn zero(&mut self) -> Result<usize, Errno> {
631        let bytes = vec![0; self.available];
632        let len = bytes.len();
633        if len > 0 {
634            let was_empty = self.pipe.is_empty();
635            self.pipe.messages.write_message(PipeMessageData::from(bytes).into());
636            if was_empty {
637                self.pipe.notify_fd_events(FdEvents::POLLIN);
638            }
639            self.available -= len;
640        }
641        Ok(len)
642    }
643
644    unsafe fn advance(&mut self, _length: usize) -> Result<(), Errno> {
645        error!(ENOTSUP)
646    }
647}
648
/// An InputBuffer that will read the data from `pipe`.
#[derive(Debug)]
struct SpliceInputBuffer<'a> {
    /// The pipe providing the data.
    pipe: &'a mut Pipe,
    /// The total number of bytes this buffer exposes; `bytes_read()` is `len - available`.
    len: usize,
    /// The number of bytes that can still be read from this buffer.
    available: usize,
}
656
657impl<'a> Buffer for SpliceInputBuffer<'a> {
658    fn segments_count(&self) -> Result<usize, Errno> {
659        Ok(self.pipe.messages.len())
660    }
661
662    fn peek_each_segment(
663        &mut self,
664        callback: &mut PeekBufferSegmentsCallback<'_>,
665    ) -> Result<(), Errno> {
666        let mut available = self.available;
667        for message in self.pipe.messages.messages() {
668            let to_read = std::cmp::min(available, message.len());
669            callback(&UserBuffer {
670                address: UserAddress::from(message.data.ptr()? as u64),
671                length: to_read,
672            });
673            available -= to_read;
674        }
675        Ok(())
676    }
677}
678
679impl<'a> InputBuffer for SpliceInputBuffer<'a> {
680    fn peek_each(&mut self, callback: &mut InputBufferCallback<'_>) -> Result<usize, Errno> {
681        let mut read = 0;
682        let mut available = self.available;
683        for message in self.pipe.messages.messages() {
684            let to_read = std::cmp::min(available, message.len());
685            let result = message.data.with_bytes(|bytes| callback(&bytes[0..to_read]))?;
686            if result > to_read {
687                return error!(EINVAL);
688            }
689            read += result;
690            available -= result;
691            if result != to_read {
692                break;
693            }
694        }
695        Ok(read)
696    }
697
698    fn available(&self) -> usize {
699        self.available
700    }
701
702    fn bytes_read(&self) -> usize {
703        self.len - self.available
704    }
705
706    fn drain(&mut self) -> usize {
707        let result = self.available;
708        self.available = 0;
709        result
710    }
711
712    fn advance(&mut self, mut length: usize) -> Result<(), Errno> {
713        if length == 0 {
714            return Ok(());
715        }
716        if length > self.available {
717            return error!(EINVAL);
718        }
719        self.available -= length;
720        while let Some(mut message) = self.pipe.messages.read_message() {
721            if let Some(data) = MessageData::split_off(&mut message.data, length) {
722                // Some data is left in the message. Push it back.
723                self.pipe.messages.write_front(data.into());
724            }
725            length -= message.len();
726            if length == 0 {
727                if self.pipe.is_empty() {
728                    self.pipe.notify_fd_events(FdEvents::POLLOUT);
729                }
730                return Ok(());
731            }
732        }
733        panic!();
734    }
735}
736
737impl PipeFileObject {
738    /// Called whenever a fd to a pipe is closed.
739    fn on_close(&self, flags: OpenFlags) {
740        let mut events = FdEvents::empty();
741        let mut pipe = self.pipe.lock();
742        if flags.can_read() {
743            assert!(pipe.reader_count > 0);
744            pipe.reader_count -= 1;
745            if pipe.reader_count == 0 {
746                events |= FdEvents::POLLOUT | FdEvents::POLLERR;
747            }
748        }
749        if flags.can_write() {
750            assert!(pipe.writer_count > 0);
751            pipe.writer_count -= 1;
752            if pipe.writer_count == 0 {
753                if pipe.reader_count > 0 {
754                    events |= FdEvents::POLLHUP;
755                }
756                if !pipe.is_empty() {
757                    events |= FdEvents::POLLIN;
758                }
759            }
760        }
761        if events != FdEvents::empty() {
762            pipe.waiters.notify_fd_events(events);
763        }
764        pipe.on_close();
765    }
766
767    /// Returns the result of `pregen` and a lock on pipe, once `condition` returns true, ensuring
768    /// `pregen` is run before the pipe is locked.
769    ///
770    /// This will wait on `events` if the file is opened in blocking mode. If the file is opened in
771    /// not blocking mode and `condition` is not realized, this will return EAGAIN.
772    fn wait_for_condition<'a, L, F, G, V>(
773        &'a self,
774        locked: &mut Locked<L>,
775        current_task: &CurrentTask,
776        file: &FileHandle,
777        condition: F,
778        pregen: G,
779        events: FdEvents,
780    ) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
781    where
782        L: LockEqualOrBefore<FileOpsCore>,
783        F: Fn(&Pipe) -> bool,
784        G: Fn(&mut Locked<L>) -> Result<V, Errno>,
785    {
786        file.blocking_op(locked, current_task, events, None, |locked| {
787            let other = pregen(locked)?;
788            let pipe = self.pipe.lock();
789            if condition(&pipe) { Ok((other, pipe)) } else { error!(EAGAIN) }
790        })
791    }
792
793    /// Lock the pipe for reading, after having run `pregen`.
794    fn lock_pipe_for_reading_with<'a, L, G, V>(
795        &'a self,
796        locked: &mut Locked<L>,
797        current_task: &CurrentTask,
798        file: &FileHandle,
799        pregen: G,
800        non_blocking: bool,
801    ) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
802    where
803        L: LockEqualOrBefore<FileOpsCore>,
804        G: Fn(&mut Locked<L>) -> Result<V, Errno>,
805    {
806        if non_blocking {
807            let other = pregen(locked)?;
808            let pipe = self.pipe.lock();
809            if !pipe.is_readable() {
810                return error!(EAGAIN);
811            }
812            Ok((other, pipe))
813        } else {
814            self.wait_for_condition(
815                locked,
816                current_task,
817                file,
818                |pipe| pipe.is_readable(),
819                pregen,
820                FdEvents::POLLIN | FdEvents::POLLHUP,
821            )
822        }
823    }
824
825    fn lock_pipe_for_reading<'a, L>(
826        &'a self,
827        locked: &mut Locked<L>,
828        current_task: &CurrentTask,
829        file: &FileHandle,
830        non_blocking: bool,
831    ) -> Result<MutexGuard<'a, Pipe>, Errno>
832    where
833        L: LockEqualOrBefore<FileOpsCore>,
834    {
835        self.lock_pipe_for_reading_with(locked, current_task, file, |_| Ok(()), non_blocking)
836            .map(|(_, l)| l)
837    }
838
839    /// Lock the pipe for writing, after having run `pregen`.
840    fn lock_pipe_for_writing_with<'a, L, G, V>(
841        &'a self,
842        locked: &mut Locked<L>,
843        current_task: &CurrentTask,
844        file: &FileHandle,
845        pregen: G,
846        non_blocking: bool,
847        len: usize,
848    ) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
849    where
850        L: LockEqualOrBefore<FileOpsCore>,
851        G: Fn(&mut Locked<L>) -> Result<V, Errno>,
852    {
853        if non_blocking {
854            let other = pregen(locked)?;
855            let pipe = self.pipe.lock();
856            if !pipe.is_writable(len) {
857                return error!(EAGAIN);
858            }
859            Ok((other, pipe))
860        } else {
861            self.wait_for_condition(
862                locked,
863                current_task,
864                file,
865                |pipe| pipe.is_writable(len),
866                pregen,
867                FdEvents::POLLOUT,
868            )
869        }
870    }
871
872    fn lock_pipe_for_writing<'a, L>(
873        &'a self,
874        locked: &mut Locked<L>,
875        current_task: &CurrentTask,
876        file: &FileHandle,
877        non_blocking: bool,
878        len: usize,
879    ) -> Result<MutexGuard<'a, Pipe>, Errno>
880    where
881        L: LockEqualOrBefore<FileOpsCore>,
882    {
883        self.lock_pipe_for_writing_with(locked, current_task, file, |_| Ok(()), non_blocking, len)
884            .map(|(_, l)| l)
885    }
886
887    /// Splice from the given file handle to this pipe.
888    ///
889    /// The given file handle must not be a pipe. If you wish to splice between two pipes, use
890    /// `lock_pipes` and `Pipe::splice`.
891    pub fn splice_from<L>(
892        &self,
893        locked: &mut Locked<L>,
894        current_task: &CurrentTask,
895        self_file: &FileHandle,
896        from: &FileHandle,
897        maybe_offset: Option<usize>,
898        len: usize,
899        non_blocking: bool,
900    ) -> Result<usize, Errno>
901    where
902        L: LockEqualOrBefore<FileOpsCore>,
903    {
904        // If both ends are pipes, use `lock_pipes` and `Pipe::splice`.
905        assert!(from.downcast_file::<PipeFileObject>().is_none());
906
907        let mut pipe =
908            self.lock_pipe_for_writing(locked, current_task, self_file, non_blocking, len)?;
909        let len = std::cmp::min(len, pipe.messages.available_capacity());
910        let mut buffer = SpliceOutputBuffer { pipe: &mut pipe, len, available: len };
911        if let Some(offset) = maybe_offset {
912            from.read_at(locked, current_task, offset, &mut buffer)
913        } else {
914            from.read(locked, current_task, &mut buffer)
915        }
916    }
917
918    /// Splice from this pipe to the given file handle.
919    ///
920    /// The given file handle must not be a pipe. If you wish to splice between two pipes, use
921    /// `lock_pipes` and `Pipe::splice`.
922    pub fn splice_to<L>(
923        &self,
924        locked: &mut Locked<L>,
925        current_task: &CurrentTask,
926        self_file: &FileHandle,
927        to: &FileHandle,
928        maybe_offset: Option<usize>,
929        len: usize,
930        non_blocking: bool,
931    ) -> Result<usize, Errno>
932    where
933        L: LockEqualOrBefore<FileOpsCore>,
934    {
935        // If both ends are pipes, use `lock_pipes` and `Pipe::splice`.
936        assert!(to.downcast_file::<PipeFileObject>().is_none());
937
938        let mut pipe = self.lock_pipe_for_reading(locked, current_task, self_file, non_blocking)?;
939        let len = std::cmp::min(len, pipe.messages.len());
940        let mut buffer = SpliceInputBuffer { pipe: &mut pipe, len, available: len };
941        if let Some(offset) = maybe_offset {
942            to.write_at(locked, current_task, offset, &mut buffer)
943        } else {
944            to.write(locked, current_task, &mut buffer)
945        }
946    }
947
948    /// Share the mappings backing the given input buffer into the pipe.
949    ///
950    /// Returns the number of bytes enqueued.
951    pub fn vmsplice_from<L>(
952        &self,
953        locked: &mut Locked<L>,
954        current_task: &CurrentTask,
955        self_file: &FileHandle,
956        mut iovec: UserBuffers,
957        non_blocking: bool,
958    ) -> Result<usize, Errno>
959    where
960        L: LockEqualOrBefore<FileOpsCore>,
961    {
962        let locked = locked.cast_locked::<FileOpsCore>();
963        let locked = locked;
964        let available = UserBuffer::cap_buffers_to_max_rw_count(
965            current_task.maximum_valid_address().ok_or_else(|| errno!(EINVAL))?,
966            &mut iovec,
967        )?;
968        let mappings = current_task.mm()?.get_mappings_for_vmsplice(&iovec)?;
969
970        let mut pipe =
971            self.lock_pipe_for_writing(locked, current_task, self_file, non_blocking, available)?;
972
973        if pipe.reader_count == 0 {
974            send_standard_signal(locked, current_task, SignalInfo::default(SIGPIPE));
975            return error!(EPIPE);
976        }
977
978        let was_empty = pipe.is_empty();
979        let mut remaining = std::cmp::min(available, pipe.messages.available_capacity());
980
981        let mut bytes_transferred = 0;
982        for mut mapping in mappings.into_iter() {
983            mapping.truncate(remaining);
984            let actual = mapping.len();
985
986            pipe.messages.write_message(PipeMessageData::Vmspliced(mapping).into());
987            remaining -= actual;
988            bytes_transferred += actual;
989
990            if remaining == 0 {
991                break;
992            }
993        }
994        if bytes_transferred > 0 && was_empty {
995            pipe.notify_fd_events(FdEvents::POLLIN);
996        }
997        Ok(bytes_transferred)
998    }
999
1000    /// Copy data from the pipe to the given output buffer.
1001    ///
1002    /// Returns the number of bytes transferred.
1003    pub fn vmsplice_to<L>(
1004        &self,
1005        locked: &mut Locked<L>,
1006        current_task: &CurrentTask,
1007        self_file: &FileHandle,
1008        iovec: UserBuffers,
1009        non_blocking: bool,
1010    ) -> Result<usize, Errno>
1011    where
1012        L: LockEqualOrBefore<FileOpsCore>,
1013    {
1014        let mut pipe = self.lock_pipe_for_reading(locked, current_task, self_file, non_blocking)?;
1015
1016        let mut data = UserBuffersOutputBuffer::unified_new(current_task, iovec)?;
1017        let len = std::cmp::min(data.available(), pipe.messages.len());
1018        let mut buffer = SpliceInputBuffer { pipe: &mut pipe, len, available: len };
1019        data.write_buffer(&mut buffer)
1020    }
1021
1022    /// Obtain the pipe objects from the given file handles, if they are both pipes.
1023    ///
1024    /// Returns EINVAL if one (or both) of the given file handles is not a pipe.
1025    ///
1026    /// Obtains the locks on the pipes in the correct order to avoid deadlocks.
1027    pub fn lock_pipes<'a, 'b, L>(
1028        locked: &mut Locked<L>,
1029        current_task: &CurrentTask,
1030        file_in: &'a FileHandle,
1031        file_out: &'b FileHandle,
1032        len: usize,
1033        non_blocking: bool,
1034    ) -> Result<PipeOperands<'a, 'b>, Errno>
1035    where
1036        L: LockEqualOrBefore<FileOpsCore>,
1037    {
1038        let pipe_in = file_in.downcast_file::<PipeFileObject>().ok_or_else(|| errno!(EINVAL))?;
1039        let pipe_out = file_out.downcast_file::<PipeFileObject>().ok_or_else(|| errno!(EINVAL))?;
1040
1041        let node_cmp =
1042            Arc::as_ptr(&file_in.name.entry.node).cmp(&Arc::as_ptr(&file_out.name.entry.node));
1043
1044        match node_cmp {
1045            Ordering::Equal => error!(EINVAL),
1046            Ordering::Less => {
1047                let (write, read) = pipe_in.lock_pipe_for_reading_with(
1048                    locked,
1049                    current_task,
1050                    file_in,
1051                    |locked| {
1052                        pipe_out.lock_pipe_for_writing(
1053                            locked,
1054                            current_task,
1055                            file_out,
1056                            non_blocking,
1057                            len,
1058                        )
1059                    },
1060                    non_blocking,
1061                )?;
1062                Ok(PipeOperands { read, write })
1063            }
1064            Ordering::Greater => {
1065                let (read, write) = pipe_out.lock_pipe_for_writing_with(
1066                    locked,
1067                    current_task,
1068                    file_out,
1069                    |locked| {
1070                        pipe_in.lock_pipe_for_reading(locked, current_task, file_in, non_blocking)
1071                    },
1072                    non_blocking,
1073                    len,
1074                )?;
1075                Ok(PipeOperands { read, write })
1076            }
1077        }
1078    }
1079}
1080
1081pub struct PipeOperands<'a, 'b> {
1082    pub read: MutexGuard<'a, Pipe>,
1083    pub write: MutexGuard<'b, Pipe>,
1084}