1use crate::mm::{
6 MemoryAccessorExt, NumberOfElementsRead, PAGE_SIZE, TaskMemoryAccessor, read_to_vec,
7};
8use crate::security;
9use crate::signals::{SignalInfo, send_standard_signal};
10use crate::task::{CurrentTask, EventHandler, WaitCallback, WaitCanceler, WaitQueue, Waiter};
11use crate::vfs::buffers::{
12 Buffer, InputBuffer, InputBufferCallback, MessageData, MessageQueue, OutputBuffer,
13 OutputBufferCallback, PeekBufferSegmentsCallback, PipeMessageData, UserBuffersOutputBuffer,
14};
15use crate::vfs::fs_registry::FsRegistry;
16use crate::vfs::{
17 CacheMode, FileHandle, FileObject, FileObjectState, FileOps, FileSystem, FileSystemHandle,
18 FileSystemOps, FileSystemOptions, FsNodeInfo, FsStr, SpecialNode, default_fcntl, default_ioctl,
19 fileops_impl_nonseekable, fileops_impl_noop_sync,
20};
21use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex, MutexGuard, Unlocked};
22use starnix_syscalls::{SUCCESS, SyscallArg, SyscallResult};
23use starnix_types::user_buffer::{UserBuffer, UserBuffers};
24use starnix_types::vfs::default_statfs;
25use starnix_uapi::auth::CAP_SYS_RESOURCE;
26use starnix_uapi::errors::Errno;
27use starnix_uapi::file_mode::mode;
28use starnix_uapi::open_flags::OpenFlags;
29use starnix_uapi::signals::SIGPIPE;
30use starnix_uapi::user_address::{UserAddress, UserRef};
31use starnix_uapi::vfs::FdEvents;
32use starnix_uapi::{
33 F_GETPIPE_SZ, F_SETPIPE_SZ, FIONREAD, PIPEFS_MAGIC, errno, error, statfs, uapi,
34};
35use std::cmp::Ordering;
36use std::sync::Arc;
37
/// Block size reported in `FsNodeInfo::blksize` for the node created by `new_pipe`.
const ATOMIC_IO_BYTES: u16 = 4096;

/// Largest capacity `Pipe::set_capacity` accepts; bigger requests fail with `EINVAL`.
const PIPE_MAX_SIZE: usize = 1 << 31;
42
/// Rounds `value` up to the nearest multiple of `increment`.
///
/// Values that are already a multiple of `increment` are returned unchanged.
/// Unlike a bit-mask based rounding, this is correct for every non-zero
/// `increment`, not only for powers of two; for power-of-two increments the
/// result is the same as the mask-based computation.
///
/// # Panics
///
/// Panics if `increment` is zero.
fn round_up(value: usize, increment: usize) -> usize {
    match value % increment {
        0 => value,
        // Add exactly the distance to the next multiple.
        remainder => value + (increment - remainder),
    }
}
46
/// State of a pipe, shared by the file objects opened on it through a [`PipeHandle`].
#[derive(Debug)]
pub struct Pipe {
    /// Queue holding the data written to the pipe that has not been read yet.
    messages: MessageQueue<PipeMessageData>,

    /// Wait queue notified (via `notify_fd_events`) when the pipe's state changes.
    waiters: WaitQueue,

    /// Number of read ends currently open: incremented by `add_reader`, decremented when a
    /// readable file object is closed.
    reader_count: usize,

    /// Set by `add_reader`; cleared by `on_close` once no reader and no writer remain.
    had_reader: bool,

    /// Number of write ends currently open: incremented by `add_writer`, decremented when a
    /// writable file object is closed.
    writer_count: usize,

