storage_device/
fake_device.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::buffer::{BufferFuture, BufferRef, MutableBufferRef};
6use crate::buffer_allocator::{BufferAllocator, BufferSource};
7use crate::{Device, DeviceHolder};
8use anyhow::{Error, ensure};
9use async_trait::async_trait;
10use block_protocol::{ReadOptions, WriteOptions};
11use fuchsia_sync::Mutex;
12use rand::Rng;
13use std::ops::Range;
14use std::sync::atomic::{AtomicBool, Ordering};
15
16pub enum Op {
17    Read,
18    Write,
19    Flush,
20}
21
22pub trait Observer: Send + Sync {
23    fn barrier(&self) {}
24}
25
26#[derive(Debug, Default, Clone)]
27struct Inner {
28    data: Vec<u8>,
29    blocks_written_since_last_barrier: Vec<usize>,
30    attach_barrier: bool,
31}
32
33/// A Device backed by a memory buffer.
34pub struct FakeDevice {
35    allocator: BufferAllocator,
36    inner: Mutex<Inner>,
37    closed: AtomicBool,
38    operation_closure: Box<dyn Fn(Op) -> Result<(), Error> + Send + Sync>,
39    read_only: AtomicBool,
40    poisoned: AtomicBool,
41    observer: Option<Box<dyn Observer>>,
42}
43
44const TRANSFER_HEAP_SIZE: usize = 64 * 1024 * 1024;
45
46impl FakeDevice {
47    pub fn new(block_count: u64, block_size: u32) -> Self {
48        let allocator =
49            BufferAllocator::new(block_size as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
50        Self {
51            allocator,
52            inner: Mutex::new(Inner {
53                data: vec![0 as u8; block_count as usize * block_size as usize],
54                blocks_written_since_last_barrier: Vec::new(),
55                attach_barrier: false,
56            }),
57            closed: AtomicBool::new(false),
58            operation_closure: Box::new(|_: Op| Ok(())),
59            read_only: AtomicBool::new(false),
60            poisoned: AtomicBool::new(false),
61            observer: None,
62        }
63    }
64
65    pub fn set_observer(&mut self, observer: Box<dyn Observer>) {
66        self.observer = Some(observer);
67    }
68
69    /// Sets a callback that will run at the beginning of read, write, and flush which will forward
70    /// any errors, and proceed on Ok().
71    pub fn set_op_callback(
72        &mut self,
73        cb: impl Fn(Op) -> Result<(), Error> + Send + Sync + 'static,
74    ) {
75        self.operation_closure = Box::new(cb);
76    }
77
78    /// Creates a fake block device from an image (which can be anything that implements
79    /// std::io::Read).  The size of the device is determined by how much data is read.
80    pub fn from_image(
81        mut reader: impl std::io::Read,
82        block_size: u32,
83    ) -> Result<Self, std::io::Error> {
84        let allocator =
85            BufferAllocator::new(block_size as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
86        let mut data = Vec::new();
87        reader.read_to_end(&mut data)?;
88        Ok(Self {
89            allocator,
90            inner: Mutex::new(Inner {
91                data: data,
92                blocks_written_since_last_barrier: Vec::new(),
93                attach_barrier: false,
94            }),
95            closed: AtomicBool::new(false),
96            operation_closure: Box::new(|_| Ok(())),
97            read_only: AtomicBool::new(false),
98            poisoned: AtomicBool::new(false),
99            observer: None,
100        })
101    }
102}
103
104#[async_trait]
105impl Device for FakeDevice {
106    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
107        assert!(!self.closed.load(Ordering::Relaxed));
108        self.allocator.allocate_buffer(size)
109    }
110
111    fn block_size(&self) -> u32 {
112        self.allocator.block_size() as u32
113    }
114
115    fn block_count(&self) -> u64 {
116        self.inner.lock().data.len() as u64 / self.block_size() as u64
117    }
118
119    async fn read_with_opts(
120        &self,
121        offset: u64,
122        mut buffer: MutableBufferRef<'_>,
123        _read_opts: ReadOptions,
124    ) -> Result<(), Error> {
125        ensure!(!self.closed.load(Ordering::Relaxed));
126        (self.operation_closure)(Op::Read)?;
127        let offset = offset as usize;
128        assert_eq!(offset % self.allocator.block_size(), 0);
129        let inner = self.inner.lock();
130        let size = buffer.len();
131        assert!(
132            offset + size <= inner.data.len(),
133            "offset: {} len: {} data.len: {}",
134            offset,
135            size,
136            inner.data.len()
137        );
138        buffer.as_mut_slice().copy_from_slice(&inner.data[offset..offset + size]);
139        Ok(())
140    }
141
142    async fn write_with_opts(
143        &self,
144        offset: u64,
145        buffer: BufferRef<'_>,
146        _write_opts: WriteOptions,
147    ) -> Result<(), Error> {
148        ensure!(!self.closed.load(Ordering::Relaxed));
149        ensure!(!self.read_only.load(Ordering::Relaxed));
150        let mut inner = self.inner.lock();
151
152        if inner.attach_barrier {
153            inner.blocks_written_since_last_barrier.clear();
154            inner.attach_barrier = false;
155        }
156
157        (self.operation_closure)(Op::Write)?;
158        let offset = offset as usize;
159        assert_eq!(offset % self.allocator.block_size(), 0);
160
161        let size = buffer.len();
162        assert!(
163            offset + size <= inner.data.len(),
164            "offset: {} len: {} data.len: {}",
165            offset,
166            size,
167            inner.data.len()
168        );
169        inner.data[offset..offset + size].copy_from_slice(buffer.as_slice());
170        let first_block = offset / self.allocator.block_size();
171        for block in first_block..first_block + size / self.allocator.block_size() {
172            inner.blocks_written_since_last_barrier.push(block)
173        }
174        Ok(())
175    }
176
177    async fn trim(&self, range: Range<u64>) -> Result<(), Error> {
178        ensure!(!self.closed.load(Ordering::Relaxed));
179        ensure!(!self.read_only.load(Ordering::Relaxed));
180        assert_eq!(range.start % self.block_size() as u64, 0);
181        assert_eq!(range.end % self.block_size() as u64, 0);
182        // Blast over the range to simulate it being used for something else.
183        let mut inner = self.inner.lock();
184        inner.data[range.start as usize..range.end as usize].fill(0xab);
185        Ok(())
186    }
187
188    async fn close(&self) -> Result<(), Error> {
189        self.closed.store(true, Ordering::Relaxed);
190        Ok(())
191    }
192
193    async fn flush(&self) -> Result<(), Error> {
194        self.inner.lock().blocks_written_since_last_barrier.clear();
195        (self.operation_closure)(Op::Flush)
196    }
197
198    fn barrier(&self) {
199        if let Some(observer) = &self.observer {
200            observer.barrier();
201        }
202        self.inner.lock().attach_barrier = true;
203    }
204
205    fn reopen(&self, read_only: bool) {
206        self.closed.store(false, Ordering::Relaxed);
207        self.read_only.store(read_only, Ordering::Relaxed);
208    }
209
210    fn is_read_only(&self) -> bool {
211        self.read_only.load(Ordering::Relaxed)
212    }
213
214    fn supports_trim(&self) -> bool {
215        true
216    }
217
218    fn snapshot(&self) -> Result<DeviceHolder, Error> {
219        let allocator =
220            BufferAllocator::new(self.block_size() as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
221        Ok(DeviceHolder::new(Self {
222            allocator,
223            inner: Mutex::new(self.inner.lock().clone()),
224            closed: AtomicBool::new(false),
225            operation_closure: Box::new(|_: Op| Ok(())),
226            read_only: AtomicBool::new(false),
227            poisoned: AtomicBool::new(false),
228            observer: None,
229        }))
230    }
231
232    fn discard_random_since_last_flush(&self) -> Result<(), Error> {
233        let bs = self.allocator.block_size();
234        let mut rng = rand::rng();
235        let mut guard = self.inner.lock();
236        let Inner { ref mut data, ref mut blocks_written_since_last_barrier, .. } = &mut *guard;
237        log::info!("Discarding from {blocks_written_since_last_barrier:?}");
238        let mut discarded = Vec::new();
239        for block in blocks_written_since_last_barrier.drain(..) {
240            if rng.random() {
241                data[block * bs..(block + 1) * bs].fill(0xaf);
242                discarded.push(block);
243            }
244        }
245        log::info!("Discarded {discarded:?}");
246        Ok(())
247    }
248
249    /// Sets the poisoned state for the device. A poisoned device will panic the thread that
250    /// performs Drop on it.
251    fn poison(&self) -> Result<(), Error> {
252        self.poisoned.store(true, Ordering::Relaxed);
253        Ok(())
254    }
255}
256
257impl Drop for FakeDevice {
258    fn drop(&mut self) {
259        if self.poisoned.load(Ordering::Relaxed) {
260            panic!("This device was poisoned to crash whomever is holding a reference here.");
261        }
262    }
263}
264
265#[cfg(test)]
266mod tests {
267    use super::FakeDevice;
268    use crate::Device;
269    use block_protocol::WriteOptions;
270
271    const TEST_DEVICE_BLOCK_SIZE: usize = 512;
272
273    #[fuchsia::test(threads = 10)]
274    async fn test_discard_random_with_barriers() {
275        let device = FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE as u32);
276        // Loop 100 times to catch errors.
277        for _ in 0..1000 {
278            let mut data = vec![0; 7 * TEST_DEVICE_BLOCK_SIZE];
279            rand::fill(&mut data[..]);
280            // Ensure that barriers work with overwrites.
281            let indices = [1, 2, 3, 4, 3, 5, 6];
282            for i in 0..indices.len() {
283                let mut buffer = device.allocate_buffer(TEST_DEVICE_BLOCK_SIZE).await;
284                if i == 2 || i == 5 {
285                    buffer.as_mut_slice().copy_from_slice(
286                        &data[indices[i] * TEST_DEVICE_BLOCK_SIZE
287                            ..indices[i] * TEST_DEVICE_BLOCK_SIZE + TEST_DEVICE_BLOCK_SIZE],
288                    );
289                    device.barrier();
290                    device
291                        .write(i as u64 * TEST_DEVICE_BLOCK_SIZE as u64, buffer.as_ref())
292                        .await
293                        .expect("Failed to write to FakeDevice");
294                } else {
295                    buffer.as_mut_slice().copy_from_slice(
296                        &data[indices[i] * TEST_DEVICE_BLOCK_SIZE
297                            ..indices[i] * TEST_DEVICE_BLOCK_SIZE + TEST_DEVICE_BLOCK_SIZE],
298                    );
299                    device
300                        .write_with_opts(
301                            i as u64 * TEST_DEVICE_BLOCK_SIZE as u64,
302                            buffer.as_ref(),
303                            WriteOptions::default(),
304                        )
305                        .await
306                        .expect("Failed to write to FakeDevice");
307                }
308            }
309            device.discard_random_since_last_flush().expect("failed to randomly discard writes");
310            let mut discard = false;
311            let mut discard_2 = false;
312            for i in 0..7 {
313                let mut read_buffer = device.allocate_buffer(TEST_DEVICE_BLOCK_SIZE).await;
314                device
315                    .read(i as u64 * TEST_DEVICE_BLOCK_SIZE as u64, read_buffer.as_mut())
316                    .await
317                    .expect("failed to read from FakeDevice");
318                let expected_data = &data[indices[i] * TEST_DEVICE_BLOCK_SIZE
319                    ..indices[i] * TEST_DEVICE_BLOCK_SIZE + TEST_DEVICE_BLOCK_SIZE];
320                if i < 2 {
321                    if expected_data != read_buffer.as_slice() {
322                        discard = true;
323                    }
324                } else if i < 5 {
325                    if discard == true {
326                        assert_ne!(expected_data, read_buffer.as_slice());
327                        discard_2 = true;
328                    } else if expected_data != read_buffer.as_slice() {
329                        discard_2 = true;
330                    }
331                } else if discard_2 == true {
332                    assert_ne!(expected_data, read_buffer.as_slice());
333                }
334            }
335        }
336    }
337}