1use crate::mm::{
6 MemoryAccessorExt, NumberOfElementsRead, PAGE_SIZE, TaskMemoryAccessor, read_to_vec,
7};
8use crate::security;
9use crate::signals::{SignalInfo, send_standard_signal};
10use crate::task::{CurrentTask, EventHandler, WaitCallback, WaitCanceler, WaitQueue, Waiter};
11use crate::vfs::buffers::{
12 Buffer, InputBuffer, InputBufferCallback, MessageData, MessageQueue, OutputBuffer,
13 OutputBufferCallback, PeekBufferSegmentsCallback, PipeMessageData, UserBuffersOutputBuffer,
14};
15use crate::vfs::fs_registry::FsRegistry;
16use crate::vfs::{
17 CacheMode, FileHandle, FileObject, FileObjectState, FileOps, FileSystem, FileSystemHandle,
18 FileSystemOps, FileSystemOptions, FsNodeInfo, FsStr, SpecialNode, default_fcntl, default_ioctl,
19 fileops_impl_nonseekable, fileops_impl_noop_sync,
20};
21use starnix_sync::{FileOpsCore, LockEqualOrBefore, Locked, Mutex, MutexGuard, Unlocked};
22use starnix_syscalls::{SUCCESS, SyscallArg, SyscallResult};
23use starnix_types::user_buffer::{UserBuffer, UserBuffers};
24use starnix_types::vfs::default_statfs;
25use starnix_uapi::auth::CAP_SYS_RESOURCE;
26use starnix_uapi::errors::Errno;
27use starnix_uapi::file_mode::mode;
28use starnix_uapi::open_flags::OpenFlags;
29use starnix_uapi::signals::SIGPIPE;
30use starnix_uapi::user_address::{UserAddress, UserRef};
31use starnix_uapi::vfs::FdEvents;
32use starnix_uapi::{
33 F_GETPIPE_SZ, F_SETPIPE_SZ, FIONREAD, PIPEFS_MAGIC, errno, error, statfs, uapi,
34};
35use std::cmp::Ordering;
36use std::sync::Arc;
37
/// Block size (`blksize`) reported in the node info of pipes created by `new_pipe`.
const ATOMIC_IO_BYTES: u16 = 4096;
39
/// Largest capacity that `Pipe::set_capacity` accepts; larger requests fail with `EINVAL`.
const PIPE_MAX_SIZE: usize = 1 << 31;
42
/// Rounds `value` up to the nearest multiple of `increment`.
///
/// The bit-mask arithmetic only produces a multiple when `increment` is a power of two.
fn round_up(value: usize, increment: usize) -> usize {
    let mask = increment - 1;
    (value + mask) & !mask
}
46
/// Shared state of a pipe: the queued data plus bookkeeping about its open ends.
#[derive(Debug)]
pub struct Pipe {
    /// Queue holding the data that has been written to the pipe but not yet read.
    messages: MessageQueue<PipeMessageData>,

    /// Wait queue that is notified with `FdEvents` when the pipe's state changes.
    waiters: WaitQueue,

    /// Number of currently open read ends.
    reader_count: usize,

    /// Whether a reader has been added since the pipe was created or last reset.
    had_reader: bool,

    /// Number of currently open write ends.
    writer_count: usize,

