vmo_backed_block_server/
vmo_backed_server.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{Error, anyhow};
6use block_server::async_interface::{Interface, SessionManager};
7use block_server::{BlockInfo, BlockServer, DeviceInfo, ReadOptions, WriteFlags, WriteOptions};
8use fidl::endpoints::{ClientEnd, FromClient, RequestStream, ServerEnd, create_endpoints};
9use fs_management::filesystem::BlockConnector;
10use fuchsia_sync::Mutex;
11use fxfs_crypto::{FscryptSoftwareInoLblk32FileCipher, UnwrappedKey};
12use rand::Rng as _;
13use std::borrow::Cow;
14use std::collections::HashMap;
15use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
19
/// What the server should do with a write after [`Observer::write`] has inspected it.
///
/// The Observer can silently discard writes, or fail them explicitly (zx::Status::IO is returned).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteAction {
    /// Perform the write normally.
    Write,
    /// Drop the write but report success to the client.
    Discard,
    /// Drop the write and fail the request with `zx::Status::IO`.
    Fail,
}
26
27pub trait Observer: Send + Sync {
28    fn read(
29        &self,
30        _device_block_offset: u64,
31        _block_count: u32,
32        _vmo: &Arc<zx::Vmo>,
33        _vmo_offset: u64,
34    ) {
35    }
36
37    fn write(
38        &self,
39        _device_block_offset: u64,
40        _block_count: u32,
41        _vmo: &Arc<zx::Vmo>,
42        _vmo_offset: u64,
43        _opts: WriteOptions,
44    ) -> WriteAction {
45        WriteAction::Write
46    }
47
48    // If [`VmoBackedServerOptions::write_tracking`] is enabled, `writes` is set to the batch since
49    // last flush or barrier and can be freely modified.
50    fn flush(&self, _writes: Option<&mut WriteCache>) {}
51
52    // If [`VmoBackedServerOptions::write_tracking`] is enabled, `writes` is set to the batch since
53    // last flush or barrier and can be freely modified.
54    fn close(&self, _writes: Option<&mut WriteCache>) {}
55
56    fn trim(&self, _device_block_offset: u64, _block_count: u32) {}
57}
58
59pub struct FscryptInfo {
60    // Maps keyslots to lblk32 software ciphers used to encrypt/decrypt file contents.
61    fscrypt_keys: HashMap<u8, FscryptSoftwareInoLblk32FileCipher>,
62    next_key_slot: u8,
63}
64
65/// A local server backed by a VMO.
66pub struct VmoBackedServer {
67    server: BlockServer<SessionManager<Data>>,
68    fscrypt_info: Arc<Mutex<FscryptInfo>>,
69}
70
71/// The initial contents of the VMO.  This also determines the size of the block device.
72pub enum InitialContents<'a> {
73    /// An empty VMO will be created with capacity for this many *blocks*.
74    FromCapacity(u64),
75    /// A VMO is created with capacity for this many *blocks* and the buffer's contents copied into
76    /// it.
77    FromCapacityAndBuffer(u64, &'a [u8]),
78    /// A VMO is created which is exactly large enough for the initial contents (rounded up to block
79    /// size), and the buffer's contents copied into it.
80    FromBuffer(&'a [u8]),
81    /// The provided VMO is used.  If its size is not block-aligned, the data will be truncated.
82    FromVmo(zx::Vmo),
83}
84
85pub struct VmoBackedServerOptions<'a> {
86    /// NB: `block_count` is ignored as that comes from `initial_contents`.
87    pub info: DeviceInfo,
88    pub block_size: u32,
89    pub initial_contents: InitialContents<'a>,
90    pub observer: Option<Box<dyn Observer>>,
91    /// Enables write tracking so [`Observer::flush`] and [`Observer::barrier`] will be provided
92    /// with [`WriteCache`].
93    /// Note that this is expensive and should mainly be used for tests.
94    pub write_tracking: bool,
95    /// If set, each operation will be delayed by a random duration <= this value, which is useful
96    /// for testing race conditions due to out-of-order block requests.
97    pub max_jitter_usec: Option<u64>,
98}
99
100impl Default for VmoBackedServerOptions<'_> {
101    fn default() -> Self {
102        VmoBackedServerOptions {
103            info: DeviceInfo::Block(BlockInfo {
104                device_flags: fblock::Flag::empty(),
105                block_count: 0,
106                max_transfer_blocks: None,
107            }),
108            block_size: 512,
109            initial_contents: InitialContents::FromCapacity(0),
110            observer: None,
111            write_tracking: false,
112            max_jitter_usec: None,
113        }
114    }
115}
116
117impl VmoBackedServerOptions<'_> {
118    pub fn build(self) -> Result<VmoBackedServer, Error> {
119        let (data, block_count) = match self.initial_contents {
120            InitialContents::FromCapacity(block_count) => {
121                (zx::Vmo::create(block_count * self.block_size as u64)?, block_count)
122            }
123            InitialContents::FromCapacityAndBuffer(block_count, buf) => {
124                let needed =
125                    buf.len()
126                        .checked_next_multiple_of(self.block_size as usize)
127                        .ok_or_else(|| anyhow!("Invalid buffer size"))? as u64
128                        / self.block_size as u64;
129                if needed > block_count {
130                    return Err(anyhow!("Not enough capacity: {needed} vs {block_count}"));
131                }
132                let vmo = zx::Vmo::create(block_count * self.block_size as u64)?;
133                vmo.write(buf, 0)?;
134                (vmo, block_count)
135            }
136            InitialContents::FromBuffer(buf) => {
137                let block_count =
138                    buf.len()
139                        .checked_next_multiple_of(self.block_size as usize)
140                        .ok_or_else(|| anyhow!("Invalid buffer size"))? as u64
141                        / self.block_size as u64;
142                let vmo = zx::Vmo::create(block_count * self.block_size as u64)?;
143                vmo.write(buf, 0)?;
144                (vmo, block_count)
145            }
146            InitialContents::FromVmo(vmo) => {
147                let size = vmo.get_size()?;
148                let block_count = size / self.block_size as u64;
149                (vmo, block_count)
150            }
151        };
152
153        let info = match self.info {
154            DeviceInfo::Block(mut info) => {
155                info.block_count = block_count;
156                DeviceInfo::Block(info)
157            }
158            DeviceInfo::Partition(mut info) => {
159                info.block_range = Some(0..block_count);
160                DeviceInfo::Partition(info)
161            }
162        };
163        let fscrypt_info =
164            Arc::new(Mutex::new(FscryptInfo { fscrypt_keys: HashMap::new(), next_key_slot: 0 }));
165        Ok(VmoBackedServer {
166            server: BlockServer::new(
167                self.block_size,
168                Arc::new(Data {
169                    info,
170                    block_size: self.block_size,
171                    data,
172                    observer: self.observer,
173                    write_cache: if self.write_tracking {
174                        Some(Mutex::new(WriteCache::new(self.block_size as u64)))
175                    } else {
176                        None
177                    },
178                    fscrypt_info: fscrypt_info.clone(),
179                    max_jitter_usec: self.max_jitter_usec,
180                }),
181            ),
182            fscrypt_info,
183        })
184    }
185}
186
187impl VmoBackedServer {
188    /// Handles `requests`.  The future will resolve when the stream terminates.
189    pub async fn serve(&self, requests: fvolume::VolumeRequestStream) -> Result<(), Error> {
190        let res = self.server.handle_requests(requests).await;
191        self.server.session_manager().interface().client_closed()?;
192        res
193    }
194
195    /// Implements software-fallback for fuchsia_hardware_inlineencryption.ProgramKey. There is no
196    /// limit on keyslots with the software fallback. As such, there is no mapping between keyslots
197    /// and FIDL connections or key eviction.
198    pub fn program_key(&self, xts_key: &[u8; 64]) -> u8 {
199        let unwrapped_key = UnwrappedKey::new(xts_key.to_vec());
200        let cipher = FscryptSoftwareInoLblk32FileCipher::new(&unwrapped_key);
201        let mut fscrypt_info = self.fscrypt_info.lock();
202        let slot = fscrypt_info.next_key_slot;
203        fscrypt_info.fscrypt_keys.insert(slot, cipher);
204        fscrypt_info.next_key_slot += 1;
205        slot
206    }
207}
208
209/// Implements `BlockConnector` to vend connections to a VmoBackedServer.
210pub struct VmoBackedServerConnector {
211    scope: fuchsia_async::Scope,
212    server: Arc<VmoBackedServer>,
213}
214
215impl VmoBackedServerConnector {
216    pub fn new(scope: fuchsia_async::Scope, server: Arc<VmoBackedServer>) -> Self {
217        Self { scope, server }
218    }
219}
220
221impl BlockConnector for VmoBackedServerConnector {
222    fn connect_channel_to_volume(
223        &self,
224        server_end: ServerEnd<fvolume::VolumeMarker>,
225    ) -> Result<(), Error> {
226        let server = self.server.clone();
227        let _ = self.scope.spawn(async move {
228            let _ = server.serve(server_end.into_stream()).await;
229        });
230        Ok(())
231    }
232}
233
/// The writes issued since the last flush or barrier, recorded one block at a time in the order
/// they were issued. Observers may reorder or alter the batch before it is applied.
pub struct WriteCache {
    /// Size in bytes of each recorded block.
    block_size: u64,
    /// Device block offset of each recorded block, oldest first.
    block_offsets: Vec<u64>,
    /// Contents of the recorded blocks, concatenated. Entry `i` occupies
    /// `buffer[i * block_size..(i + 1) * block_size]`.
    buffer: Vec<u8>,
}
241
242impl WriteCache {
243    fn new(block_size: u64) -> Self {
244        Self { block_size, block_offsets: vec![], buffer: vec![] }
245    }
246
247    fn insert(&mut self, block_offset: u64, contents: &[u8]) {
248        let block_count = contents.len() as u64 / self.block_size;
249        let mut buf_offset = 0;
250        for offset in block_offset..block_offset + block_count {
251            self.block_offsets.push(offset);
252            self.buffer
253                .extend_from_slice(&contents[buf_offset..buf_offset + self.block_size as usize]);
254            buf_offset += self.block_size as usize;
255        }
256    }
257
258    // Reads the last written value, falling back to `data` if there are no local updates.
259    fn read(
260        &self,
261        data: &zx::Vmo,
262        block_offset: u64,
263        contents: &mut [u8],
264    ) -> Result<(), zx::Status> {
265        let block_count = contents.len() as u64 / self.block_size;
266        let max_offset = block_offset + block_count;
267        data.read(contents, block_offset * self.block_size)?;
268        // Apply any buffered writes that would overwrite the actual contents.  If the same offset
269        // shows up multiple times, we want to use the most recent write, so it's important to
270        // iterate in order.
271        for (idx, offset) in self.block_offsets.iter().enumerate() {
272            if *offset >= block_offset && *offset < max_offset {
273                let in_offset = idx * self.block_size as usize;
274                let out_offset = ((*offset - block_offset) * self.block_size) as usize;
275                contents[out_offset..out_offset + self.block_size as usize]
276                    .copy_from_slice(&self.buffer[in_offset..in_offset + self.block_size as usize]);
277            }
278        }
279        Ok(())
280    }
281
282    // Persists all writes to `data` and empties the cache.
283    fn apply(&mut self, data: &zx::Vmo) -> Result<(), zx::Status> {
284        let mut buf_offset = 0;
285        for offset in self.block_offsets.drain(..) {
286            data.write(
287                &self.buffer[buf_offset..buf_offset + self.block_size as usize],
288                offset * self.block_size,
289            )?;
290            buf_offset += self.block_size as usize;
291        }
292        self.buffer.clear();
293        Ok(())
294    }
295
296    /// Returns the number of writes in the batch.
297    pub fn len(&self) -> usize {
298        self.block_offsets.len()
299    }
300
301    /// Returns an iterator over the batch of writes (in temporal sequence).
302    pub fn iter(&self) -> impl Iterator<Item = (&u64, &[u8])> {
303        self.block_offsets.iter().zip(self.buffer.windows(self.block_size as usize))
304    }
305
306    fn swap_writes(&mut self, i: usize, j: usize) {
307        self.block_offsets.swap(i, j);
308        let bs = self.block_size as usize;
309        let mut buf = vec![0u8; bs];
310        buf.copy_from_slice(&self.buffer[i * bs..(i + 1) * bs]);
311        self.buffer.copy_within(j * bs..(j + 1) * bs, i * bs);
312        self.buffer[j * bs..(j + 1) * bs].copy_from_slice(&buf[..]);
313    }
314
315    /// Reorders all writes.
316    pub fn shuffle(&mut self) {
317        // Implements the Fisher–Yates shuffle.
318        let mut rng = rand::rng();
319        for i in 0..self.block_offsets.len() {
320            let j = rng.random_range(0..=i);
321            if i != j {
322                self.swap_writes(i, j);
323            }
324        }
325    }
326
327    /// Discards a random number of writes from the tail, simulating a power-cut.
328    pub fn discard_some(&mut self) {
329        let mut rng = rand::rng();
330        let idx = rng.random_range(0..=self.block_offsets.len());
331        for i in idx..self.block_offsets.len() {
332            self.buffer[i * self.block_size as usize..(i + 1) * self.block_size as usize]
333                .fill(0xab);
334        }
335    }
336}
337
338/// Extension trait for test-only functionality.  `unwrap` is used liberally in these functions, to
339/// simplify their usage in tests.
340pub trait VmoBackedServerTestingExt {
341    fn new(block_count: u64, block_size: u32, initial_content: &[u8]) -> Self;
342    fn from_vmo(block_size: u32, vmo: zx::Vmo) -> Self;
343    fn connect_server(self: &Arc<Self>, server: ServerEnd<fvolume::VolumeMarker>);
344    fn connect<R: BlockClient>(self: &Arc<Self>) -> R;
345}
346
347pub trait BlockClient: FromClient {}
348
349impl BlockClient for fblock::BlockProxy {}
350impl BlockClient for fvolume::VolumeProxy {}
351impl BlockClient for fblock::BlockSynchronousProxy {}
352impl BlockClient for fvolume::VolumeSynchronousProxy {}
353impl BlockClient for ClientEnd<fblock::BlockMarker> {}
354impl BlockClient for ClientEnd<fvolume::VolumeMarker> {}
355
356impl VmoBackedServerTestingExt for VmoBackedServer {
357    fn new(block_count: u64, block_size: u32, initial_content: &[u8]) -> Self {
358        VmoBackedServerOptions {
359            block_size,
360            initial_contents: InitialContents::FromCapacityAndBuffer(block_count, initial_content),
361            ..Default::default()
362        }
363        .build()
364        .unwrap()
365    }
366    fn from_vmo(block_size: u32, vmo: zx::Vmo) -> Self {
367        VmoBackedServerOptions {
368            block_size,
369            initial_contents: InitialContents::FromVmo(vmo),
370            ..Default::default()
371        }
372        .build()
373        .unwrap()
374    }
375
376    fn connect<R: BlockClient>(self: &Arc<Self>) -> R {
377        let (client, server) = create_endpoints::<R::Protocol>();
378        let this = self.clone();
379        fuchsia_async::Task::spawn(async move {
380            let _ = this.serve(server.into_stream().cast_stream()).await;
381        })
382        .detach();
383        R::from_client(client)
384    }
385
386    fn connect_server(self: &Arc<Self>, server: ServerEnd<fvolume::VolumeMarker>) {
387        let this = self.clone();
388        fuchsia_async::Task::spawn(async move {
389            let _ = this.serve(server.into_stream()).await;
390        })
391        .detach();
392    }
393}
394
395struct Data {
396    info: DeviceInfo,
397    block_size: u32,
398    data: zx::Vmo,
399    observer: Option<Box<dyn Observer>>,
400    write_cache: Option<Mutex<WriteCache>>,
401    fscrypt_info: Arc<Mutex<FscryptInfo>>,
402    max_jitter_usec: Option<u64>,
403}
404
405impl Data {
406    fn jitter(&self) -> Option<fuchsia_async::Timer> {
407        self.max_jitter_usec
408            .map(|max| fuchsia_async::Timer::new(Duration::from_micros(rand::random_range(0..max))))
409    }
410
411    fn client_closed(&self) -> Result<(), zx::Status> {
412        if let Some(mut cache) = self.write_cache.as_ref().map(|w| w.lock()) {
413            if let Some(observer) = self.observer.as_ref() {
414                observer.close(Some(&mut *cache));
415            }
416            cache.apply(&self.data)
417        } else {
418            if let Some(observer) = self.observer.as_ref() {
419                observer.close(None);
420            }
421            Ok(())
422        }
423    }
424}
425
426impl Interface for Data {
427    fn get_info(&self) -> Cow<'_, DeviceInfo> {
428        Cow::Borrowed(&self.info)
429    }
430
431    async fn read(
432        &self,
433        device_block_offset: u64,
434        block_count: u32,
435        vmo: &Arc<zx::Vmo>,
436        vmo_offset: u64,
437        opts: ReadOptions,
438        _trace_flow_id: Option<NonZero<u64>>,
439    ) -> Result<(), zx::Status> {
440        if let Some(jitter) = self.jitter() {
441            jitter.await;
442        }
443        if let Some(observer) = self.observer.as_ref() {
444            observer.read(device_block_offset, block_count, vmo, vmo_offset);
445        }
446        if let Some(max) = self.info.max_transfer_blocks().as_ref() {
447            // Requests should be split up by the core library
448            assert!(block_count <= max.get());
449        }
450        if device_block_offset + block_count as u64 > self.info.block_count().unwrap() {
451            Err(zx::Status::OUT_OF_RANGE)
452        } else {
453            let mut data = if let Some(tracking) = self.write_cache.as_ref() {
454                let mut data = vec![0u8; block_count as usize * self.block_size as usize];
455                tracking.lock().read(&self.data, device_block_offset, &mut data[..])?;
456                data
457            } else {
458                self.data.read_to_vec(
459                    device_block_offset * self.block_size as u64,
460                    block_count as u64 * self.block_size as u64,
461                )?
462            };
463
464            if opts.inline_crypto_options.slot != 0xff {
465                let fscrypt_info = self.fscrypt_info.lock();
466                if let Some(cipher) =
467                    fscrypt_info.fscrypt_keys.get(&opts.inline_crypto_options.slot)
468                {
469                    cipher
470                        .decrypt(&mut data, opts.inline_crypto_options.dun as u128)
471                        .map_err(|_| zx::Status::IO)?;
472                }
473            }
474            vmo.write(&data[..], vmo_offset)
475        }
476    }
477
478    async fn write(
479        &self,
480        device_block_offset: u64,
481        block_count: u32,
482        vmo: &Arc<zx::Vmo>,
483        vmo_offset: u64,
484        opts: WriteOptions,
485        _trace_flow_id: Option<NonZero<u64>>,
486    ) -> Result<(), zx::Status> {
487        if let Some(jitter) = self.jitter() {
488            jitter.await;
489        }
490        if let Some(observer) = self.observer.as_ref() {
491            match observer.write(device_block_offset, block_count, vmo, vmo_offset, opts) {
492                WriteAction::Write => {}
493                WriteAction::Discard => return Ok(()),
494                WriteAction::Fail => return Err(zx::Status::IO),
495            }
496        }
497        if opts.flags.contains(WriteFlags::PRE_BARRIER) {
498            if let Some(cache) = self.write_cache.as_ref() {
499                cache.lock().apply(&self.data)?;
500            }
501        }
502        if let Some(max) = self.info.max_transfer_blocks().as_ref() {
503            // Requests should be split up by the core library
504            assert!(block_count <= max.get());
505        }
506        if device_block_offset + block_count as u64 > self.info.block_count().unwrap() {
507            Err(zx::Status::OUT_OF_RANGE)
508        } else {
509            let mut data =
510                vmo.read_to_vec(vmo_offset, block_count as u64 * self.block_size as u64)?;
511            if !opts.flags.contains(WriteFlags::FORCE_ACCESS)
512                && let Some(tracking) = self.write_cache.as_ref()
513            {
514                tracking.lock().insert(device_block_offset, &data[..]);
515            }
516            if opts.inline_crypto_options.slot != 0xff {
517                let fscrypt_info = self.fscrypt_info.lock();
518                if let Some(cipher) =
519                    fscrypt_info.fscrypt_keys.get(&opts.inline_crypto_options.slot)
520                {
521                    cipher
522                        .encrypt(&mut data, opts.inline_crypto_options.dun as u128)
523                        .map_err(|_| zx::Status::IO)?;
524                }
525            }
526            self.data.write(&data[..], device_block_offset * self.block_size as u64)?;
527            Ok(())
528        }
529    }
530
531    async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
532        if let Some(jitter) = self.jitter() {
533            jitter.await;
534        }
535        let mut cache = self.write_cache.as_ref().map(|w| w.lock());
536        if let Some(observer) = self.observer.as_ref() {
537            match cache.as_mut() {
538                Some(w) => observer.flush(Some(&mut *w)),
539                None => observer.flush(None),
540            }
541        }
542        if let Some(w) = cache.as_mut() { w.apply(&self.data) } else { Ok(()) }
543    }
544
545    async fn trim(
546        &self,
547        device_block_offset: u64,
548        block_count: u32,
549        _trace_flow_id: Option<NonZero<u64>>,
550    ) -> Result<(), zx::Status> {
551        if let Some(jitter) = self.jitter() {
552            jitter.await;
553        }
554        if let Some(observer) = self.observer.as_ref() {
555            observer.trim(device_block_offset, block_count);
556        }
557        if device_block_offset + block_count as u64 > self.info.block_count().unwrap() {
558            Err(zx::Status::OUT_OF_RANGE)
559        } else {
560            Ok(())
561        }
562    }
563}