1use fidl_fuchsia_starnix_runner as fstarnixrunner;
10use futures::TryStreamExt;
11use starnix_logging::{
12 log_debug, log_error, log_warn, trace_duration, trace_instant, with_zx_name,
13};
14use starnix_sync::Mutex;
15use starnix_uapi::errors::Errno;
16use starnix_uapi::{errno, error};
17use std::collections::HashMap;
18use std::collections::hash_map::Entry;
19use std::ops::Range;
20use std::sync::Arc;
21use zx::sys::zx_page_request_command_t::{ZX_PAGER_VMO_COMPLETE, ZX_PAGER_VMO_READ};
22
/// Number of pager threads started by `Pager::start_threads` (and the number of termination
/// packets queued by `Pager::terminate`).
const PAGER_THREADS: usize = 1;

/// Size in bytes (1 MiB) of the per-thread transfer VMO that data is staged in before being
/// supplied to a pager-backed VMO.
const TRANSFER_VMO_SIZE: u64 = 1024 * 1024;

/// Size in bytes (1 MiB) of the VMO used as the source when supplying zeroed pages. It is also
/// the largest range supplied from that VMO in a single call.
const ZERO_VMO_SIZE: u64 = 1024 * 1024;

/// Trace category used for all pager trace events.
const CATEGORY_STARNIX_PAGER: &str = "starnix:pager";
31
32pub async fn run_pager(pager_request: fstarnixrunner::ManagerCreatePagerRequest) {
33 let fstarnixrunner::ManagerCreatePagerRequest {
34 backing_vmo: Some(backing_vmo),
35 block_size: Some(block_size),
36 pager: Some(pager_server),
37 ..
38 } = pager_request
39 else {
40 log_error!("Invalid create pager request");
41 return;
42 };
43
44 let Ok(pager) = Pager::new(backing_vmo, block_size) else {
45 log_error!("Failed to serve pager instance");
46 return;
47 };
48 let pager = Arc::new(pager);
49 pager.start_threads();
50
51 let mut stream = pager_server.into_stream();
52 'outer: while let Ok(Some(event)) = stream.try_next().await {
53 match event {
54 fstarnixrunner::PagerRequest::RegisterFile {
55 payload:
56 fstarnixrunner::PagerRegisterFileRequest {
57 name: Some(name),
58 inode_num: Some(inode_num),
59 size: Some(size),
60 extents: Some(extents),
61 ..
62 },
63 responder,
64 ..
65 } => {
66 trace_instant!(
67 CATEGORY_STARNIX_PAGER,
68 "file_register",
69 fuchsia_trace::Scope::Thread
70 );
71 match pager.register(
72 &name,
73 inode_num,
74 size,
75 extents
76 .iter()
77 .map(|e| PagerExtent {
78 logical: e.logical_start..e.logical_end,
79 physical_block: e.physical_block,
80 })
81 .collect(),
82 ) {
83 Ok(vmo) => {
84 match responder.send(Ok(fstarnixrunner::PagerRegisterFileResponse {
85 vmo: Some(vmo),
86 ..Default::default()
87 })) {
88 Ok(_) => {}
89 Err(e) => {
90 log_error!("Error sending pager response {:?}", e);
91 break 'outer;
92 }
93 }
94 }
95 Err(e) => match responder.send(Err(e.into_raw())) {
96 Ok(_) => {}
97 Err(e) => {
98 log_error!("Error sending pager error response {:?}", e);
99 break 'outer;
100 }
101 },
102 };
103 }
104 fstarnixrunner::PagerRequest::RegisterFile { .. } => {
105 log_error!("Invalid RegisterFile request");
106 break 'outer;
107 }
108 _ => {}
109 }
110 }
111}
112
/// State shared between the code that registers files and the pager threads that service page
/// requests for them.
pub struct Pager {
    /// VMO that file contents are read from; extent physical blocks are offsets into it (in
    /// units of `block_size`).
    backing_vmo: zx::Vmo,
    /// Block size in bytes. `Pager::new` only accepts a power of two no larger than 1 MiB.
    block_size: u64,
    /// Kernel pager used to create pager-backed VMOs and to supply or fail page requests.
    pager: zx::Pager,
    /// Port on which the pager threads receive page requests, zero-children signals and the
    /// user packets queued by `terminate`.
    port: zx::Port,
    /// Currently registered files, keyed by inode number. The inode number is also used as the
    /// port key for the file's packets.
    files_by_inode: Mutex<HashMap<u32, Arc<PagedFile>>>,
    /// VMO of `ZERO_VMO_SIZE` bytes used as the source when supplying zeroed pages.
    zero_vmo: zx::Vmo,
}
122
123impl Pager {
124 pub fn new(backing_vmo: zx::Vmo, block_size: u64) -> Result<Self, Errno> {
127 if block_size > 1024 * 1024 || !block_size.is_power_of_two() {
128 return error!(EINVAL, "Bad block size {block_size}");
129 }
130 Ok(Self {
131 backing_vmo,
132 block_size,
133 pager: zx::Pager::create(zx::PagerOptions::empty()).map_err(|error| {
134 log_error!(error:?; "Pager::create failed");
135 errno!(EINVAL)
136 })?,
137 port: zx::Port::create(),
138 files_by_inode: Mutex::new(HashMap::new()),
139 zero_vmo: with_zx_name(
140 zx::Vmo::create(ZERO_VMO_SIZE).map_err(|_| errno!(EINVAL))?,
141 b"starnix:ext4",
142 ),
143 })
144 }
145
146 pub fn start_threads(self: &Arc<Self>) {
148 for i in 0..PAGER_THREADS {
149 let this = self.clone();
150 let _ = std::thread::Builder::new().name(format!("pager-{}", i)).spawn(move || {
151 this.run_pager_thread();
152 });
153 }
154 }
155
156 pub fn register(
158 &self,
159 name: &str,
160 inode_num: u32,
161 size: u64,
162 extents: Box<[PagerExtent]>,
163 ) -> Result<zx::Vmo, zx::Status> {
164 let (file, did_create) = {
165 match self.files_by_inode.lock().entry(inode_num) {
166 Entry::Occupied(o) => (o.get().clone(), false),
167 Entry::Vacant(v) => (
168 v.insert(Arc::new(PagedFile {
169 vmo: self.pager.create_vmo(
170 zx::VmoOptions::RESIZABLE,
171 &self.port,
172 inode_num as u64,
173 size,
174 )?,
175 extents,
176 }))
177 .clone(),
178 true,
179 ),
180 }
181 };
182 let child_vmo = file.vmo.create_child(zx::VmoChildOptions::REFERENCE, 0, 0);
183 if did_create {
184 let set_up_vmo = |vmo| -> Result<(), zx::Status> {
185 self.watch_for_zero_children(vmo, inode_num)?;
186 vmo.set_name(&zx::Name::new_lossy(&format!("ext4!{}", name)))?;
187 Ok(())
188 };
189
190 if let Err(e) = set_up_vmo(&file.vmo) {
191 self.files_by_inode.lock().remove(&inode_num);
192 return Err(e);
193 }
194 }
195 child_vmo
196 }
197
198 pub fn run_pager_thread(&self) {
201 let transfer_vmo = with_zx_name(
202 zx::Vmo::create(TRANSFER_VMO_SIZE).expect("unable to create transfer vmo"),
203 b"starnix:ext4",
204 );
205 let transfer_vmo_addr = fuchsia_runtime::vmar_root_self()
206 .map(
207 0,
208 &transfer_vmo,
209 0,
210 TRANSFER_VMO_SIZE as usize,
211 zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::ALLOW_FAULTS,
212 )
213 .expect("unable to map transfer vmo");
214 scopeguard::defer!({
215 let _ = unsafe {
217 fuchsia_runtime::vmar_root_self()
218 .unmap(transfer_vmo_addr, TRANSFER_VMO_SIZE as usize)
219 };
220 });
221 loop {
222 match self.port.wait(zx::MonotonicInstant::INFINITE) {
223 Ok(packet) => {
224 match packet.contents() {
225 zx::PacketContents::Pager(contents)
226 if contents.command() == ZX_PAGER_VMO_READ =>
227 {
228 trace_duration!(CATEGORY_STARNIX_PAGER, "vmo_read");
229 let inode_num = packet.key().try_into().expect("Unexpected packet key");
230 self.receive_pager_packet(
231 inode_num,
232 contents,
233 &transfer_vmo,
234 transfer_vmo_addr,
235 );
236 }
237 zx::PacketContents::Pager(contents)
238 if contents.command() == ZX_PAGER_VMO_COMPLETE =>
239 {
240 trace_duration!(CATEGORY_STARNIX_PAGER, "vmo_complete");
241 }
244 zx::PacketContents::SignalOne(signals)
245 if signals.observed().contains(zx::Signals::VMO_ZERO_CHILDREN) =>
246 {
247 trace_duration!(CATEGORY_STARNIX_PAGER, "signal_zero_children");
248 let inode_num = packet.key().try_into().expect("Unexpected packet key");
249 let mut files = self.files_by_inode.lock();
250 let file = files.entry(inode_num);
251 if let Entry::Occupied(o) = file {
252 let vmo = &o.get().vmo;
253 match vmo.info() {
254 Ok(info) => {
255 if info.num_children == 0 {
256 o.remove();
258 } else {
259 if let Err(error) =
262 self.watch_for_zero_children(vmo, inode_num)
263 {
264 log_error!(
265 error:?;
266 "watch_for_zero_children failed"
267 );
268 }
269 }
270 }
271 Err(error) => log_error!(error:?; "Vmo::info failed"),
272 }
273 }
274 }
275 zx::PacketContents::User(_) => break,
276 _ => log_error!("Unexpected port packet: {:?}", packet.contents()),
277 }
278 }
279 Err(error) => log_error!(error:?; "Port::wait failed"),
280 }
281 }
282 log_debug!("Pager thread terminating");
283 }
284
285 pub fn terminate(&self) {
287 let up = zx::UserPacket::from_u8_array([0; 32]);
288 let packet = zx::Packet::from_user_packet(0, 0, up);
289 for _ in 0..PAGER_THREADS {
290 self.port.queue(&packet).unwrap();
291 }
292 }
293
294 fn watch_for_zero_children(&self, vmo: &zx::Vmo, inode_num: u32) -> Result<(), zx::Status> {
295 vmo.wait_async(
296 &self.port,
297 inode_num as u64,
298 zx::Signals::VMO_ZERO_CHILDREN,
299 zx::WaitAsyncOpts::empty(),
300 )
301 }
302
303 fn receive_pager_packet(
304 &self,
305 inode_num: u32,
306 contents: zx::PagerPacket,
307 transfer_vmo: &zx::Vmo,
308 transfer_vmo_addr: usize,
309 ) {
310 let Some(file) = self.files_by_inode.lock().get(&inode_num).cloned() else {
311 return;
312 };
313
314 let mut range = contents.range();
315
316 const ALIGNMENT: u64 = 128 * 1024;
318 let unaligned = (range.end - range.start) % ALIGNMENT;
319 let readahead_end =
320 if unaligned > 0 { range.end - unaligned + ALIGNMENT } else { range.end };
321
322 let start_block = (range.start / self.block_size) as u32;
323 let mut ix = file.extents.partition_point(|e| e.logical.end <= start_block);
324
325 let buf = unsafe {
328 std::slice::from_raw_parts_mut(transfer_vmo_addr as *mut u8, TRANSFER_VMO_SIZE as usize)
329 };
330
331 let mut supply_helper = SupplyHelper::new(transfer_vmo, buf, &file.vmo, range.start, self);
332
333 while ix < file.extents.len() && range.start < readahead_end {
334 let extent = &file.extents[ix];
335
336 let logical_start = extent.logical.start as u64 * self.block_size;
337
338 if range.start < logical_start {
340 if let Err(e) = supply_helper.zero(logical_start - range.start) {
341 supply_helper.fail_to(range.end, e);
342 return;
343 }
344 range.start = logical_start;
345 }
346
347 let end = std::cmp::min(extent.logical.end as u64 * self.block_size, readahead_end);
348
349 while range.start < end {
350 let phys_offset =
351 extent.physical_block * self.block_size + range.start - logical_start;
352
353 match supply_helper.fill_buf(|buf| {
354 let amount = std::cmp::min(buf.len() as u64, end - range.start) as usize;
355 self.backing_vmo.read(&mut buf[..amount], phys_offset)?;
356 Ok(amount)
357 }) {
358 Ok(amount) => {
359 let _ = self.backing_vmo.op_range(
362 zx::VmoOp::DONT_NEED,
363 phys_offset,
364 amount as u64,
365 );
366 range.start += amount as u64;
367 }
368 Err(e) => {
369 supply_helper.fail_to(range.end, e);
370 return;
371 }
372 }
373 }
374
375 ix += 1;
376 }
377
378 if let Err(e) = supply_helper.finish(range.end) {
383 supply_helper.fail_to(range.end, e);
384 }
385 }
386}
387
/// A file registered with the pager.
struct PagedFile {
    /// The pager-backed VMO for the file; callers of `Pager::register` receive reference
    /// children of it.
    vmo: zx::Vmo,

