1use fidl_fuchsia_starnix_runner as fstarnixrunner;
10use futures::TryStreamExt;
11use starnix_logging::{
12 log_debug, log_error, log_warn, trace_duration, trace_instant, with_zx_name,
13};
14use starnix_sync::Mutex;
15use starnix_uapi::errors::Errno;
16use starnix_uapi::{errno, error};
17use std::collections::HashMap;
18use std::collections::hash_map::Entry;
19use std::ops::Range;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU32, Ordering};
22use zx::sys::zx_page_request_command_t::{ZX_PAGER_VMO_COMPLETE, ZX_PAGER_VMO_READ};
23
/// Number of threads started by `Pager::start_threads` to service port packets.
const PAGER_THREADS: usize = 1;
/// Size of each pager thread's transfer VMO, which stages data read from the backing VMO
/// before it is supplied to the target pager-backed VMO.
const TRANSFER_VMO_SIZE: u64 = 1024 * 1024;
/// Size of the shared zero VMO; zero-filled ranges are supplied in chunks of at most this size.
const ZERO_VMO_SIZE: u64 = 1024 * 1024;

/// Trace category used for all pager trace events.
const CATEGORY_STARNIX_PAGER: &str = "starnix:pager";
32
33pub async fn run_pager(
34 pager_request: fstarnixrunner::ManagerCreatePagerRequest,
35 pager: Arc<Pager>,
36) {
37 let fstarnixrunner::ManagerCreatePagerRequest {
38 backing_vmo: Some(backing_vmo),
39 block_size: Some(block_size),
40 pager: Some(pager_server),
41 ..
42 } = pager_request
43 else {
44 log_error!("Invalid create pager request");
45 return;
46 };
47
48 let filesystem = Arc::new(match Filesystem::new(pager.clone(), backing_vmo, block_size) {
49 Ok(filesystem) => filesystem,
50 Err(error) => {
51 log_error!("Unable to register filesystem {error}");
52 return;
53 }
54 });
55
56 pager.add_filesystem(filesystem.clone());
57
58 let mut stream = pager_server.into_stream();
59 'outer: while let Ok(Some(event)) = stream.try_next().await {
60 match event {
61 fstarnixrunner::PagerRequest::RegisterFile {
62 payload:
63 fstarnixrunner::PagerRegisterFileRequest {
64 name: Some(name),
65 inode_num: Some(inode_num),
66 size: Some(size),
67 extents: Some(extents),
68 ..
69 },
70 responder,
71 ..
72 } => {
73 trace_instant!(
74 CATEGORY_STARNIX_PAGER,
75 "file_register",
76 fuchsia_trace::Scope::Thread
77 );
78 match filesystem.register(
79 &name,
80 inode_num,
81 size,
82 extents
83 .iter()
84 .map(|e| PagerExtent {
85 logical: e.logical_start..e.logical_end,
86 physical_block: e.physical_block,
87 })
88 .collect(),
89 ) {
90 Ok(vmo) => {
91 match responder.send(Ok(fstarnixrunner::PagerRegisterFileResponse {
92 vmo: Some(vmo),
93 ..Default::default()
94 })) {
95 Ok(_) => {}
96 Err(e) => {
97 log_error!("Error sending pager response {:?}", e);
98 break 'outer;
99 }
100 }
101 }
102 Err(e) => match responder.send(Err(e.into_raw())) {
103 Ok(_) => {}
104 Err(e) => {
105 log_error!("Error sending pager error response {:?}", e);
106 break 'outer;
107 }
108 },
109 };
110 }
111 fstarnixrunner::PagerRequest::RegisterFile { .. } => {
112 log_error!("Invalid RegisterFile request");
113 break 'outer;
114 }
115 _ => {}
116 }
117 }
118 pager.remove_filesystem(&*filesystem);
119}
120
/// Services page requests for the pager-backed VMOs of every registered [`Filesystem`].
pub struct Pager {
    /// Kernel pager object used by `create_pager_vmo` to create pager-backed VMOs.
    pager: zx::Pager,
    /// Port receiving page requests, zero-children signals and termination (user) packets.
    port: zx::Port,
    /// `ZERO_VMO_SIZE`-byte VMO used as the source when supplying zero-filled pages.
    zero_vmo: zx::Vmo,
    /// Next id returned by `allocate_filesystem_id`; ids start at 1.
    next_filesystem_id: AtomicU32,
    /// Registered filesystems keyed by id (the upper 32 bits of a port packet key).
    filesystems: Mutex<HashMap<u32, Arc<Filesystem>>>,
}
129
130impl Pager {
131 pub fn new() -> Result<Self, Errno> {
132 Ok(Self {
133 pager: zx::Pager::create(zx::PagerOptions::empty()).map_err(|error| {
134 log_error!(error:?; "Pager::create failed");
135 errno!(EINVAL)
136 })?,
137 port: zx::Port::create(),
138 zero_vmo: with_zx_name(
139 zx::Vmo::create(ZERO_VMO_SIZE).map_err(|_| errno!(EINVAL))?,
140 b"starnix:ext4",
141 ),
142 next_filesystem_id: AtomicU32::new(1),
143 filesystems: Mutex::new(HashMap::new()),
144 })
145 }
146
147 pub fn start_threads(self: &Arc<Self>) {
149 for i in 0..PAGER_THREADS {
150 let this = self.clone();
151 let _ = std::thread::Builder::new().name(format!("pager-{}", i)).spawn(move || {
152 this.run_pager_thread();
153 });
154 }
155 }
156
157 pub fn run_pager_thread(&self) {
160 let transfer_vmo = with_zx_name(
161 zx::Vmo::create(TRANSFER_VMO_SIZE).expect("unable to create transfer vmo"),
162 b"starnix:ext4",
163 );
164 let transfer_vmo_addr = fuchsia_runtime::vmar_root_self()
165 .map(
166 0,
167 &transfer_vmo,
168 0,
169 TRANSFER_VMO_SIZE as usize,
170 zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::ALLOW_FAULTS,
171 )
172 .expect("unable to map transfer vmo");
173 scopeguard::defer!({
174 let _ = unsafe {
176 fuchsia_runtime::vmar_root_self()
177 .unmap(transfer_vmo_addr, TRANSFER_VMO_SIZE as usize)
178 };
179 });
180 let split_key = |key: u64| -> (u32, u32) { ((key >> 32) as u32, key as u32) };
181 loop {
182 match self.port.wait(zx::MonotonicInstant::INFINITE) {
183 Ok(packet) => {
184 match packet.contents() {
185 zx::PacketContents::Pager(contents)
186 if contents.command() == ZX_PAGER_VMO_READ =>
187 {
188 trace_duration!(CATEGORY_STARNIX_PAGER, "vmo_read");
189 let (filesystem_num, inode_num) = split_key(packet.key());
190 self.filesystems
191 .lock()
192 .get(&filesystem_num)
193 .expect("Unexpected packet key")
194 .receive_pager_packet(
195 inode_num,
196 contents,
197 &transfer_vmo,
198 transfer_vmo_addr,
199 );
200 }
201 zx::PacketContents::Pager(contents)
202 if contents.command() == ZX_PAGER_VMO_COMPLETE =>
203 {
204 trace_duration!(CATEGORY_STARNIX_PAGER, "vmo_complete");
205 }
208 zx::PacketContents::SignalOne(signals)
209 if signals.observed().contains(zx::Signals::VMO_ZERO_CHILDREN) =>
210 {
211 trace_duration!(CATEGORY_STARNIX_PAGER, "signal_zero_children");
212 let (filesystem_num, inode_num) = split_key(packet.key());
213 if let Some(filesystem) = self.filesystems.lock().get(&filesystem_num) {
217 filesystem.on_zero_children(inode_num).expect("on_zero_children");
218 }
219 }
220 zx::PacketContents::User(_) => break,
221 _ => log_error!("Unexpected port packet: {:?}", packet.contents()),
222 }
223 }
224 Err(error) => log_error!(error:?; "Port::wait failed"),
225 }
226 }
227 log_debug!("Pager thread terminating");
228 }
229
230 fn create_pager_vmo(&self, key: u64, size: u64) -> Result<zx::Vmo, zx::Status> {
231 self.pager.create_vmo(zx::VmoOptions::RESIZABLE, &self.port, key, size)
232 }
233
234 fn allocate_filesystem_id(&self) -> u32 {
235 self.next_filesystem_id.fetch_add(1, Ordering::Relaxed)
236 }
237
238 fn add_filesystem(&self, filesystem: Arc<Filesystem>) {
239 self.filesystems.lock().insert(filesystem.id(), filesystem);
240 }
241
242 fn remove_filesystem(&self, filesystem: &Filesystem) {
243 self.filesystems.lock().remove(&filesystem.id());
244 }
245
246 pub fn terminate(&self) {
248 let up = zx::UserPacket::from_u8_array([0; 32]);
249 let packet = zx::Packet::from_user_packet(0, 0, up);
250 for _ in 0..PAGER_THREADS {
251 self.port.queue(&packet).unwrap();
252 }
253 }
254}
255
/// A filesystem image whose files are exposed to clients as pager-backed VMOs.
pub struct Filesystem {
    /// Pager whose threads service page requests for this filesystem's files.
    pager: Arc<Pager>,
    /// VMO holding the filesystem image; file data is read from it at `physical_block * block_size`.
    backing_vmo: zx::Vmo,
    /// Block size in bytes. `new` only accepts powers of two no larger than 1 MiB.
    block_size: u64,
    /// Registered files keyed by inode number (the lower 32 bits of a port packet key).
    files_by_inode: Mutex<HashMap<u32, Arc<PagedFile>>>,
    /// Id allocated from the pager; forms the upper 32 bits of this filesystem's port keys.
    id: u32,
}
264
265impl Filesystem {
266 pub fn new(pager: Arc<Pager>, backing_vmo: zx::Vmo, block_size: u64) -> Result<Self, Errno> {
269 if block_size > 1024 * 1024 || !block_size.is_power_of_two() {
270 return error!(EINVAL, "Bad block size {block_size}");
271 }
272 let id = pager.allocate_filesystem_id();
273 Ok(Self { pager, backing_vmo, block_size, files_by_inode: Mutex::new(HashMap::new()), id })
274 }
275
276 pub fn register(
278 &self,
279 name: &str,
280 inode_num: u32,
281 size: u64,
282 extents: Box<[PagerExtent]>,
283 ) -> Result<zx::Vmo, zx::Status> {
284 let (file, did_create) = {
285 match self.files_by_inode.lock().entry(inode_num) {
286 Entry::Occupied(o) => (o.get().clone(), false),
287 Entry::Vacant(v) => (
288 v.insert(Arc::new(PagedFile {
289 vmo: self
290 .pager
291 .create_pager_vmo(self.port_key_for_inode(inode_num), size)?,
292 extents,
293 }))
294 .clone(),
295 true,
296 ),
297 }
298 };
299 let child_vmo = file.vmo.create_child(zx::VmoChildOptions::REFERENCE, 0, 0);
300 if did_create {
301 let set_up_vmo = |vmo| -> Result<(), zx::Status> {
302 self.watch_for_zero_children(vmo, inode_num)?;
303 vmo.set_name(&zx::Name::new_lossy(&format!("ext4!{}", name)))?;
304 Ok(())
305 };
306
307 if let Err(e) = set_up_vmo(&file.vmo) {
308 self.files_by_inode.lock().remove(&inode_num);
309 return Err(e);
310 }
311 }
312 child_vmo
313 }
314
315 fn receive_pager_packet(
316 &self,
317 inode_num: u32,
318 contents: zx::PagerPacket,
319 transfer_vmo: &zx::Vmo,
320 transfer_vmo_addr: usize,
321 ) {
322 let Some(file) = self.files_by_inode.lock().get(&inode_num).cloned() else {
323 return;
324 };
325
326 let mut range = contents.range();
327
328 const ALIGNMENT: u64 = 128 * 1024;
330 let unaligned = (range.end - range.start) % ALIGNMENT;
331 let readahead_end =
332 if unaligned > 0 { range.end - unaligned + ALIGNMENT } else { range.end };
333
334 let start_block = (range.start / self.block_size) as u32;
335 let mut ix = file.extents.partition_point(|e| e.logical.end <= start_block);
336
337 let buf = unsafe {
340 std::slice::from_raw_parts_mut(transfer_vmo_addr as *mut u8, TRANSFER_VMO_SIZE as usize)
341 };
342
343 let mut supply_helper =
344 SupplyHelper::new(transfer_vmo, buf, &file.vmo, range.start, &*self.pager);
345
346 while ix < file.extents.len() && range.start < readahead_end {
347 let extent = &file.extents[ix];
348
349 let logical_start = extent.logical.start as u64 * self.block_size;
350
351 if range.start < logical_start {
353 if let Err(e) = supply_helper.zero(logical_start - range.start) {
354 supply_helper.fail_to(range.end, e);
355 return;
356 }
357 range.start = logical_start;
358 }
359
360 let end = std::cmp::min(extent.logical.end as u64 * self.block_size, readahead_end);
361
362 while range.start < end {
363 let phys_offset =
364 extent.physical_block * self.block_size + range.start - logical_start;
365
366 match supply_helper.fill_buf(|buf| {
367 let amount = std::cmp::min(buf.len() as u64, end - range.start) as usize;
368 self.backing_vmo.read(&mut buf[..amount], phys_offset)?;
369 Ok(amount)
370 }) {
371 Ok(amount) => {
372 let _ = self.backing_vmo.op_range(
375 zx::VmoOp::DONT_NEED,
376 phys_offset,
377 amount as u64,
378 );
379 range.start += amount as u64;
380 }
381 Err(e) => {
382 supply_helper.fail_to(range.end, e);
383 return;
384 }
385 }
386 }
387
388 ix += 1;
389 }
390
391 if let Err(e) = supply_helper.finish(range.end) {
396 supply_helper.fail_to(range.end, e);
397 }
398 }
399
400 fn watch_for_zero_children(&self, vmo: &zx::Vmo, inode_num: u32) -> Result<(), zx::Status> {
401 vmo.wait_async(
402 &self.pager.port,
403 self.port_key_for_inode(inode_num),
404 zx::Signals::VMO_ZERO_CHILDREN,
405 zx::WaitAsyncOpts::empty(),
406 )
407 }
408
409 fn on_zero_children(&self, inode_num: u32) -> Result<(), Errno> {
410 let mut files = self.files_by_inode.lock();
411 let file = files.entry(inode_num);
412 if let Entry::Occupied(o) = file {
413 let vmo = &o.get().vmo;
414 match vmo.info() {
415 Ok(info) => {
416 if info.num_children == 0 {
417 o.remove();
419 } else {
420 if let Err(error) = self.watch_for_zero_children(vmo, inode_num) {
423 log_error!(
424 error:?;
425 "watch_for_zero_children failed"
426 );
427 }
428 }
429 }
430 Err(error) => log_error!(error:?; "Vmo::info failed"),
431 }
432 }
433 Ok(())
434 }
435
436 fn port_key_for_inode(&self, inode_num: u32) -> u64 {
437 (self.id as u64) << 32 | inode_num as u64
438 }
439
440 fn id(&self) -> u32 {
441 self.id
442 }
443}
444
/// A registered file: its pager-backed VMO and the extents that locate its data.
struct PagedFile {
    /// Pager-backed VMO; `Filesystem::register` hands out reference children of it.
    vmo: zx::Vmo,