    /// Whether a writer has been added since the pipe was created or last reset.
    had_writer: bool,
}
65
/// Reference-counted, mutex-protected handle through which a `Pipe` is shared.
pub type PipeHandle = Arc<Mutex<Pipe>>;
67
68impl Pipe {
69 pub fn new(default_pipe_capacity: usize) -> PipeHandle {
70 Arc::new(Mutex::new(Pipe {
71 messages: MessageQueue::new(default_pipe_capacity),
72 waiters: WaitQueue::default(),
73 reader_count: 0,
74 had_reader: false,
75 writer_count: 0,
76 had_writer: false,
77 }))
78 }
79
    /// Opens one end of `pipe` as described by `flags` and returns the file operations for it.
    ///
    /// The reader and/or writer is registered on the pipe first. Unless `flags` contains
    /// `NONBLOCK`, a read-only open then waits until the pipe has had a writer, and a
    /// write-capable open waits until the pipe has had a reader.
    ///
    /// # Errors
    ///
    /// Returns `ENXIO` for a non-blocking write open while no reader is open, and propagates
    /// any error returned by `Waiter::wait` while waiting for the other end.
    pub fn open(
        locked: &mut Locked<Unlocked>,
        current_task: &CurrentTask,
        pipe: &Arc<Mutex<Self>>,
        flags: OpenFlags,
    ) -> Result<Box<dyn FileOps>, Errno> {
        // Events to broadcast to tasks that are already waiting on the pipe.
        let mut events = FdEvents::empty();
        let mut pipe_locked = pipe.lock();
        // Events this open itself has to wait for before it can complete.
        let mut must_wait_events = FdEvents::empty();
        if flags.can_read() {
            // The first reader since creation/reset is announced with POLLOUT.
            if !pipe_locked.had_reader {
                events |= FdEvents::POLLOUT;
            }
            pipe_locked.add_reader();
            // A blocking read-only open waits until the pipe has had a writer.
            if !flags.contains(OpenFlags::NONBLOCK) && !flags.can_write() && !pipe_locked.had_writer
            {
                must_wait_events |= FdEvents::POLLIN;
            }
        }
        if flags.can_write() {
            if flags.contains(OpenFlags::NONBLOCK) && pipe_locked.reader_count == 0 {
                // A readable open would already have called `add_reader` above, making
                // `reader_count` non-zero.
                assert!(!flags.can_read()); return error!(ENXIO);
            }
            // The first writer since creation/reset is announced with POLLIN.
            if !pipe_locked.had_writer {
                events |= FdEvents::POLLIN;
            }
            pipe_locked.add_writer();
            // A blocking write open waits until the pipe has had a reader.
            if !flags.contains(OpenFlags::NONBLOCK) && !pipe_locked.had_reader {
                must_wait_events |= FdEvents::POLLOUT;
            }
        }
        if events != FdEvents::empty() {
            pipe_locked.waiters.notify_fd_events(events);
        }
        let ops = PipeFileObject { pipe: Arc::clone(pipe) };
        if must_wait_events == FdEvents::empty() {
            return Ok(Box::new(ops));
        }

        // If this function returns early (for example because the wait fails), the guard runs
        // `on_close` so the reader/writer registered above is released again.
        let ops = scopeguard::guard(ops, |ops| {
            ops.on_close(flags);
        });

        let waiter = Waiter::new();
        loop {
            // Register for the events while still holding the lock, then release the lock
            // before blocking in `waiter.wait`.
            pipe_locked.waiters.wait_async_fd_events(
                &waiter,
                must_wait_events,
                WaitCallback::none(),
            );
            std::mem::drop(pipe_locked);
            match waiter.wait(locked, current_task) {
                Err(e) => {
                    return Err(e);
                }
                _ => {}
            }
            pipe_locked = pipe.lock();
            // The open completes once the pipe has had both a reader and a writer; the guard
            // is defused so `on_close` does not run.
            if pipe_locked.had_writer && pipe_locked.had_reader {
                return Ok(Box::new(scopeguard::ScopeGuard::into_inner(ops)));
            }
        }
    }
150
151 pub fn add_reader(&mut self) {
153 self.reader_count += 1;
154 self.had_reader = true;
155 }
156
157 pub fn add_writer(&mut self) {
159 self.writer_count += 1;
160 self.had_writer = true;
161 }
162
163 pub fn on_close(&mut self) {
166 if self.reader_count == 0 && self.writer_count == 0 {
167 self.had_reader = false;
168 self.had_writer = false;
169 self.messages = MessageQueue::new(self.messages.capacity());
170 self.waiters = WaitQueue::default();
171 }
172 }
173
174 fn is_empty(&self) -> bool {
175 self.messages.is_empty()
176 }
177
178 fn capacity(&self) -> usize {
179 self.messages.capacity()
180 }
181
182 fn set_capacity(
183 &mut self,
184 current_task: &CurrentTask,
185 mut requested_capacity: usize,
186 ) -> Result<(), Errno> {
187 if requested_capacity > PIPE_MAX_SIZE {
188 return error!(EINVAL);
189 }
190 if requested_capacity
191 > current_task
192 .kernel()
193 .system_limits
194 .pipe_max_size
195 .load(std::sync::atomic::Ordering::Relaxed)
196 {
197 security::check_task_capable(current_task, CAP_SYS_RESOURCE)?;
198 }
199 let page_size = *PAGE_SIZE as usize;
200 if requested_capacity < page_size {
201 requested_capacity = page_size;
202 }
203 requested_capacity = round_up(requested_capacity, page_size);
204 self.messages.set_capacity(requested_capacity)
205 }
206
207 fn is_readable(&self) -> bool {
208 !self.is_empty() || (self.writer_count == 0 && self.had_writer)
209 }
210
211 fn is_writable(&self, data_size: usize) -> bool {
213 let available_capacity = self.messages.available_capacity();
214 self.had_reader
217 && (available_capacity >= data_size
218 || (available_capacity > 0 && data_size > uapi::PIPE_BUF as usize))
219 }
220
221 pub fn read(&mut self, data: &mut dyn OutputBuffer) -> Result<usize, Errno> {
222 if !self.is_readable() {
231 return error!(EAGAIN);
232 }
233
234 self.messages.read_stream(data).map(|info| info.bytes_read)
235 }
236
237 pub fn write(
238 &mut self,
239 locked: &mut Locked<FileOpsCore>,
240 current_task: &CurrentTask,
241 data: &mut dyn InputBuffer,
242 ) -> Result<usize, Errno> {
243 if !self.had_reader {
244 return error!(EAGAIN);
245 }
246
247 if self.reader_count == 0 {
248 send_standard_signal(locked, current_task, SignalInfo::kernel(SIGPIPE));
249 return error!(EPIPE);
250 }
251
252 if !self.is_writable(data.available()) {
253 return error!(EAGAIN);
254 }
255
256 self.messages.write_stream(data, None, &mut vec![])
257 }
258
259 fn query_events(&self, flags: OpenFlags) -> FdEvents {
260 let mut events = FdEvents::empty();
261
262 if flags.can_read() && self.is_readable() {
263 let writer_closed = self.writer_count == 0 && self.had_writer;
264 let has_data = !self.is_empty();
265 if writer_closed {
266 events |= FdEvents::POLLHUP;
267 }
268 if !writer_closed || has_data {
269 events |= FdEvents::POLLIN;
270 }
271 }
272
273 if flags.can_write() && self.is_writable(1) {
274 if self.reader_count == 0 && self.had_reader {
275 events |= FdEvents::POLLERR;
276 }
277
278 events |= FdEvents::POLLOUT;
279 }
280
281 events
282 }
283
284 fn fcntl(
285 &mut self,
286 _file: &FileObject,
287 current_task: &CurrentTask,
288 cmd: u32,
289 arg: u64,
290 ) -> Result<SyscallResult, Errno> {
291 match cmd {
292 F_GETPIPE_SZ => Ok(self.capacity().into()),
293 F_SETPIPE_SZ => {
294 self.set_capacity(current_task, arg as usize)?;
295 Ok(self.capacity().into())
296 }
297 _ => default_fcntl(cmd),
298 }
299 }
300
301 fn ioctl(
302 &self,
303 file: &FileObject,
304 locked: &mut Locked<Unlocked>,
305 current_task: &CurrentTask,
306 request: u32,
307 arg: SyscallArg,
308 ) -> Result<SyscallResult, Errno> {
309 let user_addr = UserAddress::from(arg);
310 match request {
311 FIONREAD => {
312 let addr = UserRef::<i32>::new(user_addr);
313 let value: i32 = self.messages.len().try_into().map_err(|_| errno!(EINVAL))?;
314 current_task.write_object(addr, &value)?;
315 Ok(SUCCESS)
316 }
317 _ => default_ioctl(file, locked, current_task, request, arg),
318 }
319 }
320
321 fn notify_fd_events(&self, events: FdEvents) {
322 self.waiters.notify_fd_events(events);
323 }
324
325 pub fn splice(from: &mut Pipe, to: &mut Pipe, len: usize) -> Result<usize, Errno> {
327 if len == 0 {
328 return Ok(0);
329 }
330 let to_was_empty = to.is_empty();
331 let mut bytes_transferred = 0;
332 loop {
333 let limit = std::cmp::min(len - bytes_transferred, to.messages.available_capacity());
334 if limit == 0 {
335 break;
337 }
338 let Some(mut message) = from.messages.read_message() else {
339 break;
341 };
342 if let Some(data) = MessageData::split_off(&mut message.data, limit) {
343 assert!(data.len() > 0);
345 from.messages.write_front(data.into());
346 }
347 bytes_transferred += message.len();
348 to.messages.write_message(message);
349 }
350 if bytes_transferred > 0 {
351 if from.is_empty() {
352 from.notify_fd_events(FdEvents::POLLOUT);
353 }
354 if to_was_empty {
355 to.notify_fd_events(FdEvents::POLLIN);
356 }
357 }
358 return Ok(bytes_transferred);
359 }
360
361 pub fn tee(from: &mut Pipe, to: &mut Pipe, len: usize) -> Result<usize, Errno> {
363 if len == 0 {
364 return Ok(0);
365 }
366 let to_was_empty = to.is_empty();
367 let mut bytes_transferred = 0;
368 for message in from.messages.peek_queue().iter() {
369 let limit = std::cmp::min(len - bytes_transferred, to.messages.available_capacity());
370 if limit == 0 {
371 break;
372 }
373 let message = message.clone_at_most(limit);
374 bytes_transferred += message.len();
375 to.messages.write_message(message);
376 }
377 if bytes_transferred > 0 && to_was_empty {
378 to.notify_fd_events(FdEvents::POLLIN);
379 }
380 return Ok(bytes_transferred);
381 }
382}
383
384pub fn new_pipe(
390 locked: &mut Locked<Unlocked>,
391 current_task: &CurrentTask,
392) -> Result<(FileHandle, FileHandle), Errno> {
393 let fs = current_task
394 .kernel()
395 .expando
396 .get::<FsRegistry>()
397 .create(locked, current_task, "pipefs".into(), FileSystemOptions::default())
398 .ok_or_else(|| errno!(EINVAL))??;
399 let mut info = FsNodeInfo::new(mode!(IFIFO, 0o600), current_task.current_fscred());
400 info.blksize = ATOMIC_IO_BYTES.into();
401 let node = fs.create_node_and_allocate_node_id(SpecialNode, info);
402 let pipe = node.fifo(current_task);
403 {
404 let mut state = pipe.lock();
405 state.add_reader();
406 state.add_writer();
407 }
408
409 let read_ops = PipeFileObject { pipe: Arc::clone(pipe) };
411 let read_file = FileObject::new_anonymous(
412 locked,
413 current_task,
414 Box::new(read_ops),
415 Arc::clone(&node),
416 OpenFlags::RDONLY,
417 );
418
419 let write_ops = PipeFileObject { pipe: Arc::clone(pipe) };
421 let write_file = FileObject::new(
422 locked,
423 current_task,
424 Box::new(write_ops),
425 read_file.name.to_passive(),
426 OpenFlags::WRONLY,
427 )?;
428
429 Ok((read_file, write_file))
430}
431
432struct PipeFs;
433impl FileSystemOps for PipeFs {
434 fn statfs(
435 &self,
436 _locked: &mut Locked<FileOpsCore>,
437 _fs: &FileSystem,
438 _current_task: &CurrentTask,
439 ) -> Result<statfs, Errno> {
440 Ok(default_statfs(PIPEFS_MAGIC))
441 }
442 fn name(&self) -> &'static FsStr {
443 "pipefs".into()
444 }
445}
446
447fn pipe_fs(
448 locked: &mut Locked<Unlocked>,
449 current_task: &CurrentTask,
450 _options: FileSystemOptions,
451) -> Result<FileSystemHandle, Errno> {
452 struct PipeFsHandle(FileSystemHandle);
453
454 let kernel = current_task.kernel();
455 Ok(kernel
456 .expando
457 .get_or_init(|| {
458 PipeFsHandle(
459 FileSystem::new(
460 locked,
461 kernel,
462 CacheMode::Uncached,
463 PipeFs,
464 FileSystemOptions::default(),
465 )
466 .expect("pipefs constructed with valid options"),
467 )
468 })
469 .0
470 .clone())
471}
472
473pub fn register_pipe_fs(fs_registry: &FsRegistry) {
474 fs_registry.register("pipefs".into(), pipe_fs);
475}
476
/// File operations for one open end of a pipe.
pub struct PipeFileObject {
    /// Shared handle to the pipe state this file reads from or writes to.
    pipe: Arc<Mutex<Pipe>>,
}
480
481impl FileOps for PipeFileObject {
482 fileops_impl_nonseekable!();
483 fileops_impl_noop_sync!();
484
485 fn close(
486 self: Box<Self>,
487 _locked: &mut Locked<FileOpsCore>,
488 file: &FileObjectState,
489 _current_task: &CurrentTask,
490 ) {
491 self.on_close(file.flags());
492 }
493
494 fn read(
495 &self,
496 locked: &mut Locked<FileOpsCore>,
497 file: &FileObject,
498 current_task: &CurrentTask,
499 offset: usize,
500 data: &mut dyn OutputBuffer,
501 ) -> Result<usize, Errno> {
502 debug_assert!(offset == 0);
503 file.blocking_op(locked, current_task, FdEvents::POLLIN | FdEvents::POLLHUP, None, |_| {
504 let mut pipe = self.pipe.lock();
505 let actual = pipe.read(data)?;
506 if actual > 0 && pipe.is_empty() {
507 pipe.notify_fd_events(FdEvents::POLLOUT);
508 }
509 Ok(actual)
510 })
511 }
512
513 fn write(
514 &self,
515 locked: &mut Locked<FileOpsCore>,
516 file: &FileObject,
517 current_task: &CurrentTask,
518 offset: usize,
519 data: &mut dyn InputBuffer,
520 ) -> Result<usize, Errno> {
521 debug_assert!(offset == 0);
522 debug_assert!(data.bytes_read() == 0);
523
524 let result = file.blocking_op(locked, current_task, FdEvents::POLLOUT, None, |locked| {
525 let mut pipe = self.pipe.lock();
526 let was_empty = pipe.is_empty();
527 let offset_before = data.bytes_read();
528 let bytes_written = pipe.write(locked, current_task, data)?;
529 debug_assert!(data.bytes_read() - offset_before == bytes_written);
530 if bytes_written > 0 && was_empty {
531 pipe.notify_fd_events(FdEvents::POLLIN);
532 }
533 if data.available() > 0 {
534 return error!(EAGAIN);
535 }
536 Ok(())
537 });
538
539 let bytes_written = data.bytes_read();
540 if bytes_written == 0 {
541 result?;
544 }
545 Ok(bytes_written)
546 }
547
548 fn wait_async(
549 &self,
550 _locked: &mut Locked<FileOpsCore>,
551 file: &FileObject,
552 _current_task: &CurrentTask,
553 waiter: &Waiter,
554 mut events: FdEvents,
555 handler: EventHandler,
556 ) -> Option<WaitCanceler> {
557 let flags = file.flags();
558 if !flags.can_read() {
559 events.remove(FdEvents::POLLIN);
560 }
561 if !flags.can_write() {
562 events.remove(FdEvents::POLLOUT);
563 }
564 Some(self.pipe.lock().waiters.wait_async_fd_events(waiter, events, handler))
565 }
566
567 fn query_events(
568 &self,
569 _locked: &mut Locked<FileOpsCore>,
570 file: &FileObject,
571 _current_task: &CurrentTask,
572 ) -> Result<FdEvents, Errno> {
573 Ok(self.pipe.lock().query_events(file.flags()))
574 }
575
576 fn fcntl(
577 &self,
578 file: &FileObject,
579 current_task: &CurrentTask,
580 cmd: u32,
581 arg: u64,
582 ) -> Result<SyscallResult, Errno> {
583 self.pipe.lock().fcntl(file, current_task, cmd, arg)
584 }
585
586 fn ioctl(
587 &self,
588 locked: &mut Locked<Unlocked>,
589 file: &FileObject,
590 current_task: &CurrentTask,
591 request: u32,
592 arg: SyscallArg,
593 ) -> Result<SyscallResult, Errno> {
594 self.pipe.lock().ioctl(file, locked, current_task, request, arg)
595 }
596}
597
/// An `OutputBuffer` that appends everything written to it to a pipe as messages.
#[derive(Debug)]
struct SpliceOutputBuffer<'a> {
    /// Pipe that receives the written data.
    pipe: &'a mut Pipe,
    /// Total number of bytes this buffer was created to accept.
    len: usize,
    /// Number of bytes that can still be written; `len - available` have been written.
    available: usize,
}
605
606impl<'a> Buffer for SpliceOutputBuffer<'a> {
607 fn segments_count(&self) -> Result<usize, Errno> {
608 error!(ENOTSUP)
609 }
610
611 fn peek_each_segment(
612 &mut self,
613 _callback: &mut PeekBufferSegmentsCallback<'_>,
614 ) -> Result<(), Errno> {
615 error!(ENOTSUP)
616 }
617}
618
619impl<'a> OutputBuffer for SpliceOutputBuffer<'a> {
620 fn write_each(&mut self, callback: &mut OutputBufferCallback<'_>) -> Result<usize, Errno> {
621 let bytes = unsafe {
623 read_to_vec::<u8, _>(self.available, |buf| callback(buf).map(NumberOfElementsRead))
624 }?;
625 let bytes_len = bytes.len();
626 if bytes_len > 0 {
627 let was_empty = self.pipe.is_empty();
628 self.pipe.messages.write_message(PipeMessageData::from(bytes).into());
629 if was_empty {
630 self.pipe.notify_fd_events(FdEvents::POLLIN);
631 }
632 self.available -= bytes_len;
633 }
634 Ok(bytes_len)
635 }
636
637 fn available(&self) -> usize {
638 self.available
639 }
640
641 fn bytes_written(&self) -> usize {
642 self.len - self.available
643 }
644
645 fn zero(&mut self) -> Result<usize, Errno> {
646 let bytes = vec![0; self.available];
647 let len = bytes.len();
648 if len > 0 {
649 let was_empty = self.pipe.is_empty();
650 self.pipe.messages.write_message(PipeMessageData::from(bytes).into());
651 if was_empty {
652 self.pipe.notify_fd_events(FdEvents::POLLIN);
653 }
654 self.available -= len;
655 }
656 Ok(len)
657 }
658
659 unsafe fn advance(&mut self, _length: usize) -> Result<(), Errno> {
660 error!(ENOTSUP)
661 }
662}
663
/// An `InputBuffer` that serves the data queued in a pipe and removes it as it is consumed.
#[derive(Debug)]
struct SpliceInputBuffer<'a> {
    /// Pipe whose queued messages are the source of the data.
    pipe: &'a mut Pipe,
    /// Total number of bytes this buffer was created to provide.
    len: usize,
    /// Number of bytes that can still be read; `len - available` have been consumed.
    available: usize,
}
671
672impl<'a> Buffer for SpliceInputBuffer<'a> {
673 fn segments_count(&self) -> Result<usize, Errno> {
674 Ok(self.pipe.messages.len())
675 }
676
677 fn peek_each_segment(
678 &mut self,
679 callback: &mut PeekBufferSegmentsCallback<'_>,
680 ) -> Result<(), Errno> {
681 let mut available = self.available;
682 for message in self.pipe.messages.messages() {
683 let to_read = std::cmp::min(available, message.len());
684 callback(&UserBuffer {
685 address: UserAddress::from(message.data.ptr()? as u64),
686 length: to_read,
687 });
688 available -= to_read;
689 }
690 Ok(())
691 }
692}
693
694impl<'a> InputBuffer for SpliceInputBuffer<'a> {
695 fn peek_each(&mut self, callback: &mut InputBufferCallback<'_>) -> Result<usize, Errno> {
696 let mut read = 0;
697 let mut available = self.available;
698 for message in self.pipe.messages.messages() {
699 let to_read = std::cmp::min(available, message.len());
700 let result = message.data.with_bytes(|bytes| callback(&bytes[0..to_read]))?;
701 if result > to_read {
702 return error!(EINVAL);
703 }
704 read += result;
705 available -= result;
706 if result != to_read {
707 break;
708 }
709 }
710 Ok(read)
711 }
712
713 fn available(&self) -> usize {
714 self.available
715 }
716
717 fn bytes_read(&self) -> usize {
718 self.len - self.available
719 }
720
721 fn drain(&mut self) -> usize {
722 let result = self.available;
723 self.available = 0;
724 result
725 }
726
727 fn advance(&mut self, mut length: usize) -> Result<(), Errno> {
728 if length == 0 {
729 return Ok(());
730 }
731 if length > self.available {
732 return error!(EINVAL);
733 }
734 self.available -= length;
735 while let Some(mut message) = self.pipe.messages.read_message() {
736 if let Some(data) = MessageData::split_off(&mut message.data, length) {
737 self.pipe.messages.write_front(data.into());
739 }
740 length -= message.len();
741 if length == 0 {
742 if self.pipe.is_empty() {
743 self.pipe.notify_fd_events(FdEvents::POLLOUT);
744 }
745 return Ok(());
746 }
747 }
748 panic!();
749 }
750}
751
752impl PipeFileObject {
753 fn on_close(&self, flags: OpenFlags) {
755 let mut events = FdEvents::empty();
756 let mut pipe = self.pipe.lock();
757 if flags.can_read() {
758 assert!(pipe.reader_count > 0);
759 pipe.reader_count -= 1;
760 if pipe.reader_count == 0 {
761 events |= FdEvents::POLLOUT | FdEvents::POLLERR;
762 }
763 }
764 if flags.can_write() {
765 assert!(pipe.writer_count > 0);
766 pipe.writer_count -= 1;
767 if pipe.writer_count == 0 {
768 if pipe.reader_count > 0 {
769 events |= FdEvents::POLLHUP;
770 }
771 if !pipe.is_empty() {
772 events |= FdEvents::POLLIN;
773 }
774 }
775 }
776 if events != FdEvents::empty() {
777 pipe.waiters.notify_fd_events(events);
778 }
779 pipe.on_close();
780 }
781
782 fn wait_for_condition<'a, L, F, G, V>(
788 &'a self,
789 locked: &mut Locked<L>,
790 current_task: &CurrentTask,
791 file: &FileHandle,
792 condition: F,
793 pregen: G,
794 events: FdEvents,
795 ) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
796 where
797 L: LockEqualOrBefore<FileOpsCore>,
798 F: Fn(&Pipe) -> bool,
799 G: Fn(&mut Locked<L>) -> Result<V, Errno>,
800 {
801 file.blocking_op(locked, current_task, events, None, |locked| {
802 let other = pregen(locked)?;
803 let pipe = self.pipe.lock();
804 if condition(&pipe) { Ok((other, pipe)) } else { error!(EAGAIN) }
805 })
806 }
807
808 fn lock_pipe_for_reading_with<'a, L, G, V>(
810 &'a self,
811 locked: &mut Locked<L>,
812 current_task: &CurrentTask,
813 file: &FileHandle,
814 pregen: G,
815 non_blocking: bool,
816 ) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
817 where
818 L: LockEqualOrBefore<FileOpsCore>,
819 G: Fn(&mut Locked<L>) -> Result<V, Errno>,
820 {
821 if non_blocking {
822 let other = pregen(locked)?;
823 let pipe = self.pipe.lock();
824 if !pipe.is_readable() {
825 return error!(EAGAIN);
826 }
827 Ok((other, pipe))
828 } else {
829 self.wait_for_condition(
830 locked,
831 current_task,
832 file,
833 |pipe| pipe.is_readable(),
834 pregen,
835 FdEvents::POLLIN | FdEvents::POLLHUP,
836 )
837 }
838 }
839
840 fn lock_pipe_for_reading<'a, L>(
841 &'a self,
842 locked: &mut Locked<L>,
843 current_task: &CurrentTask,
844 file: &FileHandle,
845 non_blocking: bool,
846 ) -> Result<MutexGuard<'a, Pipe>, Errno>
847 where
848 L: LockEqualOrBefore<FileOpsCore>,
849 {
850 self.lock_pipe_for_reading_with(locked, current_task, file, |_| Ok(()), non_blocking)
851 .map(|(_, l)| l)
852 }
853
854 fn lock_pipe_for_writing_with<'a, L, G, V>(
856 &'a self,
857 locked: &mut Locked<L>,
858 current_task: &CurrentTask,
859 file: &FileHandle,
860 pregen: G,
861 non_blocking: bool,
862 len: usize,
863 ) -> Result<(V, MutexGuard<'a, Pipe>), Errno>
864 where
865 L: LockEqualOrBefore<FileOpsCore>,
866 G: Fn(&mut Locked<L>) -> Result<V, Errno>,
867 {
868 if non_blocking {
869 let other = pregen(locked)?;
870 let pipe = self.pipe.lock();
871 if !pipe.is_writable(len) {
872 return error!(EAGAIN);
873 }
874 Ok((other, pipe))
875 } else {
876 self.wait_for_condition(
877 locked,
878 current_task,
879 file,
880 |pipe| pipe.is_writable(len),
881 pregen,
882 FdEvents::POLLOUT,
883 )
884 }
885 }
886
887 fn lock_pipe_for_writing<'a, L>(
888 &'a self,
889 locked: &mut Locked<L>,
890 current_task: &CurrentTask,
891 file: &FileHandle,
892 non_blocking: bool,
893 len: usize,
894 ) -> Result<MutexGuard<'a, Pipe>, Errno>
895 where
896 L: LockEqualOrBefore<FileOpsCore>,
897 {
898 self.lock_pipe_for_writing_with(locked, current_task, file, |_| Ok(()), non_blocking, len)
899 .map(|(_, l)| l)
900 }
901
902 pub fn splice_from<L>(
907 &self,
908 locked: &mut Locked<L>,
909 current_task: &CurrentTask,
910 self_file: &FileHandle,
911 from: &FileHandle,
912 maybe_offset: Option<usize>,
913 len: usize,
914 non_blocking: bool,
915 ) -> Result<usize, Errno>
916 where
917 L: LockEqualOrBefore<FileOpsCore>,
918 {
919 assert!(from.downcast_file::<PipeFileObject>().is_none());
921
922 let mut pipe =
923 self.lock_pipe_for_writing(locked, current_task, self_file, non_blocking, len)?;
924 let len = std::cmp::min(len, pipe.messages.available_capacity());
925 let mut buffer = SpliceOutputBuffer { pipe: &mut pipe, len, available: len };
926 if let Some(offset) = maybe_offset {
927 from.read_at(locked, current_task, offset, &mut buffer)
928 } else {
929 from.read(locked, current_task, &mut buffer)
930 }
931 }
932
933 pub fn splice_to<L>(
938 &self,
939 locked: &mut Locked<L>,
940 current_task: &CurrentTask,
941 self_file: &FileHandle,
942 to: &FileHandle,
943 maybe_offset: Option<usize>,
944 len: usize,
945 non_blocking: bool,
946 ) -> Result<usize, Errno>
947 where
948 L: LockEqualOrBefore<FileOpsCore>,
949 {
950 assert!(to.downcast_file::<PipeFileObject>().is_none());
952
953 let mut pipe = self.lock_pipe_for_reading(locked, current_task, self_file, non_blocking)?;
954 let len = std::cmp::min(len, pipe.messages.len());
955 let mut buffer = SpliceInputBuffer { pipe: &mut pipe, len, available: len };
956 if let Some(offset) = maybe_offset {
957 to.write_at(locked, current_task, offset, &mut buffer)
958 } else {
959 to.write(locked, current_task, &mut buffer)
960 }
961 }
962
963 pub fn vmsplice_from<L>(
967 &self,
968 locked: &mut Locked<L>,
969 current_task: &CurrentTask,
970 self_file: &FileHandle,
971 mut iovec: UserBuffers,
972 non_blocking: bool,
973 ) -> Result<usize, Errno>
974 where
975 L: LockEqualOrBefore<FileOpsCore>,
976 {
977 let locked = locked.cast_locked::<FileOpsCore>();
978 let locked = locked;
979 let available = UserBuffer::cap_buffers_to_max_rw_count(
980 current_task.maximum_valid_address().ok_or_else(|| errno!(EINVAL))?,
981 &mut iovec,
982 )?;
983 let mappings = current_task.mm()?.get_mappings_for_vmsplice(&iovec)?;
984
985 let mut pipe =
986 self.lock_pipe_for_writing(locked, current_task, self_file, non_blocking, available)?;
987
988 if pipe.reader_count == 0 {
989 send_standard_signal(locked, current_task, SignalInfo::kernel(SIGPIPE));
990 return error!(EPIPE);
991 }
992
993 let was_empty = pipe.is_empty();
994 let mut remaining = std::cmp::min(available, pipe.messages.available_capacity());
995
996 let mut bytes_transferred = 0;
997 for mut mapping in mappings.into_iter() {
998 mapping.truncate(remaining);
999 let actual = mapping.len();
1000
1001 pipe.messages.write_message(PipeMessageData::Vmspliced(mapping).into());
1002 remaining -= actual;
1003 bytes_transferred += actual;
1004
1005 if remaining == 0 {
1006 break;
1007 }
1008 }
1009 if bytes_transferred > 0 && was_empty {
1010 pipe.notify_fd_events(FdEvents::POLLIN);
1011 }
1012 Ok(bytes_transferred)
1013 }
1014
1015 pub fn vmsplice_to<L>(
1019 &self,
1020 locked: &mut Locked<L>,
1021 current_task: &CurrentTask,
1022 self_file: &FileHandle,
1023 iovec: UserBuffers,
1024 non_blocking: bool,
1025 ) -> Result<usize, Errno>
1026 where
1027 L: LockEqualOrBefore<FileOpsCore>,
1028 {
1029 let mut pipe = self.lock_pipe_for_reading(locked, current_task, self_file, non_blocking)?;
1030
1031 let mut data = UserBuffersOutputBuffer::unified_new(current_task, iovec)?;
1032 let len = std::cmp::min(data.available(), pipe.messages.len());
1033 let mut buffer = SpliceInputBuffer { pipe: &mut pipe, len, available: len };
1034 data.write_buffer(&mut buffer)
1035 }
1036
    /// Locks two distinct pipes at once: `file_in` for reading and `file_out` for writing
    /// `len` bytes.
    ///
    /// The order in which the two locks are taken is decided by comparing the addresses of
    /// the files' nodes, so it does not depend on which pipe is the input and which is the
    /// output. The pipe of the node with the greater address is locked first, because it is
    /// locked inside the `pregen` closure, which runs before the other pipe's lock is taken.
    /// NOTE(review): presumably this keeps the lock order consistent between concurrent
    /// callers — confirm.
    ///
    /// # Errors
    ///
    /// Returns `EINVAL` when either file is not a pipe or when both files refer to the same
    /// node. Errors from locking either pipe (such as `EAGAIN` when `non_blocking` is set)
    /// are propagated.
    pub fn lock_pipes<'a, 'b, L>(
        locked: &mut Locked<L>,
        current_task: &CurrentTask,
        file_in: &'a FileHandle,
        file_out: &'b FileHandle,
        len: usize,
        non_blocking: bool,
    ) -> Result<PipeOperands<'a, 'b>, Errno>
    where
        L: LockEqualOrBefore<FileOpsCore>,
    {
        let pipe_in = file_in.downcast_file::<PipeFileObject>().ok_or_else(|| errno!(EINVAL))?;
        let pipe_out = file_out.downcast_file::<PipeFileObject>().ok_or_else(|| errno!(EINVAL))?;

        // Compare node addresses to pick a lock order that is independent of the roles.
        let node_cmp =
            Arc::as_ptr(&file_in.name.entry.node).cmp(&Arc::as_ptr(&file_out.name.entry.node));

        match node_cmp {
            // Both files refer to the same pipe node.
            Ordering::Equal => error!(EINVAL),
            Ordering::Less => {
                // The output pipe is locked first (inside `pregen`), then the input pipe.
                let (write, read) = pipe_in.lock_pipe_for_reading_with(
                    locked,
                    current_task,
                    file_in,
                    |locked| {
                        pipe_out.lock_pipe_for_writing(
                            locked,
                            current_task,
                            file_out,
                            non_blocking,
                            len,
                        )
                    },
                    non_blocking,
                )?;
                Ok(PipeOperands { read, write })
            }
            Ordering::Greater => {
                // The input pipe is locked first (inside `pregen`), then the output pipe.
                let (read, write) = pipe_out.lock_pipe_for_writing_with(
                    locked,
                    current_task,
                    file_out,
                    |locked| {
                        pipe_in.lock_pipe_for_reading(locked, current_task, file_in, non_blocking)
                    },
                    non_blocking,
                    len,
                )?;
                Ok(PipeOperands { read, write })
            }
        }
    }
1094}
1095
/// The pair of locked pipes produced by `PipeFileObject::lock_pipes`.
pub struct PipeOperands<'a, 'b> {
    /// Guard for the input pipe, which was locked for reading.
    pub read: MutexGuard<'a, Pipe>,
    /// Guard for the output pipe, which was locked for writing.
    pub write: MutexGuard<'b, Pipe>,
}