    /// The file's extents. Looked up with `partition_point` on `logical.end`, so they are
    /// presumably sorted by logical block and non-overlapping — TODO confirm with the caller.
    extents: Box<[PagerExtent]>,
}
397
/// Maps a contiguous run of a file's logical blocks onto blocks of the backing VMO.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PagerExtent {
    /// Half-open range of logical block numbers within the file covered by this extent.
    pub logical: Range<u32>,
    /// Block number in the backing VMO at which the data for `logical.start` begins.
    pub physical_block: u64,
}
403
/// Accumulates data for a page request and supplies it to the target VMO in whole pages.
struct SupplyHelper<'a> {
    /// VMO whose pages are handed to the target VMO when buffered data is supplied.
    transfer_vmo: &'a zx::Vmo,
    /// Staging buffer; the caller builds it over the mapping of `transfer_vmo`.
    buffer: &'a mut [u8],
    /// The pager-backed VMO being populated.
    target_vmo: &'a zx::Vmo,
    /// Offset in `target_vmo` up to which pages have been supplied (or failed).
    offset: u64,
    /// The owning pager, used for its kernel pager handle and zero VMO.
    pager: &'a Pager,
    /// System page size in bytes.
    page_size: u64,
    /// Number of bytes at the start of `buffer` that are staged but not yet supplied.
    buf_len: usize,
}
414
415impl<'a> SupplyHelper<'a> {
416 fn new(
417 transfer_vmo: &'a zx::Vmo,
418 buffer: &'a mut [u8],
419 target_vmo: &'a zx::Vmo,
420 offset: u64,
421 pager: &'a Pager,
422 ) -> Self {
423 Self {
424 transfer_vmo,
425 buffer,
426 target_vmo,
427 offset,
428 pager,
429 page_size: *starnix_core::mm::PAGE_SIZE,
430 buf_len: 0,
431 }
432 }
433
434 fn zero(&mut self, mut len: u64) -> Result<(), zx::Status> {
436 let unaligned = self.buf_len as u64 % self.page_size;
437 if unaligned > 0 {
438 let amount = std::cmp::min(self.page_size - unaligned, len);
439 self.buffer[self.buf_len..self.buf_len + amount as usize].fill(0);
440 self.buf_len += amount as usize;
441 len -= amount;
442 self.supply_pages()?;
443 }
444 while len >= self.page_size {
446 let amount =
447 if len >= ZERO_VMO_SIZE { ZERO_VMO_SIZE } else { len - len % self.page_size };
448 self.pager.pager.supply_pages(
449 self.target_vmo,
450 self.offset..self.offset + amount,
451 &self.pager.zero_vmo,
452 0,
453 )?;
454 self.offset += amount;
455 len -= amount;
456 }
457 self.buffer[self.buf_len..self.buf_len + len as usize].fill(0);
459 self.buf_len += len as usize;
460 Ok(())
461 }
462
463 fn supply_pages(&mut self) -> Result<(), zx::Status> {
465 if self.buf_len as u64 >= self.page_size {
466 let len = self.buf_len - self.buf_len % self.page_size as usize;
467 self.pager.pager.supply_pages(
468 self.target_vmo,
469 self.offset..self.offset + len as u64,
470 self.transfer_vmo,
471 0,
472 )?;
473 self.buffer.copy_within(len..self.buf_len, 0);
475 self.buf_len -= len;
476 self.offset += len as u64;
477 }
478 Ok(())
479 }
480
481 fn fill_buf(
483 &mut self,
484 f: impl FnOnce(&mut [u8]) -> Result<usize, zx::Status>,
485 ) -> Result<usize, zx::Status> {
486 let amount = f(&mut self.buffer[self.buf_len..])?;
487 self.buf_len += amount;
488 self.supply_pages()?;
489 Ok(amount)
490 }
491
492 fn finish(&mut self, mut end: u64) -> Result<(), zx::Status> {
494 let byte_offset = self.offset + self.buf_len as u64;
495 end = std::cmp::max(end, byte_offset);
496 end = end + self.page_size - 1;
497 end -= end % self.page_size;
498 self.zero(end - byte_offset)
499 }
500
501 fn fail_to(&mut self, end: u64, error: zx::Status) {
503 if self.offset < end {
504 log_warn!(error:?; "Failing page-in, range: {:?}", self.offset..end);
505 match self.pager.pager.op_range(
507 zx::PagerOp::Fail(zx::Status::IO),
508 self.target_vmo,
509 self.offset..end,
510 ) {
511 Ok(()) => {}
512 Err(error) => log_error!(error:?; "Failed to report error"),
513 }
514 self.offset = end;
515 self.buf_len = 0;
516 }
517 }
518}
519
#[cfg(test)]
mod tests {
    use super::{Pager, PagerExtent};