    /// Logical-to-physical block mapping. It is searched with `partition_point` in
    /// `receive_pager_packet`, so it is presumably sorted by logical block (TODO confirm with
    /// the caller that builds it).
    extents: Box<[PagerExtent]>,
}
454
/// A run of file blocks stored contiguously in the backing VMO.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PagerExtent {
    /// Logical (file-relative) block numbers covered by this extent.
    pub logical: Range<u32>,
    /// Block in the backing VMO that holds the first block of `logical`.
    pub physical_block: u64,
}
460
/// Stages data in the transfer buffer and supplies it to a pager-backed VMO in whole pages.
struct SupplyHelper<'a> {
    /// VMO whose pages are supplied to `target_vmo`; `buffer` is expected to map it.
    transfer_vmo: &'a zx::Vmo,
    /// Writable view of `transfer_vmo`'s memory, starting at offset 0.
    buffer: &'a mut [u8],
    /// Pager-backed VMO being populated.
    target_vmo: &'a zx::Vmo,
    /// Offset in `target_vmo` that corresponds to `buffer[0]`; everything before it has been
    /// supplied or failed.
    offset: u64,
    /// Pager providing the kernel pager object and the shared zero VMO.
    pager: &'a Pager,
    /// System page size; supplies are made in multiples of it.
    page_size: u64,
    /// Bytes staged in `buffer` but not yet supplied (always less than a page between calls).
    buf_len: usize,
}
471
472impl<'a> SupplyHelper<'a> {
473 fn new(
474 transfer_vmo: &'a zx::Vmo,
475 buffer: &'a mut [u8],
476 target_vmo: &'a zx::Vmo,
477 offset: u64,
478 pager: &'a Pager,
479 ) -> Self {
480 Self {
481 transfer_vmo,
482 buffer,
483 target_vmo,
484 offset,
485 pager,
486 page_size: *starnix_core::mm::PAGE_SIZE,
487 buf_len: 0,
488 }
489 }
490
491 fn zero(&mut self, mut len: u64) -> Result<(), zx::Status> {
493 let unaligned = self.buf_len as u64 % self.page_size;
494 if unaligned > 0 {
495 let amount = std::cmp::min(self.page_size - unaligned, len);
496 self.buffer[self.buf_len..self.buf_len + amount as usize].fill(0);
497 self.buf_len += amount as usize;
498 len -= amount;
499 self.supply_pages()?;
500 }
501 while len >= self.page_size {
503 let amount =
504 if len >= ZERO_VMO_SIZE { ZERO_VMO_SIZE } else { len - len % self.page_size };
505 self.pager.pager.supply_pages(
506 self.target_vmo,
507 self.offset..self.offset + amount,
508 &self.pager.zero_vmo,
509 0,
510 )?;
511 self.offset += amount;
512 len -= amount;
513 }
514 self.buffer[self.buf_len..self.buf_len + len as usize].fill(0);
516 self.buf_len += len as usize;
517 Ok(())
518 }
519
520 fn supply_pages(&mut self) -> Result<(), zx::Status> {
522 if self.buf_len as u64 >= self.page_size {
523 let len = self.buf_len - self.buf_len % self.page_size as usize;
524 self.pager.pager.supply_pages(
525 self.target_vmo,
526 self.offset..self.offset + len as u64,
527 self.transfer_vmo,
528 0,
529 )?;
530 self.buffer.copy_within(len..self.buf_len, 0);
532 self.buf_len -= len;
533 self.offset += len as u64;
534 }
535 Ok(())
536 }
537
538 fn fill_buf(
540 &mut self,
541 f: impl FnOnce(&mut [u8]) -> Result<usize, zx::Status>,
542 ) -> Result<usize, zx::Status> {
543 let amount = f(&mut self.buffer[self.buf_len..])?;
544 self.buf_len += amount;
545 self.supply_pages()?;
546 Ok(amount)
547 }
548
549 fn finish(&mut self, mut end: u64) -> Result<(), zx::Status> {
551 let byte_offset = self.offset + self.buf_len as u64;
552 end = std::cmp::max(end, byte_offset);
553 end = end + self.page_size - 1;
554 end -= end % self.page_size;
555 self.zero(end - byte_offset)
556 }
557
558 fn fail_to(&mut self, end: u64, error: zx::Status) {
560 if self.offset < end {
561 log_warn!(error:?; "Failing page-in, range: {:?}", self.offset..end);
562 match self.pager.pager.op_range(
564 zx::PagerOp::Fail(zx::Status::IO),
565 self.target_vmo,
566 self.offset..end,
567 ) {
568 Ok(()) => {}
569 Err(error) => log_error!(error:?; "Failed to report error"),
570 }
571 self.offset = end;
572 self.buf_len = 0;
573 }
574 }
575}
576
#[cfg(test)]
mod tests {
    use super::{Filesystem, Pager, PagerExtent};