    /// Set by `add_writer`; cleared by `on_close` once no reader and no writer remain.
    had_writer: bool,
}
65
/// Reference-counted, mutex-protected handle to a [`Pipe`].
pub type PipeHandle = Arc<Mutex<Pipe>>;
67
68impl Pipe {
69 pub fn new(default_pipe_capacity: usize) -> PipeHandle {
70 Arc::new(Mutex::new(Pipe {
71 messages: MessageQueue::new(default_pipe_capacity),
72 waiters: WaitQueue::default(),
73 reader_count: 0,
74 had_reader: false,
75 writer_count: 0,
76 had_writer: false,
77 }))
78 }
79
80 pub fn open(
81 locked: &mut Locked<Unlocked>,
82 current_task: &CurrentTask,
83 pipe: &Arc<Mutex<Self>>,
84 flags: OpenFlags,
85 ) -> Result<Box<dyn FileOps>, Errno> {
86 let mut events = FdEvents::empty();
87 let mut pipe_locked = pipe.lock();
88 let mut must_wait_events = FdEvents::empty();
89 if flags.can_read() {
90 if !pipe_locked.had_reader {
91 events |= FdEvents::POLLOUT;
92 }
93 pipe_locked.add_reader();
94 if !flags.contains(OpenFlags::NONBLOCK) && !flags.can_write() && !pipe_locked.had_writer
95 {
96 must_wait_events |= FdEvents::POLLIN;
97 }
98 }
99 if flags.can_write() {
100 if flags.contains(OpenFlags::NONBLOCK) && pipe_locked.reader_count == 0 {
105 assert!(!flags.can_read()); return error!(ENXIO);
107 }
108 if !pipe_locked.had_writer {
109 events |= FdEvents::POLLIN;
110 }
111 pipe_locked.add_writer();
112 if !flags.contains(OpenFlags::NONBLOCK) && !pipe_locked.had_reader {
113 must_wait_events |= FdEvents::POLLOUT;
114 }
115 }
116 if events != FdEvents::empty() {
117 pipe_locked.waiters.notify_fd_events(events);
118 }
119 let ops = PipeFileObject { pipe: Arc::clone(pipe) };
120 if must_wait_events == FdEvents::empty() {
121 return Ok(Box::new(ops));
122 }
123
124 let ops = scopeguard::guard(ops, |ops| {
126 ops.on_close(flags);
127 });
128
129 let waiter = Waiter::new();
131 loop {
132 pipe_locked.waiters.wait_async_fd_events(
133 &waiter,
134 must_wait_events,
135 WaitCallback::none(),
136 );
137 std::mem::drop(pipe_locked);
138 match waiter.wait(locked, current_task) {
139 Err(e) => {
140 return Err(e);
141 }
142 _ => {}
143 }
144 pipe_locked = pipe.lock();
145 if pipe_locked.had_writer && pipe_locked.had_reader {
146 return Ok(Box::new(scopeguard::ScopeGuard::into_inner(ops)));
147 }
148 }
149 }
150
151 pub fn add_reader(&mut self) {
153 self.reader_count += 1;
154 self.had_reader = true;
155 }
156
157 pub fn add_writer(&mut self) {
159 self.writer_count += 1;
160 self.had_writer = true;
161 }
162
163 pub fn on_close(&mut self) {
166 if self.reader_count == 0 && self.writer_count == 0 {
167 self.had_reader = false;
168 self.had_writer = false;
169 self.messages = MessageQueue::new(self.messages.capacity());
170 self.waiters = WaitQueue::default();
171 }
172 }
173
    /// Returns whether the pipe's message queue reports itself as empty.
    fn is_empty(&self) -> bool {
        self.messages.is_empty()
    }
177
    /// Returns the capacity of the pipe's message queue.
    fn capacity(&self) -> usize {
        self.messages.capacity()
    }
181
182 fn set_capacity(
183 &mut self,
184 current_task: &CurrentTask,
185 mut requested_capacity: usize,
186 ) -> Result<(), Errno> {
187 if requested_capacity > PIPE_MAX_SIZE {
188 return error!(EINVAL);
189 }
190 if requested_capacity
191 > current_task
192 .kernel()
193 .system_limits
194 .pipe_max_size
195 .load(std::sync::atomic::Ordering::Relaxed)
196 {
197 security::check_task_capable(current_task, CAP_SYS_RESOURCE)?;
198 }
199 let page_size = *PAGE_SIZE as usize;
200 if requested_capacity < page_size {
201 requested_capacity = page_size;
202 }
203 requested_capacity = round_up(requested_capacity, page_size);
204 self.messages.set_capacity(requested_capacity)
205 }
206
207 fn is_readable(&self) -> bool {
208 !self.is_empty() || (self.writer_count == 0 && self.had_writer)
209 }
210
211 fn is_writable(&self, data_size: usize) -> bool {
213 let available_capacity = self.messages.available_capacity();
214 self.had_reader
217 && (available_capacity >= data_size
218 || (available_capacity > 0 && data_size > uapi::PIPE_BUF as usize))
219 }
220
221 pub fn read(&mut self, data: &mut dyn OutputBuffer) -> Result<usize, Errno> {
222 if !self.is_readable() {
231 return error!(EAGAIN);
232 }
233
234 self.messages.read_stream(data).map(|info| info.bytes_read)
235 }
236
237 pub fn write(
238 &mut self,
239 locked: &mut Locked<FileOpsCore>,
240 current_task: &CurrentTask,
241 data: &mut dyn InputBuffer,
242 ) -> Result<usize, Errno> {
243 if !self.had_reader {
244 return error!(EAGAIN);
245 }
246
247 if self.reader_count == 0 {
248 send_standard_signal(locked, current_task, SignalInfo::default(SIGPIPE));
249 return error!(EPIPE);
250 }
251
252 if !self.is_writable(data.available()) {
253 return error!(EAGAIN);
254 }
255
256 self.messages.write_stream(data, None, &mut vec![])
257 }
258
259 fn query_events(&self, flags: OpenFlags) -> FdEvents {
260 let mut events = FdEvents::empty();
261
262 if flags.can_read() && self.is_readable() {
263 let writer_closed = self.writer_count == 0 && self.had_writer;
264 let has_data = !self.is_empty();
265 if writer_closed {
266 events |= FdEvents::POLLHUP;
267 }
268 if !writer_closed || has_data {
269 events |= FdEvents::POLLIN;
270 }
271 }
272
273 if flags.can_write() && self.is_writable(1) {
274 if self.reader_count == 0 && self.had_reader {
275 events |= FdEvents::POLLERR;
276 }
277
278 events |= FdEvents::POLLOUT;
279 }
280
281 events
282 }
283
284 fn fcntl(
285 &mut self,
286 _file: &FileObject,
287 current_task: &CurrentTask,
288 cmd: u32,
289 arg: u64,
290 ) -> Result<SyscallResult, Errno> {
291 match cmd {
292 F_GETPIPE_SZ => Ok(self.capacity().into()),
293 F_SETPIPE_SZ => {
294 self.set_capacity(current_task, arg as usize)?;
295 Ok(self.capacity().into())
296 }
297 _ => default_fcntl(cmd),
298 }
299 }
300
301 fn ioctl(
302 &self,
303 file: &FileObject,
304 locked: &mut Locked<Unlocked>,
305 current_task: &CurrentTask,
306 request: u32,
307 arg: SyscallArg,
308 ) -> Result<SyscallResult, Errno> {
309 let user_addr = UserAddress::from(arg);
310 match request {
311 FIONREAD => {
312 let addr = UserRef::<i32>::new(user_addr);
313 let value: i32 = self.messages.len().try_into().map_err(|_| errno!(EINVAL))?;
314 current_task.write_object(addr, &value)?;
315 Ok(SUCCESS)
316 }
317 _ => default_ioctl(file, locked, current_task, request, arg),
318 }
319 }
320
    /// Forwards `events` to the pipe's wait queue.
    fn notify_fd_events(&self, events: FdEvents) {
        self.waiters.notify_fd_events(events);
    }
324
325 pub fn splice(from: &mut Pipe, to: &mut Pipe, len: usize) -> Result<usize, Errno> {
327 if len == 0 {
328 return Ok(0);
329 }
330 let to_was_empty = to.is_empty();
331 let mut bytes_transferred = 0;
332 loop {
333 let limit = std::cmp::min(len - bytes_transferred, to.messages.available_capacity());
334 if limit == 0 {
335 break;
337 }
338 let Some(mut message) = from.messages.read_message() else {
339 break;
341 };
342 if let Some(data) = MessageData::split_off(&mut message.data, limit) {
343 assert!(data.len() > 0);
345 from.messages.write_front(data.into());
346 }
347 bytes_transferred += message.len();
348 to.messages.write_message(message);
349 }
350 if bytes_transferred > 0 {
351 if from.is_empty() {
352 from.notify_fd_events(FdEvents::POLLOUT);
353 }
354 if to_was_empty {
355 to.notify_fd_events(FdEvents::POLLIN);
356 }
357 }
358 return Ok(bytes_transferred);
359 }
360
361 pub fn tee(from: &mut Pipe, to: &mut Pipe, len: usize) -> Result<usize, Errno> {
363 if len == 0 {
364 return Ok(0);
365 }
366 let to_was_empty = to.is_empty();
367 let mut bytes_transferred = 0;
368 for message in from.messages.peek_queue().iter() {
369 let limit = std::cmp::min(len - bytes_transferred, to.messages.available_capacity());
370 if limit == 0 {
371 break;
372 }
373 let message = message.clone_at_most(limit);
374 bytes_transferred += message.len();
375 to.messages.write_message(message);
376 }
377 if bytes_transferred > 0 && to_was_empty {
378 to.notify_fd_events(FdEvents::POLLIN);
379 }
380 return Ok(bytes_transferred);
381 }
382}
383
/// Creates an anonymous pipe and returns its `(read end, write end)` file handles.
///
/// The pipe lives on a node created in the "pipefs" filesystem obtained from the
/// kernel's `FsRegistry`. One reader and one writer are registered up front, so both
/// ends are connected as soon as they are returned.
///
/// # Errors
///
/// Returns `EINVAL` if the registry yields no "pipefs" filesystem, and forwards any
/// error from creating it.
pub fn new_pipe(
    locked: &mut Locked<Unlocked>,
    current_task: &CurrentTask,
) -> Result<(FileHandle, FileHandle), Errno> {
    let fs = current_task
        .kernel()
        .expando
        .get::<FsRegistry>()
        .create(locked, current_task, "pipefs".into(), FileSystemOptions::default())
        .ok_or_else(|| errno!(EINVAL))??;
    let mut info = FsNodeInfo::new(mode!(IFIFO, 0o600), current_task.current_fscred());
    info.blksize = ATOMIC_IO_BYTES.into();
    let node = fs.create_node_and_allocate_node_id(SpecialNode, info);
    let pipe = node.fifo(current_task);
    {
        // Account for both ends before any file object exists; the guard is dropped at
        // the end of this scope.
        let mut state = pipe.lock();
        state.add_reader();
        state.add_writer();
    }

    // Builds an anonymous file object on the shared pipe for the given access mode.
    let mut open = |flags: OpenFlags| {
        let ops = PipeFileObject { pipe: Arc::clone(pipe) };
        Ok(FileObject::new_anonymous(locked, current_task, Box::new(ops), Arc::clone(&node), flags))
    };

    Ok((open(OpenFlags::RDONLY)?, open(OpenFlags::WRONLY)?))
}
416
/// Filesystem operations for "pipefs", the filesystem holding anonymous pipe nodes.
struct PipeFs;
impl FileSystemOps for PipeFs {
    /// Returns the default filesystem statistics tagged with `PIPEFS_MAGIC`.
    fn statfs(
        &self,
        _locked: &mut Locked<FileOpsCore>,
        _fs: &FileSystem,
        _current_task: &CurrentTask,
    ) -> Result<statfs, Errno> {
        Ok(default_statfs(PIPEFS_MAGIC))
    }
    /// Returns the filesystem name, `"pipefs"`.
    fn name(&self) -> &'static FsStr {
        "pipefs".into()
    }
}
431
432fn pipe_fs(
433 locked: &mut Locked<Unlocked>,
434 current_task: &CurrentTask,
435 _options: FileSystemOptions,
436) -> Result<FileSystemHandle, Errno> {
437 struct PipeFsHandle(FileSystemHandle);
438
439 let kernel = current_task.kernel();
440 Ok(kernel
441 .expando
442 .get_or_init(|| {
443 PipeFsHandle(
444 FileSystem::new(
445 locked,
446 kernel,
447 CacheMode::Uncached,
448 PipeFs,
449 FileSystemOptions::default(),
450 )
451 .expect("pipefs constructed with valid options"),
452 )
453 })
454 .0
455 .clone())
456}
457
/// Registers `pipe_fs` under the name "pipefs" in `fs_registry`.
pub fn register_pipe_fs(fs_registry: &FsRegistry) {
    fs_registry.register("pipefs".into(), pipe_fs);
}
461
/// File operations for one open end (read, write, or both) of a pipe.
pub struct PipeFileObject {
    /// The pipe this file object operates on; shared with the other open ends.
    pipe: Arc<Mutex<Pipe>>,
}
465
466impl FileOps for PipeFileObject {
467 fileops_impl_nonseekable!();
468 fileops_impl_noop_sync!();
469
    /// Releases this file object's end of the pipe, using the file's flags to decide
    /// whether it counted as a reader, a writer, or both.
    fn close(
        self: Box<Self>,
        _locked: &mut Locked<FileOpsCore>,
        file: &FileObjectState,
        _current_task: &CurrentTask,
    ) {
        self.on_close(file.flags());
    }
478
479 fn read(
480 &self,
481 locked: &mut Locked<FileOpsCore>,
482 file: &FileObject,
483 current_task: &CurrentTask,
484 offset: usize,
485 data: &mut dyn OutputBuffer,
486 ) -> Result<usize, Errno> {
487 debug_assert!(offset == 0);
488 file.blocking_op(locked, current_task, FdEvents::POLLIN | FdEvents::POLLHUP, None, |_| {
489 let mut pipe = self.pipe.lock();
490 let actual = pipe.read(data)?;
491 if actual > 0 && pipe.is_empty() {
492 pipe.notify_fd_events(FdEvents::POLLOUT);
493 }
494 Ok(actual)
495 })
496 }
497
498 fn write(
499 &self,
500 locked: &mut Locked<FileOpsCore>,
501 file: &FileObject,
502 current_task: &CurrentTask,
503 offset: usize,
504 data: &mut dyn InputBuffer,
505 ) -> Result<usize, Errno> {
506 debug_assert!(offset == 0);
507 debug_assert!(data.bytes_read() == 0);
508
509 let result = file.blocking_op(locked, current_task, FdEvents::POLLOUT, None, |locked| {
510 let mut pipe = self.pipe.lock();
511 let was_empty = pipe.is_empty();
512 let offset_before = data.bytes_read();
513 let bytes_written = pipe.write(locked, current_task, data)?;
514 debug_assert!(data.bytes_read() - offset_before == bytes_written);
515 if bytes_written > 0 && was_empty {
516 pipe.notify_fd_events(FdEvents::POLLIN);
517 }
518 if data.available() > 0 {
519 return error!(EAGAIN);
520 }
521 Ok(())
522 });
523
524 let bytes_written = data.bytes_read();
525 if bytes_written == 0 {
526 result?;
529 }
530 Ok(bytes_written)
531 }
532
533 fn wait_async(
534 &self,
535 _locked: &mut Locked<FileOpsCore>,
536 file: &FileObject,
537 _current_task: &CurrentTask,
538 waiter: &Waiter,
539 mut events: FdEvents,
540 handler: EventHandler,
541 ) -> Option<WaitCanceler> {
542 let flags = file.flags();
543 if !flags.can_read() {
544 events.remove(FdEvents::POLLIN);
545 }
546 if !flags.can_write() {
547 events.remove(FdEvents::POLLOUT);
548 }
549 Some(self.pipe.lock().waiters.wait_async_fd_events(waiter, events, handler))
550 }
551
    /// Returns the events currently pending on the pipe for this file's flags.
    fn query_events(
        &self,
        _locked: &mut Locked<FileOpsCore>,
        file: &FileObject,
        _current_task: &CurrentTask,
    ) -> Result<FdEvents, Errno> {
        Ok(self.pipe.lock().query_events(file.flags()))
    }
560
    /// Locks the pipe and delegates the `fcntl` command to `Pipe::fcntl`.
    fn fcntl(
        &self,
        file: &FileObject,
        current_task: &CurrentTask,
        cmd: u32,
        arg: u64,
    ) -> Result<SyscallResult, Errno> {
        self.pipe.lock().fcntl(file, current_task, cmd, arg)
    }
570
    /// Locks the pipe and delegates the `ioctl` request to `Pipe::ioctl`.
    fn ioctl(
        &self,
        locked: &mut Locked<Unlocked>,
        file: &FileObject,
        current_task: &CurrentTask,
        request: u32,
        arg: SyscallArg,
    ) -> Result<SyscallResult, Errno> {
        self.pipe.lock().ioctl(file, locked, current_task, request, arg)
    }
581}
582
/// An `OutputBuffer` whose written bytes are appended to a pipe as messages.
#[derive(Debug)]
struct SpliceOutputBuffer<'a> {
    /// Pipe receiving the data.
    pipe: &'a mut Pipe,
    /// Total number of bytes this buffer was created to accept.
    len: usize,
    /// Number of bytes that may still be written; `len - available` is the amount
    /// written so far.
    available: usize,
}
590
impl<'a> Buffer for SpliceOutputBuffer<'a> {
    /// Not supported for this buffer: always fails with `ENOTSUP`.
    fn segments_count(&self) -> Result<usize, Errno> {
        error!(ENOTSUP)
    }

