1use fidl_fuchsia_starnix_runner as fstarnixrunner;
10use futures::TryStreamExt;
11use starnix_logging::{log_debug, log_error, log_warn, with_zx_name};
12use starnix_sync::{LockDepMutex, TerminalLock};
13use starnix_uapi::errors::Errno;
14use starnix_uapi::{errno, error};
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::ops::Range;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU32, Ordering};
20use zx::sys::zx_page_request_command_t::{ZX_PAGER_VMO_COMPLETE, ZX_PAGER_VMO_READ};
21
/// Number of threads spawned by `Pager::start_threads`, and therefore the number of
/// termination packets queued by `Pager::terminate`.
const PAGER_THREADS: usize = 1;
/// Size, in bytes, of the transfer VMO that each pager thread creates and maps.
const TRANSFER_VMO_SIZE: u64 = 1024 * 1024;
/// Size, in bytes, of the shared zero VMO; zero ranges are supplied in chunks of at most this
/// many bytes.
const ZERO_VMO_SIZE: u64 = 1024 * 1024;

/// Trace category used for every trace event emitted by the pager.
const CATEGORY_STARNIX_PAGER: &str = "starnix:pager";
30
31pub async fn run_pager(
32 pager_request: fstarnixrunner::ManagerCreatePagerRequest,
33 pager: Arc<Pager>,
34) {
35 let fstarnixrunner::ManagerCreatePagerRequest {
36 backing_vmo: Some(backing_vmo),
37 block_size: Some(block_size),
38 pager: Some(pager_server),
39 ..
40 } = pager_request
41 else {
42 log_error!("Invalid create pager request");
43 return;
44 };
45
46 let filesystem = Arc::new(match Filesystem::new(pager.clone(), backing_vmo, block_size) {
47 Ok(filesystem) => filesystem,
48 Err(error) => {
49 log_error!("Unable to register filesystem {error}");
50 return;
51 }
52 });
53
54 pager.add_filesystem(filesystem.clone());
55
56 let mut stream = pager_server.into_stream();
57 'outer: while let Ok(Some(event)) = stream.try_next().await {
58 match event {
59 fstarnixrunner::PagerRequest::RegisterFile {
60 payload:
61 fstarnixrunner::PagerRegisterFileRequest {
62 name: Some(name),
63 inode_num: Some(inode_num),
64 size: Some(size),
65 extents: Some(extents),
66 ..
67 },
68 responder,
69 ..
70 } => {
71 fuchsia_trace::instant!(
72 CATEGORY_STARNIX_PAGER,
73 "file_register",
74 fuchsia_trace::Scope::Thread
75 );
76 match filesystem.register(
77 &name,
78 inode_num,
79 size,
80 extents
81 .iter()
82 .map(|e| PagerExtent {
83 logical: e.logical_start..e.logical_end,
84 physical_block: e.physical_block,
85 })
86 .collect(),
87 ) {
88 Ok(vmo) => {
89 match responder.send(Ok(fstarnixrunner::PagerRegisterFileResponse {
90 vmo: Some(vmo),
91 ..Default::default()
92 })) {
93 Ok(_) => {}
94 Err(e) => {
95 log_error!("Error sending pager response {:?}", e);
96 break 'outer;
97 }
98 }
99 }
100 Err(e) => match responder.send(Err(e.into_raw())) {
101 Ok(_) => {}
102 Err(e) => {
103 log_error!("Error sending pager error response {:?}", e);
104 break 'outer;
105 }
106 },
107 };
108 }
109 fstarnixrunner::PagerRequest::RegisterFile { .. } => {
110 log_error!("Invalid RegisterFile request");
111 break 'outer;
112 }
113 _ => {}
114 }
115 }
116 pager.remove_filesystem(&*filesystem);
117}
118
/// Process-wide pager state shared by every registered [`Filesystem`].
pub struct Pager {
    /// Kernel pager object used to create pager-backed VMOs and to supply or fail pages.
    pager: zx::Pager,
    /// Port on which page requests, zero-children signals and termination packets arrive.
    port: zx::Port,
    /// VMO of `ZERO_VMO_SIZE` bytes used as the source when supplying zero pages.
    zero_vmo: zx::Vmo,
    /// Next identifier returned by `allocate_filesystem_id`; starts at 1.
    next_filesystem_id: AtomicU32,
    /// Registered filesystems, keyed by filesystem id (the upper 32 bits of a port key).
    filesystems: LockDepMutex<HashMap<u32, Arc<Filesystem>>, TerminalLock>,
}
127
128impl Pager {
129 pub fn new() -> Result<Self, Errno> {
130 Ok(Self {
131 pager: zx::Pager::create(zx::PagerOptions::empty()).map_err(|error| {
132 log_error!(error:?; "Pager::create failed");
133 errno!(EINVAL)
134 })?,
135 port: zx::Port::create(),
136 zero_vmo: with_zx_name(
137 zx::Vmo::create(ZERO_VMO_SIZE).map_err(|_| errno!(EINVAL))?,
138 b"starnix:ext4",
139 ),
140 next_filesystem_id: AtomicU32::new(1),
141 filesystems: Default::default(),
142 })
143 }
144
145 pub fn start_threads(self: &Arc<Self>) {
147 for i in 0..PAGER_THREADS {
148 let this = self.clone();
149 let _ = std::thread::Builder::new().name(format!("pager-{}", i)).spawn(move || {
150 this.run_pager_thread();
151 });
152 }
153 }
154
155 pub fn run_pager_thread(&self) {
158 let transfer_vmo = with_zx_name(
159 zx::Vmo::create(TRANSFER_VMO_SIZE).expect("unable to create transfer vmo"),
160 b"starnix:ext4",
161 );
162 let transfer_vmo_addr = fuchsia_runtime::vmar_root_self()
163 .map(
164 0,
165 &transfer_vmo,
166 0,
167 TRANSFER_VMO_SIZE as usize,
168 zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::ALLOW_FAULTS,
169 )
170 .expect("unable to map transfer vmo");
171 scopeguard::defer!({
172 let _ = unsafe {
174 fuchsia_runtime::vmar_root_self()
175 .unmap(transfer_vmo_addr, TRANSFER_VMO_SIZE as usize)
176 };
177 });
178 let split_key = |key: u64| -> (u32, u32) { ((key >> 32) as u32, key as u32) };
179 loop {
180 match self.port.wait(zx::MonotonicInstant::INFINITE) {
181 Ok(packet) => {
182 match packet.contents() {
183 zx::PacketContents::Pager(contents)
184 if contents.command() == ZX_PAGER_VMO_READ =>
185 {
186 fuchsia_trace::duration!(CATEGORY_STARNIX_PAGER, "vmo_read");
187 let (filesystem_num, inode_num) = split_key(packet.key());
188 let filesystem = self
189 .filesystems
190 .lock()
191 .get(&filesystem_num)
192 .cloned()
193 .expect("Unexpected packet key");
194 filesystem.receive_pager_packet(
195 inode_num,
196 contents,
197 &transfer_vmo,
198 transfer_vmo_addr,
199 );
200 }
201 zx::PacketContents::Pager(contents)
202 if contents.command() == ZX_PAGER_VMO_COMPLETE =>
203 {
204 fuchsia_trace::duration!(CATEGORY_STARNIX_PAGER, "vmo_complete");
205 }
208 zx::PacketContents::SignalOne(signals)
209 if signals.observed().contains(zx::Signals::VMO_ZERO_CHILDREN) =>
210 {
211 fuchsia_trace::duration!(
212 CATEGORY_STARNIX_PAGER,
213 "signal_zero_children"
214 );
215 let (filesystem_num, inode_num) = split_key(packet.key());
216 let filesystem = self.filesystems.lock().get(&filesystem_num).cloned();
220 if let Some(filesystem) = filesystem {
221 filesystem.on_zero_children(inode_num).expect("on_zero_children");
222 }
223 }
224 zx::PacketContents::User(_) => break,
225 _ => log_error!("Unexpected port packet: {:?}", packet.contents()),
226 }
227 }
228 Err(error) => log_error!(error:?; "Port::wait failed"),
229 }
230 }
231 log_debug!("Pager thread terminating");
232 }
233
234 fn create_pager_vmo(&self, key: u64, size: u64) -> Result<zx::Vmo, zx::Status> {
235 self.pager.create_vmo(zx::VmoOptions::RESIZABLE, &self.port, key, size)
236 }
237
238 fn allocate_filesystem_id(&self) -> u32 {
239 self.next_filesystem_id.fetch_add(1, Ordering::Relaxed)
240 }
241
242 fn add_filesystem(&self, filesystem: Arc<Filesystem>) {
243 self.filesystems.lock().insert(filesystem.id(), filesystem);
244 }
245
246 fn remove_filesystem(&self, filesystem: &Filesystem) {
247 self.filesystems.lock().remove(&filesystem.id());
248 }
249
250 pub fn terminate(&self) {
252 let up = zx::UserPacket::from_u8_array([0; 32]);
253 let packet = zx::Packet::from_user_packet(0, 0, up);
254 for _ in 0..PAGER_THREADS {
255 self.port.queue(&packet).unwrap();
256 }
257 }
258}
259
/// One paged filesystem image: a backing VMO plus the files registered against it.
pub struct Filesystem {
    /// Shared pager used to create pager-backed VMOs and to supply pages.
    pager: Arc<Pager>,
    /// VMO from which file contents are read, addressed by physical block.
    backing_vmo: zx::Vmo,
    /// Block size in bytes; validated by `Filesystem::new` to be a power of two of at most 1 MiB.
    block_size: u64,
    /// Registered files, keyed by inode number.
    files_by_inode: LockDepMutex<HashMap<u32, Arc<PagedFile>>, TerminalLock>,
    /// Identifier allocated from the pager; forms the upper 32 bits of this filesystem's port keys.
    id: u32,
}
268
269impl Filesystem {
270 pub fn new(pager: Arc<Pager>, backing_vmo: zx::Vmo, block_size: u64) -> Result<Self, Errno> {
273 if block_size > 1024 * 1024 || !block_size.is_power_of_two() {
274 return error!(EINVAL, "Bad block size {block_size}");
275 }
276 let id = pager.allocate_filesystem_id();
277 Ok(Self {
278 pager,
279 backing_vmo,
280 block_size,
281 files_by_inode: Default::default(),
282 id,
283 })
284 }
285
286 pub fn register(
288 &self,
289 name: &str,
290 inode_num: u32,
291 size: u64,
292 extents: Box<[PagerExtent]>,
293 ) -> Result<zx::Vmo, zx::Status> {
294 let (file, did_create) = {
295 match self.files_by_inode.lock().entry(inode_num) {
296 Entry::Occupied(o) => (o.get().clone(), false),
297 Entry::Vacant(v) => (
298 v.insert(Arc::new(PagedFile {
299 vmo: self
300 .pager
301 .create_pager_vmo(self.port_key_for_inode(inode_num), size)?,
302 extents,
303 }))
304 .clone(),
305 true,
306 ),
307 }
308 };
309 let child_vmo = file.vmo.create_child(zx::VmoChildOptions::REFERENCE, 0, 0);
310 if did_create {
311 let set_up_vmo = |vmo| -> Result<(), zx::Status> {
312 self.watch_for_zero_children(vmo, inode_num)?;
313 vmo.set_name(&zx::Name::new_lossy(&format!("ext4!{}", name)))?;
314 Ok(())
315 };
316
317 if let Err(e) = set_up_vmo(&file.vmo) {
318 self.files_by_inode.lock().remove(&inode_num);
319 return Err(e);
320 }
321 }
322 child_vmo
323 }
324
325 fn receive_pager_packet(
326 &self,
327 inode_num: u32,
328 contents: zx::PagerPacket,
329 transfer_vmo: &zx::Vmo,
330 transfer_vmo_addr: usize,
331 ) {
332 let Some(file) = self.files_by_inode.lock().get(&inode_num).cloned() else {
333 return;
334 };
335
336 let mut range = contents.range();
337
338 const ALIGNMENT: u64 = 128 * 1024;
340 let unaligned = (range.end - range.start) % ALIGNMENT;
341 let readahead_end =
342 if unaligned > 0 { range.end - unaligned + ALIGNMENT } else { range.end };
343
344 let start_block = (range.start / self.block_size) as u32;
345 let mut ix = file.extents.partition_point(|e| e.logical.end <= start_block);
346
347 let buf = unsafe {
350 std::slice::from_raw_parts_mut(transfer_vmo_addr as *mut u8, TRANSFER_VMO_SIZE as usize)
351 };
352
353 let mut supply_helper =
354 SupplyHelper::new(transfer_vmo, buf, &file.vmo, range.start, &*self.pager);
355
356 while ix < file.extents.len() && range.start < readahead_end {
357 let extent = &file.extents[ix];
358
359 let logical_start = extent.logical.start as u64 * self.block_size;
360
361 if range.start < logical_start {
363 if let Err(e) = supply_helper.zero(logical_start - range.start) {
364 supply_helper.fail_to(range.end, e);
365 return;
366 }
367 range.start = logical_start;
368 }
369
370 let end = std::cmp::min(extent.logical.end as u64 * self.block_size, readahead_end);
371
372 while range.start < end {
373 let phys_offset =
374 extent.physical_block * self.block_size + range.start - logical_start;
375
376 match supply_helper.fill_buf(|buf| {
377 let amount = std::cmp::min(buf.len() as u64, end - range.start) as usize;
378 self.backing_vmo.read(&mut buf[..amount], phys_offset)?;
379 Ok(amount)
380 }) {
381 Ok(amount) => {
382 let _ = self.backing_vmo.op_range(
385 zx::VmoOp::DONT_NEED,
386 phys_offset,
387 amount as u64,
388 );
389 range.start += amount as u64;
390 }
391 Err(e) => {
392 supply_helper.fail_to(range.end, e);
393 return;
394 }
395 }
396 }
397
398 ix += 1;
399 }
400
401 if let Err(e) = supply_helper.finish(range.end) {
406 supply_helper.fail_to(range.end, e);
407 }
408 }
409
410 fn watch_for_zero_children(&self, vmo: &zx::Vmo, inode_num: u32) -> Result<(), zx::Status> {
411 vmo.wait_async(
412 &self.pager.port,
413 self.port_key_for_inode(inode_num),
414 zx::Signals::VMO_ZERO_CHILDREN,
415 zx::WaitAsyncOpts::empty(),
416 )
417 }
418
419 fn on_zero_children(&self, inode_num: u32) -> Result<(), Errno> {
420 let mut files = self.files_by_inode.lock();
421 let file = files.entry(inode_num);
422 if let Entry::Occupied(o) = file {
423 let vmo = &o.get().vmo;
424 match vmo.info() {
425 Ok(info) => {
426 if info.num_children == 0 {
427 o.remove();
429 } else {
430 if let Err(error) = self.watch_for_zero_children(vmo, inode_num) {
433 log_error!(
434 error:?;
435 "watch_for_zero_children failed"
436 );
437 }
438 }
439 }
440 Err(error) => log_error!(error:?; "Vmo::info failed"),
441 }
442 }
443 Ok(())
444 }
445
446 fn port_key_for_inode(&self, inode_num: u32) -> u64 {
447 (self.id as u64) << 32 | inode_num as u64
448 }
449
450 fn id(&self) -> u32 {
451 self.id
452 }
453}
454
/// A registered file: its pager-backed VMO and the extents describing where its data lives.
struct PagedFile {
    /// Pager-backed VMO for the file; `Filesystem::register` hands out reference children of it.
    vmo: zx::Vmo,