    use fidl::HandleBased;
    use std::sync::Arc;
    use std::time::Duration;

    /// End-to-end check of the pager. Registered files must read back zeroes for holes and
    /// backing-VMO data for extents, and must be released once every child VMO is dropped.
    #[::fuchsia::test]
    async fn test_pager() {
        let backing_vmo = zx::Vmo::create(1024 * 1024).expect("Vmo::create failed");
        let backing_vmo_clone =
            backing_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).expect("failed handle dup");

        let pager = Arc::new(Pager::new().expect("Pager::new failed"));
        let filesystem = Arc::new(
            Filesystem::new(pager.clone(), backing_vmo_clone, 1024)
                .expect("Filesystem::new failed"),
        );

        pager.add_filesystem(filesystem.clone());

        {
            pager.start_threads();

            // A file with no extents reads back as zeroes.
            let vmo = filesystem.register("a", 1, 5, Box::new([])).expect("register failed");

            let mut buf = vec![1; 5];
            vmo.read(&mut buf, 0).expect("read failed");

            assert_eq!(&buf, &[0; 5]);

            // File block 0 maps onto backing block 0.
            let vmo = filesystem
                .register(
                    "b",
                    2,
                    5,
                    Box::new([PagerExtent { logical: 0..1, physical_block: 0 }]),
                )
                .expect("register failed");
            backing_vmo.write(b"hello", 0).expect("write failed");
            vmo.read(&mut buf, 0).expect("read failed");

            assert_eq!(&buf, b"hello");

            // Layout in 1 KiB blocks:
            // - 6 hole blocks;
            // - data at block 6;
            // - 5 hole blocks;
            // - data at block 12, followed by 3 more hole blocks;
            // - a final 100-byte partial block.
            let file_size = (6 + 1 + 5 + 4) * 1024 + 100;
            let vmo = filesystem
                .register(
                    "c",
                    3,
                    file_size,
                    Box::new([
                        PagerExtent { logical: 6..7, physical_block: 0 },
                        PagerExtent { logical: 12..13, physical_block: 1 },
                    ]),
                )
                .expect("register failed");
            backing_vmo.write(b"there", 1024).expect("write failed");
            let mut buf = vec![1; file_size as usize];
            vmo.read(&mut buf, 0).expect("read failed");

            let mut expected = vec![0; file_size as usize];
            expected[6 * 1024..6 * 1024 + 5].copy_from_slice(b"hello");
            expected[12 * 1024..12 * 1024 + 5].copy_from_slice(b"there");
            assert_eq!(&buf, &expected);

            // Same layout, read starting at an offset that is not page-aligned.
            let vmo = filesystem
                .register(
                    "d",
                    4,
                    file_size,
                    Box::new([
                        PagerExtent { logical: 6..7, physical_block: 0 },
                        PagerExtent { logical: 12..13, physical_block: 1 },
                    ]),
                )
                .expect("register failed");

            let offset = 9000;
            let mut buf = vec![1; (file_size - offset) as usize];
            vmo.read(&mut buf, offset).expect("read failed");

            assert_eq!(&buf, &expected[offset as usize..]);
        }

        // Every child VMO has been dropped, so the zero-children handling should eventually
        // release all files.
        while !filesystem.files_by_inode.lock().is_empty() {
            std::thread::sleep(Duration::from_millis(10));
        }

        // Tear down so the pager thread does not outlive the test.
        pager.remove_filesystem(&filesystem);
        pager.terminate();
    }
}