    use fidl::HandleBased;
    use std::sync::Arc;
    use std::time::Duration;

    /// Registers several files against a 1 MiB backing VMO and checks that reads through the
    /// returned VMOs see extent data and zero-filled holes, then waits for the registrations to
    /// be dropped once the VMOs are closed.
    #[::fuchsia::test]
    async fn test_pager() {
        let backing_vmo = zx::Vmo::create(1024 * 1024).expect("Vmo::create failed");
        let backing_vmo_clone =
            backing_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).expect("failed handle dup");

        let pager = Arc::new(Pager::new(backing_vmo_clone, 1024).expect("Pager::new failed"));

        {
            pager.start_threads();

            // A file with no extents reads back as zeroes.
            let vmo = pager.register("a", 1, 5, Box::new([])).expect("register failed");

            let mut buf = vec![1; 5];
            vmo.read(&mut buf, 0).expect("read failed");

            assert_eq!(&buf, &[0; 5]);

            // A file with a single extent at the start of the backing VMO.
            let vmo = pager
                .register("b", 2, 5, Box::new([PagerExtent { logical: 0..1, physical_block: 0 }]))
                .expect("register failed");
            backing_vmo.write(b"hello", 0).expect("write failed");
            vmo.read(&mut buf, 0).expect("read failed");

            assert_eq!(&buf, b"hello");

            // A sparse file: two one-block extents separated and surrounded by holes.
            let file_size = (6 + 1 + 5 + 4) * 1024 + 100;
            let vmo = pager
                .register(
                    "c",
                    3,
                    file_size,
                    Box::new([
                        PagerExtent { logical: 6..7, physical_block: 0 },
                        PagerExtent { logical: 12..13, physical_block: 1 },
                    ]),
                )
                .expect("register failed");
            backing_vmo.write(b"there", 1024).expect("write failed");
            let mut buf = vec![1; file_size as usize];
            vmo.read(&mut buf, 0).expect("read failed");

            let mut expected = vec![0; file_size as usize];
            expected[6 * 1024..6 * 1024 + 5].copy_from_slice(b"hello");
            expected[12 * 1024..12 * 1024 + 5].copy_from_slice(b"there");
            assert_eq!(&buf, &expected);

            // The same layout, but the first read starts part-way through the file.
            let vmo = pager
                .register(
                    "d",
                    4,
                    file_size,
                    Box::new([
                        PagerExtent { logical: 6..7, physical_block: 0 },
                        PagerExtent { logical: 12..13, physical_block: 1 },
                    ]),
                )
                .expect("register failed");

            let offset = 9000;
            let mut buf = vec![1; (file_size - offset) as usize];
            vmo.read(&mut buf, offset).expect("read failed");

            assert_eq!(&buf, &expected[offset as usize..]);
        }

        // All VMOs have been dropped; the pager thread should remove every registration.
        loop {
            if pager.files_by_inode.lock().is_empty() {
                break;
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        // Ask the pager thread to exit rather than leaving it blocked on the port.
        pager.terminate();
    }
}