    /// Extents of the file. They are searched with `partition_point` on the logical end block,
    /// so presumably they must be sorted by logical block -- TODO confirm against the caller.
    extents: Box<[PagerExtent]>,
}
464
/// A contiguous run of file blocks and where it starts in the backing VMO.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PagerExtent {
    /// Range of logical block numbers within the file that this extent covers.
    pub logical: Range<u32>,
    /// Block number in the backing VMO at which the first logical block is stored.
    pub physical_block: u64,
}
470
/// Stages data for one page request and supplies it to the target VMO in whole pages.
struct SupplyHelper<'a> {
    /// VMO whose pages are handed to the target VMO when staged data is supplied.
    transfer_vmo: &'a zx::Vmo,
    /// Staging buffer; `Filesystem::receive_pager_packet` passes a slice over its transfer VMO
    /// mapping here.
    buffer: &'a mut [u8],
    /// Pager-backed VMO that receives the pages.
    target_vmo: &'a zx::Vmo,
    /// Offset in `target_vmo` at which the next pages will be supplied.
    offset: u64,
    /// Pager used to supply or fail pages, and owner of the zero VMO.
    pager: &'a Pager,
    /// Page size in bytes.
    page_size: u64,
    /// Number of bytes at the start of `buffer` that are staged but not yet supplied.
    buf_len: usize,
}
481
482impl<'a> SupplyHelper<'a> {
483 fn new(
484 transfer_vmo: &'a zx::Vmo,
485 buffer: &'a mut [u8],
486 target_vmo: &'a zx::Vmo,
487 offset: u64,
488 pager: &'a Pager,
489 ) -> Self {
490 Self {
491 transfer_vmo,
492 buffer,
493 target_vmo,
494 offset,
495 pager,
496 page_size: *starnix_core::mm::PAGE_SIZE,
497 buf_len: 0,
498 }
499 }
500
501 fn zero(&mut self, mut len: u64) -> Result<(), zx::Status> {
503 let unaligned = self.buf_len as u64 % self.page_size;
504 if unaligned > 0 {
505 let amount = std::cmp::min(self.page_size - unaligned, len);
506 self.buffer[self.buf_len..self.buf_len + amount as usize].fill(0);
507 self.buf_len += amount as usize;
508 len -= amount;
509 self.supply_pages()?;
510 }
511 while len >= self.page_size {
513 let amount =
514 if len >= ZERO_VMO_SIZE { ZERO_VMO_SIZE } else { len - len % self.page_size };
515 self.pager.pager.supply_pages(
516 self.target_vmo,
517 self.offset..self.offset + amount,
518 &self.pager.zero_vmo,
519 0,
520 )?;
521 self.offset += amount;
522 len -= amount;
523 }
524 self.buffer[self.buf_len..self.buf_len + len as usize].fill(0);
526 self.buf_len += len as usize;
527 Ok(())
528 }
529
530 fn supply_pages(&mut self) -> Result<(), zx::Status> {
532 if self.buf_len as u64 >= self.page_size {
533 let len = self.buf_len - self.buf_len % self.page_size as usize;
534 self.pager.pager.supply_pages(
535 self.target_vmo,
536 self.offset..self.offset + len as u64,
537 self.transfer_vmo,
538 0,
539 )?;
540 self.buffer.copy_within(len..self.buf_len, 0);
542 self.buf_len -= len;
543 self.offset += len as u64;
544 }
545 Ok(())
546 }
547
548 fn fill_buf(
550 &mut self,
551 f: impl FnOnce(&mut [u8]) -> Result<usize, zx::Status>,
552 ) -> Result<usize, zx::Status> {
553 let amount = f(&mut self.buffer[self.buf_len..])?;
554 self.buf_len += amount;
555 self.supply_pages()?;
556 Ok(amount)
557 }
558
559 fn finish(&mut self, mut end: u64) -> Result<(), zx::Status> {
561 let byte_offset = self.offset + self.buf_len as u64;
562 end = std::cmp::max(end, byte_offset);
563 end = end + self.page_size - 1;
564 end -= end % self.page_size;
565 self.zero(end - byte_offset)
566 }
567
568 fn fail_to(&mut self, end: u64, error: zx::Status) {
570 if self.offset < end {
571 log_warn!(error:?; "Failing page-in, range: {:?}", self.offset..end);
572 match self.pager.pager.op_range(
574 zx::PagerOp::Fail(zx::Status::IO),
575 self.target_vmo,
576 self.offset..end,
577 ) {
578 Ok(()) => {}
579 Err(error) => log_error!(error:?; "Failed to report error"),
580 }
581 self.offset = end;
582 self.buf_len = 0;
583 }
584 }
585}
586
#[cfg(test)]
mod tests {
    use super::{Filesystem, Pager, PagerExtent};

