1use anyhow::{Error, anyhow};
6use block_server::async_interface::{Interface, SessionManager};
7use block_server::{BlockInfo, BlockServer, DeviceInfo, ReadOptions, WriteFlags, WriteOptions};
8use fidl::endpoints::{ClientEnd, FromClient, RequestStream, ServerEnd, create_endpoints};
9use fs_management::filesystem::BlockConnector;
10use fuchsia_sync::Mutex;
11use fxfs_crypto::{FscryptSoftwareInoLblk32FileCipher, UnwrappedKey};
12use rand::Rng as _;
13use std::borrow::Cow;
14use std::collections::HashMap;
15use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
19
/// The outcome an [`Observer`] chooses for a write request it has intercepted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteAction {
    /// Let the server carry on and perform the write.
    Write,
    /// Skip the write, but still report success to the client.
    Discard,
    /// Skip the write and fail the request (the server reports `zx::Status::IO`).
    Fail,
}
26
27pub trait Observer: Send + Sync {
28 fn read(
29 &self,
30 _device_block_offset: u64,
31 _block_count: u32,
32 _vmo: &Arc<zx::Vmo>,
33 _vmo_offset: u64,
34 ) {
35 }
36
37 fn write(
38 &self,
39 _device_block_offset: u64,
40 _block_count: u32,
41 _vmo: &Arc<zx::Vmo>,
42 _vmo_offset: u64,
43 _opts: WriteOptions,
44 ) -> WriteAction {
45 WriteAction::Write
46 }
47
48 fn flush(&self, _writes: Option<&mut WriteCache>) {}
51
52 fn close(&self, _writes: Option<&mut WriteCache>) {}
55
56 fn trim(&self, _device_block_offset: u64, _block_count: u32) {}
57}
58
59pub struct FscryptInfo {
60 fscrypt_keys: HashMap<u8, FscryptSoftwareInoLblk32FileCipher>,
62 next_key_slot: u8,
63}
64
65pub struct VmoBackedServer {
67 server: BlockServer<SessionManager<Data>>,
68 fscrypt_info: Arc<Mutex<FscryptInfo>>,
69}
70
71pub enum InitialContents<'a> {
73 FromCapacity(u64),
75 FromCapacityAndBuffer(u64, &'a [u8]),
78 FromBuffer(&'a [u8]),
81 FromVmo(zx::Vmo),
83}
84
85pub struct VmoBackedServerOptions<'a> {
86 pub info: DeviceInfo,
88 pub block_size: u32,
89 pub initial_contents: InitialContents<'a>,
90 pub observer: Option<Box<dyn Observer>>,
91 pub write_tracking: bool,
95 pub max_jitter_usec: Option<u64>,
98}
99
100impl Default for VmoBackedServerOptions<'_> {
101 fn default() -> Self {
102 VmoBackedServerOptions {
103 info: DeviceInfo::Block(BlockInfo {
104 device_flags: fblock::Flag::empty(),
105 block_count: 0,
106 max_transfer_blocks: None,
107 }),
108 block_size: 512,
109 initial_contents: InitialContents::FromCapacity(0),
110 observer: None,
111 write_tracking: false,
112 max_jitter_usec: None,
113 }
114 }
115}
116
117impl VmoBackedServerOptions<'_> {
118 pub fn build(self) -> Result<VmoBackedServer, Error> {
119 let (data, block_count) = match self.initial_contents {
120 InitialContents::FromCapacity(block_count) => {
121 (zx::Vmo::create(block_count * self.block_size as u64)?, block_count)
122 }
123 InitialContents::FromCapacityAndBuffer(block_count, buf) => {
124 let needed =
125 buf.len()
126 .checked_next_multiple_of(self.block_size as usize)
127 .ok_or_else(|| anyhow!("Invalid buffer size"))? as u64
128 / self.block_size as u64;
129 if needed > block_count {
130 return Err(anyhow!("Not enough capacity: {needed} vs {block_count}"));
131 }
132 let vmo = zx::Vmo::create(block_count * self.block_size as u64)?;
133 vmo.write(buf, 0)?;
134 (vmo, block_count)
135 }
136 InitialContents::FromBuffer(buf) => {
137 let block_count =
138 buf.len()
139 .checked_next_multiple_of(self.block_size as usize)
140 .ok_or_else(|| anyhow!("Invalid buffer size"))? as u64
141 / self.block_size as u64;
142 let vmo = zx::Vmo::create(block_count * self.block_size as u64)?;
143 vmo.write(buf, 0)?;
144 (vmo, block_count)
145 }
146 InitialContents::FromVmo(vmo) => {
147 let size = vmo.get_size()?;
148 let block_count = size / self.block_size as u64;
149 (vmo, block_count)
150 }
151 };
152
153 let info = match self.info {
154 DeviceInfo::Block(mut info) => {
155 info.block_count = block_count;
156 DeviceInfo::Block(info)
157 }
158 DeviceInfo::Partition(mut info) => {
159 info.block_range = Some(0..block_count);
160 DeviceInfo::Partition(info)
161 }
162 };
163 let fscrypt_info =
164 Arc::new(Mutex::new(FscryptInfo { fscrypt_keys: HashMap::new(), next_key_slot: 0 }));
165 Ok(VmoBackedServer {
166 server: BlockServer::new(
167 self.block_size,
168 Arc::new(Data {
169 info,
170 block_size: self.block_size,
171 data,
172 observer: self.observer,
173 write_cache: if self.write_tracking {
174 Some(Mutex::new(WriteCache::new(self.block_size as u64)))
175 } else {
176 None
177 },
178 fscrypt_info: fscrypt_info.clone(),
179 max_jitter_usec: self.max_jitter_usec,
180 }),
181 ),
182 fscrypt_info,
183 })
184 }
185}
186
187impl VmoBackedServer {
188 pub async fn serve(&self, requests: fvolume::VolumeRequestStream) -> Result<(), Error> {
190 let res = self.server.handle_requests(requests).await;
191 self.server.session_manager().interface().client_closed()?;
192 res
193 }
194
195 pub fn program_key(&self, xts_key: &[u8; 64]) -> u8 {
199 let unwrapped_key = UnwrappedKey::new(xts_key.to_vec());
200 let cipher = FscryptSoftwareInoLblk32FileCipher::new(&unwrapped_key);
201 let mut fscrypt_info = self.fscrypt_info.lock();
202 let slot = fscrypt_info.next_key_slot;
203 fscrypt_info.fscrypt_keys.insert(slot, cipher);
204 fscrypt_info.next_key_slot += 1;
205 slot
206 }
207}
208
209pub struct VmoBackedServerConnector {
211 scope: fuchsia_async::Scope,
212 server: Arc<VmoBackedServer>,
213}
214
215impl VmoBackedServerConnector {
216 pub fn new(scope: fuchsia_async::Scope, server: Arc<VmoBackedServer>) -> Self {
217 Self { scope, server }
218 }
219}
220
221impl BlockConnector for VmoBackedServerConnector {
222 fn connect_channel_to_volume(
223 &self,
224 server_end: ServerEnd<fvolume::VolumeMarker>,
225 ) -> Result<(), Error> {
226 let server = self.server.clone();
227 let _ = self.scope.spawn(async move {
228 let _ = server.serve(server_end.into_stream()).await;
229 });
230 Ok(())
231 }
232}
233
/// Block-granular record of writes that have been cached but not yet applied to the VMO.
pub struct WriteCache {
    /// Size of one block in bytes.
    block_size: u64,
    /// Device block offset of every cached block, in insertion order.
    block_offsets: Vec<u64>,
    /// Contents of the cached blocks, concatenated in the same order as `block_offsets`;
    /// each entry occupies `block_size` bytes.
    buffer: Vec<u8>,
}
241
242impl WriteCache {
243 fn new(block_size: u64) -> Self {
244 Self { block_size, block_offsets: vec![], buffer: vec![] }
245 }
246
247 fn insert(&mut self, block_offset: u64, contents: &[u8]) {
248 let block_count = contents.len() as u64 / self.block_size;
249 let mut buf_offset = 0;
250 for offset in block_offset..block_offset + block_count {
251 self.block_offsets.push(offset);
252 self.buffer
253 .extend_from_slice(&contents[buf_offset..buf_offset + self.block_size as usize]);
254 buf_offset += self.block_size as usize;
255 }
256 }
257
258 fn read(
260 &self,
261 data: &zx::Vmo,
262 block_offset: u64,
263 contents: &mut [u8],
264 ) -> Result<(), zx::Status> {
265 let block_count = contents.len() as u64 / self.block_size;
266 let max_offset = block_offset + block_count;
267 data.read(contents, block_offset * self.block_size)?;
268 for (idx, offset) in self.block_offsets.iter().enumerate() {
272 if *offset >= block_offset && *offset < max_offset {
273 let in_offset = idx * self.block_size as usize;
274 let out_offset = ((*offset - block_offset) * self.block_size) as usize;
275 contents[out_offset..out_offset + self.block_size as usize]
276 .copy_from_slice(&self.buffer[in_offset..in_offset + self.block_size as usize]);
277 }
278 }
279 Ok(())
280 }
281
282 fn apply(&mut self, data: &zx::Vmo) -> Result<(), zx::Status> {
284 let mut buf_offset = 0;
285 for offset in self.block_offsets.drain(..) {
286 data.write(
287 &self.buffer[buf_offset..buf_offset + self.block_size as usize],
288 offset * self.block_size,
289 )?;
290 buf_offset += self.block_size as usize;
291 }
292 self.buffer.clear();
293 Ok(())
294 }
295
296 pub fn len(&self) -> usize {
298 self.block_offsets.len()
299 }
300
301 pub fn iter(&self) -> impl Iterator<Item = (&u64, &[u8])> {
303 self.block_offsets.iter().zip(self.buffer.windows(self.block_size as usize))
304 }
305
306 fn swap_writes(&mut self, i: usize, j: usize) {
307 self.block_offsets.swap(i, j);
308 let bs = self.block_size as usize;
309 let mut buf = vec![0u8; bs];
310 buf.copy_from_slice(&self.buffer[i * bs..(i + 1) * bs]);
311 self.buffer.copy_within(j * bs..(j + 1) * bs, i * bs);
312 self.buffer[j * bs..(j + 1) * bs].copy_from_slice(&buf[..]);
313 }
314
315 pub fn shuffle(&mut self) {
317 let mut rng = rand::rng();
319 for i in 0..self.block_offsets.len() {
320 let j = rng.random_range(0..=i);
321 if i != j {
322 self.swap_writes(i, j);
323 }
324 }
325 }
326
327 pub fn discard_some(&mut self) {
329 let mut rng = rand::rng();
330 let idx = rng.random_range(0..=self.block_offsets.len());
331 for i in idx..self.block_offsets.len() {
332 self.buffer[i * self.block_size as usize..(i + 1) * self.block_size as usize]
333 .fill(0xab);
334 }
335 }
336}
337
338pub trait VmoBackedServerTestingExt {
341 fn new(block_count: u64, block_size: u32, initial_content: &[u8]) -> Self;
342 fn from_vmo(block_size: u32, vmo: zx::Vmo) -> Self;
343 fn connect_server(self: &Arc<Self>, server: ServerEnd<fvolume::VolumeMarker>);
344 fn connect<R: BlockClient>(self: &Arc<Self>) -> R;
345}
346
347pub trait BlockClient: FromClient {}
348
349impl BlockClient for fblock::BlockProxy {}
350impl BlockClient for fvolume::VolumeProxy {}
351impl BlockClient for fblock::BlockSynchronousProxy {}
352impl BlockClient for fvolume::VolumeSynchronousProxy {}
353impl BlockClient for ClientEnd<fblock::BlockMarker> {}
354impl BlockClient for ClientEnd<fvolume::VolumeMarker> {}
355
356impl VmoBackedServerTestingExt for VmoBackedServer {
357 fn new(block_count: u64, block_size: u32, initial_content: &[u8]) -> Self {
358 VmoBackedServerOptions {
359 block_size,
360 initial_contents: InitialContents::FromCapacityAndBuffer(block_count, initial_content),
361 ..Default::default()
362 }
363 .build()
364 .unwrap()
365 }
366 fn from_vmo(block_size: u32, vmo: zx::Vmo) -> Self {
367 VmoBackedServerOptions {
368 block_size,
369 initial_contents: InitialContents::FromVmo(vmo),
370 ..Default::default()
371 }
372 .build()
373 .unwrap()
374 }
375
376 fn connect<R: BlockClient>(self: &Arc<Self>) -> R {
377 let (client, server) = create_endpoints::<R::Protocol>();
378 let this = self.clone();
379 fuchsia_async::Task::spawn(async move {
380 let _ = this.serve(server.into_stream().cast_stream()).await;
381 })
382 .detach();
383 R::from_client(client)
384 }
385
386 fn connect_server(self: &Arc<Self>, server: ServerEnd<fvolume::VolumeMarker>) {
387 let this = self.clone();
388 fuchsia_async::Task::spawn(async move {
389 let _ = this.serve(server.into_stream()).await;
390 })
391 .detach();
392 }
393}
394
395struct Data {
396 info: DeviceInfo,
397 block_size: u32,
398 data: zx::Vmo,
399 observer: Option<Box<dyn Observer>>,
400 write_cache: Option<Mutex<WriteCache>>,
401 fscrypt_info: Arc<Mutex<FscryptInfo>>,
402 max_jitter_usec: Option<u64>,
403}
404
405impl Data {
406 fn jitter(&self) -> Option<fuchsia_async::Timer> {
407 self.max_jitter_usec
408 .map(|max| fuchsia_async::Timer::new(Duration::from_micros(rand::random_range(0..max))))
409 }
410
411 fn client_closed(&self) -> Result<(), zx::Status> {
412 if let Some(mut cache) = self.write_cache.as_ref().map(|w| w.lock()) {
413 if let Some(observer) = self.observer.as_ref() {
414 observer.close(Some(&mut *cache));
415 }
416 cache.apply(&self.data)
417 } else {
418 if let Some(observer) = self.observer.as_ref() {
419 observer.close(None);
420 }
421 Ok(())
422 }
423 }
424}
425
426impl Interface for Data {
427 fn get_info(&self) -> Cow<'_, DeviceInfo> {
428 Cow::Borrowed(&self.info)
429 }
430
431 async fn read(
432 &self,
433 device_block_offset: u64,
434 block_count: u32,
435 vmo: &Arc<zx::Vmo>,
436 vmo_offset: u64,
437 opts: ReadOptions,
438 _trace_flow_id: Option<NonZero<u64>>,
439 ) -> Result<(), zx::Status> {
440 if let Some(jitter) = self.jitter() {
441 jitter.await;
442 }
443 if let Some(observer) = self.observer.as_ref() {
444 observer.read(device_block_offset, block_count, vmo, vmo_offset);
445 }
446 if let Some(max) = self.info.max_transfer_blocks().as_ref() {
447 assert!(block_count <= max.get());
449 }
450 if device_block_offset + block_count as u64 > self.info.block_count().unwrap() {
451 Err(zx::Status::OUT_OF_RANGE)
452 } else {
453 let mut data = if let Some(tracking) = self.write_cache.as_ref() {
454 let mut data = vec![0u8; block_count as usize * self.block_size as usize];
455 tracking.lock().read(&self.data, device_block_offset, &mut data[..])?;
456 data
457 } else {
458 self.data.read_to_vec(
459 device_block_offset * self.block_size as u64,
460 block_count as u64 * self.block_size as u64,
461 )?
462 };
463
464 if opts.inline_crypto_options.slot != 0xff {
465 let fscrypt_info = self.fscrypt_info.lock();
466 if let Some(cipher) =
467 fscrypt_info.fscrypt_keys.get(&opts.inline_crypto_options.slot)
468 {
469 cipher
470 .decrypt(&mut data, opts.inline_crypto_options.dun as u128)
471 .map_err(|_| zx::Status::IO)?;
472 }
473 }
474 vmo.write(&data[..], vmo_offset)
475 }
476 }
477
478 async fn write(
479 &self,
480 device_block_offset: u64,
481 block_count: u32,
482 vmo: &Arc<zx::Vmo>,
483 vmo_offset: u64,
484 opts: WriteOptions,
485 _trace_flow_id: Option<NonZero<u64>>,
486 ) -> Result<(), zx::Status> {
487 if let Some(jitter) = self.jitter() {
488 jitter.await;
489 }
490 if let Some(observer) = self.observer.as_ref() {
491 match observer.write(device_block_offset, block_count, vmo, vmo_offset, opts) {
492 WriteAction::Write => {}
493 WriteAction::Discard => return Ok(()),
494 WriteAction::Fail => return Err(zx::Status::IO),
495 }
496 }
497 if opts.flags.contains(WriteFlags::PRE_BARRIER) {
498 if let Some(cache) = self.write_cache.as_ref() {
499 cache.lock().apply(&self.data)?;
500 }
501 }
502 if let Some(max) = self.info.max_transfer_blocks().as_ref() {
503 assert!(block_count <= max.get());
505 }
506 if device_block_offset + block_count as u64 > self.info.block_count().unwrap() {
507 Err(zx::Status::OUT_OF_RANGE)
508 } else {
509 let mut data =
510 vmo.read_to_vec(vmo_offset, block_count as u64 * self.block_size as u64)?;
511 if !opts.flags.contains(WriteFlags::FORCE_ACCESS)
512 && let Some(tracking) = self.write_cache.as_ref()
513 {
514 tracking.lock().insert(device_block_offset, &data[..]);
515 }
516 if opts.inline_crypto_options.slot != 0xff {
517 let fscrypt_info = self.fscrypt_info.lock();
518 if let Some(cipher) =
519 fscrypt_info.fscrypt_keys.get(&opts.inline_crypto_options.slot)
520 {
521 cipher
522 .encrypt(&mut data, opts.inline_crypto_options.dun as u128)
523 .map_err(|_| zx::Status::IO)?;
524 }
525 }
526 self.data.write(&data[..], device_block_offset * self.block_size as u64)?;
527 Ok(())
528 }
529 }
530
531 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
532 if let Some(jitter) = self.jitter() {
533 jitter.await;
534 }
535 let mut cache = self.write_cache.as_ref().map(|w| w.lock());
536 if let Some(observer) = self.observer.as_ref() {
537 match cache.as_mut() {
538 Some(w) => observer.flush(Some(&mut *w)),
539 None => observer.flush(None),
540 }
541 }
542 if let Some(w) = cache.as_mut() { w.apply(&self.data) } else { Ok(()) }
543 }
544
545 async fn trim(
546 &self,
547 device_block_offset: u64,
548 block_count: u32,
549 _trace_flow_id: Option<NonZero<u64>>,
550 ) -> Result<(), zx::Status> {
551 if let Some(jitter) = self.jitter() {
552 jitter.await;
553 }
554 if let Some(observer) = self.observer.as_ref() {
555 observer.trim(device_block_offset, block_count);
556 }
557 if device_block_offset + block_count as u64 > self.info.block_count().unwrap() {
558 Err(zx::Status::OUT_OF_RANGE)
559 } else {
560 Ok(())
561 }
562 }
563}