    /// Not supported for this buffer: always fails with `ENOTSUP`.
    fn peek_each_segment(
        &mut self,
        _callback: &mut PeekBufferSegmentsCallback<'_>,
    ) -> Result<(), Errno> {
        error!(ENOTSUP)
    }
}
603
604impl<'a> OutputBuffer for SpliceOutputBuffer<'a> {
605 fn write_each(&mut self, callback: &mut OutputBufferCallback<'_>) -> Result<usize, Errno> {
606 let bytes = unsafe {
608 read_to_vec::<u8, _>(self.available, |buf| callback(buf).map(NumberOfElementsRead))
609 }?;
610 let bytes_len = bytes.len();
611 if bytes_len > 0 {
612 let was_empty = self.pipe.is_empty();
613 self.pipe.messages.write_message(PipeMessageData::from(bytes).into());
614 if was_empty {
615 self.pipe.notify_fd_events(FdEvents::POLLIN);
616 }
617 self.available -= bytes_len;
618 }
619 Ok(bytes_len)
620 }
621
622 fn available(&self) -> usize {
623 self.available
624 }
625
626 fn bytes_written(&self) -> usize {
627 self.len - self.available
628 }
629
630 fn zero(&mut self) -> Result<usize, Errno> {
631 let bytes = vec![0; self.available];
632 let len = bytes.len();
633 if len > 0 {
634 let was_empty = self.pipe.is_empty();
635 self.pipe.messages.write_message(PipeMessageData::from(bytes).into());
636 if was_empty {
637 self.pipe.notify_fd_events(FdEvents::POLLIN);
638 }
639 self.available -= len;
640 }
641 Ok(len)
642 }
643
644 unsafe fn advance(&mut self, _length: usize) -> Result<(), Errno> {
645 error!(ENOTSUP)
646 }
647}
648
/// An `InputBuffer` that reads from the messages queued in a pipe.
#[derive(Debug)]
struct SpliceInputBuffer<'a> {
    /// Pipe providing the data.
    pipe: &'a mut Pipe,
    /// Total number of bytes this buffer was created to provide.
    len: usize,
    /// Number of bytes that may still be read; `len - available` is the amount
    /// consumed so far.
    available: usize,
}
656
657impl<'a> Buffer for SpliceInputBuffer<'a> {
658 fn segments_count(&self) -> Result<usize, Errno> {
659 Ok(self.pipe.messages.len())
660 }
661
662 fn peek_each_segment(
663 &mut self,
664 callback: &mut PeekBufferSegmentsCallback<'_>,
665 ) -> Result<(), Errno> {
666 let mut available = self.available;
667 for message in self.pipe.messages.messages() {
668 let to_read = std::cmp::min(available, message.len());
669 callback(&UserBuffer {
670 address: UserAddress::from(message.data.ptr()? as u64),
671 length: to_read,
672 });
673 available -= to_read;
674 }
675 Ok(())
676 }
677}
678
679impl<'a> InputBuffer for SpliceInputBuffer<'a> {
680 fn peek_each(&mut self, callback: &mut InputBufferCallback<'_>) -> Result<usize, Errno> {
681 let mut read = 0;
682 let mut available = self.available;
683 for message in self.pipe.messages.messages() {
684 let to_read = std::cmp::min(available, message.len());
685 let result = message.data.with_bytes(|bytes| callback(&bytes[0..to_read]))?;
686 if result > to_read {
687 return error!(EINVAL);
688 }
689 read += result;
690 available -= result;
691 if result != to_read {
692 break;
693 }
694 }
695 Ok(read)
696 }
697
698 fn available(&self) -> usize {
699 self.available
700 }
701
702 fn bytes_read(&self) -> usize {
703 self.len - self.available
704 }
705
706 fn drain(&mut self) -> usize {
707 let result = self.available;
708 self.available = 0;
709 result
710 }
711
712 fn advance(&mut self, mut length: usize) -> Result<(), Errno> {
713 if length == 0 {
714 return Ok(());
715 }
716 if length > self.available {
717 return error!(EINVAL);
718 }
719 self.available -= length;
720 while let Some(mut message) = self.pipe.messages.read_message() {
721 if let Some(data) = MessageData::split_off(&mut message.data, length) {
722 self.pipe.messages.write_front(data.into());
724 }
725 length -= message.len();
726 if length == 0 {
727 if self.pipe.is_empty() {
728 self.pipe.notify_fd_events(FdEvents::POLLOUT);
729 }
730 return Ok(());
731 }
732 }
733 panic!();
734 }
735}
736
737impl PipeFileObject {
738 fn on_close(&self, flags: OpenFlags) {
740 let mut events = FdEvents::empty();
741 let mut pipe = self.pipe.lock();
742 if flags.can_read() {
743 assert!(pipe.reader_count > 0);
744 pipe.reader_count -= 1;
745 if pipe.reader_count == 0 {
746 events |= FdEvents::POLLOUT | FdEvents::POLLERR;
747 }
748 }
749 if flags.can_write() {
750 assert!(pipe.writer_count > 0);
751 pipe.writer_count -= 1;
752 if pipe.writer_count == 0 {
753 if pipe.reader_count > 0 {
754 events |= FdEvents::POLLHUP;
755 }
756 if !pipe.is_empty() {
757 events |= FdEvents::POLLIN;
758 }
759 }
760 }
761 if events != FdEvents::empty() {
762 pipe.waiters.notify_fd_events(events);
763 }
764 pipe.on_close();
765 }
766
767 fn wait_for_condition<'a, L, F, G, V>(
773 &'a self,
774 locked: &mut Locked<L>,
775 current_task: &CurrentTask,
776 file: &FileHandle,
777 condition: F,
778 pregen: G,
779 events: FdEvents,
780 ) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
781 where
782 L: LockEqualOrBefore<FileOpsCore>,
783 F: Fn(&Pipe) -> bool,
784 G: Fn(&mut Locked<L>) -> Result<V, Errno>,
785 {
786 file.blocking_op(locked, current_task, events, None, |locked| {
787 let other = pregen(locked)?;
788 let pipe = self.pipe.lock();
789 if condition(&pipe) { Ok((other, pipe)) } else { error!(EAGAIN) }
790 })
791 }
792
793 fn lock_pipe_for_reading_with<'a, L, G, V>(
795 &'a self,
796 locked: &mut Locked<L>,
797 current_task: &CurrentTask,
798 file: &FileHandle,
799 pregen: G,
800 non_blocking: bool,
801 ) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
802 where
803 L: LockEqualOrBefore<FileOpsCore>,
804 G: Fn(&mut Locked<L>) -> Result<V, Errno>,
805 {
806 if non_blocking {
807 let other = pregen(locked)?;
808 let pipe = self.pipe.lock();
809 if !pipe.is_readable() {
810 return error!(EAGAIN);
811 }
812 Ok((other, pipe))
813 } else {
814 self.wait_for_condition(
815 locked,
816 current_task,
817 file,
818 |pipe| pipe.is_readable(),
819 pregen,
820 FdEvents::POLLIN | FdEvents::POLLHUP,
821 )
822 }
823 }
824
825 fn lock_pipe_for_reading<'a, L>(
826 &'a self,
827 locked: &mut Locked<L>,
828 current_task: &CurrentTask,
829 file: &FileHandle,
830 non_blocking: bool,
831 ) -> Result<MutexGuard<'a, Pipe>, Errno>
832 where
833 L: LockEqualOrBefore<FileOpsCore>,
834 {
835 self.lock_pipe_for_reading_with(locked, current_task, file, |_| Ok(()), non_blocking)
836 .map(|(_, l)| l)
837 }
838
839 fn lock_pipe_for_writing_with<'a, L, G, V>(
841 &'a self,
842 locked: &mut Locked<L>,
843 current_task: &CurrentTask,
844 file: &FileHandle,
845 pregen: G,
846 non_blocking: bool,
847 len: usize,
848 ) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
849 where
850 L: LockEqualOrBefore<FileOpsCore>,
851 G: Fn(&mut Locked<L>) -> Result<V, Errno>,
852 {
853 if non_blocking {
854 let other = pregen(locked)?;
855 let pipe = self.pipe.lock();
856 if !pipe.is_writable(len) {
857 return error!(EAGAIN);
858 }
859 Ok((other, pipe))
860 } else {
861 self.wait_for_condition(
862 locked,
863 current_task,
864 file,
865 |pipe| pipe.is_writable(len),
866 pregen,
867 FdEvents::POLLOUT,
868 )
869 }
870 }
871
872 fn lock_pipe_for_writing<'a, L>(
873 &'a self,
874 locked: &mut Locked<L>,
875 current_task: &CurrentTask,
876 file: &FileHandle,
877 non_blocking: bool,
878 len: usize,
879 ) -> Result<MutexGuard<'a, Pipe>, Errno>
880 where
881 L: LockEqualOrBefore<FileOpsCore>,
882 {
883 self.lock_pipe_for_writing_with(locked, current_task, file, |_| Ok(()), non_blocking, len)
884 .map(|(_, l)| l)
885 }
886
887 pub fn splice_from<L>(
892 &self,
893 locked: &mut Locked<L>,
894 current_task: &CurrentTask,
895 self_file: &FileHandle,
896 from: &FileHandle,
897 maybe_offset: Option<usize>,
898 len: usize,
899 non_blocking: bool,
900 ) -> Result<usize, Errno>
901 where
902 L: LockEqualOrBefore<FileOpsCore>,
903 {
904 assert!(from.downcast_file::<PipeFileObject>().is_none());
906
907 let mut pipe =
908 self.lock_pipe_for_writing(locked, current_task, self_file, non_blocking, len)?;
909 let len = std::cmp::min(len, pipe.messages.available_capacity());
910 let mut buffer = SpliceOutputBuffer { pipe: &mut pipe, len, available: len };
911 if let Some(offset) = maybe_offset {
912 from.read_at(locked, current_task, offset, &mut buffer)
913 } else {
914 from.read(locked, current_task, &mut buffer)
915 }
916 }
917
918 pub fn splice_to<L>(
923 &self,
924 locked: &mut Locked<L>,
925 current_task: &CurrentTask,
926 self_file: &FileHandle,
927 to: &FileHandle,
928 maybe_offset: Option<usize>,
929 len: usize,
930 non_blocking: bool,
931 ) -> Result<usize, Errno>
932 where
933 L: LockEqualOrBefore<FileOpsCore>,
934 {
935 assert!(to.downcast_file::<PipeFileObject>().is_none());
937
938 let mut pipe = self.lock_pipe_for_reading(locked, current_task, self_file, non_blocking)?;
939 let len = std::cmp::min(len, pipe.messages.len());
940 let mut buffer = SpliceInputBuffer { pipe: &mut pipe, len, available: len };
941 if let Some(offset) = maybe_offset {
942 to.write_at(locked, current_task, offset, &mut buffer)
943 } else {
944 to.write(locked, current_task, &mut buffer)
945 }
946 }
947
948 pub fn vmsplice_from<L>(
952 &self,
953 locked: &mut Locked<L>,
954 current_task: &CurrentTask,
955 self_file: &FileHandle,
956 mut iovec: UserBuffers,
957 non_blocking: bool,
958 ) -> Result<usize, Errno>
959 where
960 L: LockEqualOrBefore<FileOpsCore>,
961 {
962 let locked = locked.cast_locked::<FileOpsCore>();
963 let locked = locked;
964 let available = UserBuffer::cap_buffers_to_max_rw_count(
965 current_task.maximum_valid_address().ok_or_else(|| errno!(EINVAL))?,
966 &mut iovec,
967 )?;
968 let mappings = current_task.mm()?.get_mappings_for_vmsplice(&iovec)?;
969
970 let mut pipe =
971 self.lock_pipe_for_writing(locked, current_task, self_file, non_blocking, available)?;
972
973 if pipe.reader_count == 0 {
974 send_standard_signal(locked, current_task, SignalInfo::default(SIGPIPE));
975 return error!(EPIPE);
976 }
977
978 let was_empty = pipe.is_empty();
979 let mut remaining = std::cmp::min(available, pipe.messages.available_capacity());
980
981 let mut bytes_transferred = 0;
982 for mut mapping in mappings.into_iter() {
983 mapping.truncate(remaining);
984 let actual = mapping.len();
985
986 pipe.messages.write_message(PipeMessageData::Vmspliced(mapping).into());
987 remaining -= actual;
988 bytes_transferred += actual;
989
990 if remaining == 0 {
991 break;
992 }
993 }
994 if bytes_transferred > 0 && was_empty {
995 pipe.notify_fd_events(FdEvents::POLLIN);
996 }
997 Ok(bytes_transferred)
998 }
999
1000 pub fn vmsplice_to<L>(
1004 &self,
1005 locked: &mut Locked<L>,
1006 current_task: &CurrentTask,
1007 self_file: &FileHandle,
1008 iovec: UserBuffers,
1009 non_blocking: bool,
1010 ) -> Result<usize, Errno>
1011 where
1012 L: LockEqualOrBefore<FileOpsCore>,
1013 {
1014 let mut pipe = self.lock_pipe_for_reading(locked, current_task, self_file, non_blocking)?;
1015
1016 let mut data = UserBuffersOutputBuffer::unified_new(current_task, iovec)?;
1017 let len = std::cmp::min(data.available(), pipe.messages.len());
1018 let mut buffer = SpliceInputBuffer { pipe: &mut pipe, len, available: len };
1019 data.write_buffer(&mut buffer)
1020 }
1021
    /// Locks two distinct pipes for a pipe-to-pipe transfer: `file_in` for reading and
    /// `file_out` for a write of `len` bytes.
    ///
    /// The acquisition order is derived from the addresses of the files' nodes: the
    /// pipe whose node has the greater address is locked first (inside the `pregen`
    /// step), then the other one. Both call sites therefore agree on the order
    /// regardless of which pipe is the source.
    ///
    /// # Errors
    ///
    /// * `EINVAL` if either file is not a pipe, or if both files refer to the same
    ///   node.
    /// * Errors from `lock_pipe_for_reading*` / `lock_pipe_for_writing*`, e.g. `EAGAIN`
    ///   when `non_blocking` is set and a pipe is not ready.
    pub fn lock_pipes<'a, 'b, L>(
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        file_in: &'a FileHandle,
        file_out: &'b FileHandle,
        len: usize,
        non_blocking: bool,
    ) -> Result<PipeOperands<'a, 'b>, Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let pipe_in = file_in.downcast_file::<PipeFileObject>().ok_or_else(|| errno!(EINVAL))?;
        let pipe_out = file_out.downcast_file::<PipeFileObject>().ok_or_else(|| errno!(EINVAL))?;

        // Compare node addresses to obtain a stable ordering between the two pipes.
        let node_cmp =
            Arc::as_ptr(&file_in.name.entry.node).cmp(&Arc::as_ptr(&file_out.name.entry.node));

        match node_cmp {
            // Both files are backed by the same node.
            Ordering::Equal => error!(EINVAL),
            Ordering::Less => {
                // `pregen` locks the output pipe first; the input pipe is locked after.
                let (write, read) = pipe_in.lock_pipe_for_reading_with(
                    locked,
                    current_task,
                    file_in,
                    |locked| {
                        pipe_out.lock_pipe_for_writing(
                            locked,
                            current_task,
                            file_out,
                            non_blocking,
                            len,
                        )
                    },
                    non_blocking,
                )?;
                Ok(PipeOperands { read, write })
            }
            Ordering::Greater => {
                // `pregen` locks the input pipe first; the output pipe is locked after.
                let (read, write) = pipe_out.lock_pipe_for_writing_with(
                    locked,
                    current_task,
                    file_out,
                    |locked| {
                        pipe_in.lock_pipe_for_reading(locked, current_task, file_in, non_blocking)
                    },
                    non_blocking,
                    len,
                )?;
                Ok(PipeOperands { read, write })
            }
        }
    }
1079}
1080
/// The two locked pipes returned by `PipeFileObject::lock_pipes`.
pub struct PipeOperands<'a, 'b> {
    /// Guard of the pipe that was locked for reading (the transfer source).
    pub read: MutexGuard<'a, Pipe>,
    /// Guard of the pipe that was locked for writing (the transfer destination).
    pub write: MutexGuard<'b, Pipe>,
}