    use std::sync::Arc;
    use std::time::Duration;

    /// Exercises registration and paging end to end: a file without extents, a file with a single
    /// extent, a sparse file read from the start, and the same layout read from an offset that is
    /// not page aligned. Finally waits for every file to be unregistered once its VMOs are gone.
    #[::fuchsia::test]
    async fn test_pager() {
        let backing_vmo = zx::Vmo::create(1 * 1024 * 1024).expect("Vmo::create failed");
        let backing_vmo_clone =
            backing_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).expect("failed handle dup");

        let pager = Arc::new(Pager::new().expect("Pager::new failed"));
        let filesystem = Arc::new(
            Filesystem::new(pager.clone(), backing_vmo_clone, 1024)
                .expect("Filesystem::new failed"),
        );

        pager.add_filesystem(filesystem.clone());

        {
            pager.start_threads();

            // A file without extents reads back as zeroes.
            let vmo = filesystem.register("a", 1, 5, Box::new([])).expect("register failed");

            let mut buf = vec![1; 5];
            vmo.read(&mut buf, 0).expect("read failed");

            assert_eq!(&buf, &[0; 5]);

            // A single extent at the start of the backing VMO.
            let vmo = filesystem
                .register("b", 2, 5, Box::new([PagerExtent { logical: 0..1, physical_block: 0 }]))
                .expect("register failed");
            backing_vmo.write(b"hello", 0).expect("write failed");
            vmo.read(&mut buf, 0).expect("read failed");

            assert_eq!(&buf, b"hello");

            // A sparse file: holes before, between and after its two extents.
            let file_size = (6 + 1 + 5 + 4) * 1024 + 100;
            let vmo = filesystem
                .register(
                    "c",
                    3,
                    file_size,
                    Box::new([
                        PagerExtent { logical: 6..7, physical_block: 0 },
                        PagerExtent { logical: 12..13, physical_block: 1 },
                    ]),
                )
                .expect("register failed");
            backing_vmo.write(b"there", 1024).expect("write failed");
            let mut buf = vec![1; file_size as usize];
            vmo.read(&mut buf, 0).expect("read failed");

            let mut expected = vec![0; file_size as usize];
            expected[6 * 1024..6 * 1024 + 5].copy_from_slice(b"hello");
            expected[12 * 1024..12 * 1024 + 5].copy_from_slice(b"there");
            assert_eq!(&buf, &expected);

            // The same layout again, this time read from an offset in the middle of a page.
            let vmo = filesystem
                .register(
                    "d",
                    4,
                    file_size,
                    Box::new([
                        PagerExtent { logical: 6..7, physical_block: 0 },
                        PagerExtent { logical: 12..13, physical_block: 1 },
                    ]),
                )
                .expect("register failed");

            let offset = 9000;
            let mut buf = vec![1; (file_size - offset) as usize];
            vmo.read(&mut buf, offset).expect("read failed");

            assert_eq!(&buf, &expected[offset as usize..]);
        }

        // Every VMO returned by `register` has been dropped by now, so the pager thread should
        // unregister all of the files.
        loop {
            if filesystem.files_by_inode.lock().is_empty() {
                break;
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        // Stop the pager thread so that it does not outlive the test.
        pager.terminate();
    }
}
685}