use crate::buffer::{BufferFuture, BufferRef, MutableBufferRef};
use crate::buffer_allocator::{BufferAllocator, BufferSource};
use crate::{Device, DeviceHolder};
use anyhow::{Error, ensure};
use async_trait::async_trait;
use block_protocol::{ReadOptions, WriteOptions};
use fuchsia_sync::Mutex;
use rand::Rng;
use std::ops::Range;
use std::sync::atomic::{AtomicBool, Ordering};

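/// Operations that can be intercepted via `FakeDevice::set_op_callback`.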
pub enum Op {
    Read,
    Write,
    Flush,
}

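/// Hook for observing `FakeDevice` activity; at present only barriers are reported.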
pub trait Observer: Send + Sync {
    fn barrier(&self) {}
}

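/// State behind the device lock: the backing bytes plus the bookkeeping used to simulate
/// power loss.  `blocks_written_since_last_barrier` holds the blocks that are still eligible
/// for `discard_random_since_last_flush`, and `attach_barrier` records that a barrier should
/// be applied to the next write.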
#[derive(Debug, Default, Clone)]
struct Inner {
    data: Vec<u8>,
    blocks_written_since_last_barrier: Vec<usize>,
    attach_barrier: bool,
}

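/// An in-memory implementation of [`Device`] for tests.  Reads and writes operate on a
/// `Vec<u8>`, and helpers such as `discard_random_since_last_flush` and `poison` make it
/// possible to simulate power loss and misuse.
///
/// Illustrative usage (a sketch only; assumes an async executor and that the `write`, `read`
/// and `flush` helpers provided by the [`Device`] trait are in scope):
///
/// ```ignore
/// let device = FakeDevice::new(8192, 512);
/// let mut buffer = device.allocate_buffer(512).await;
/// buffer.as_mut_slice().fill(0xaa);
/// device.write(0, buffer.as_ref()).await?;
/// device.flush().await?;
/// ```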
pub struct FakeDevice {
    allocator: BufferAllocator,
    inner: Mutex<Inner>,
    closed: AtomicBool,
    operation_closure: Box<dyn Fn(Op) -> Result<(), Error> + Send + Sync>,
    read_only: AtomicBool,
    poisoned: AtomicBool,
    observer: Option<Box<dyn Observer>>,
}

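/// Size of the `BufferSource` backing each device's `BufferAllocator`.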
const TRANSFER_HEAP_SIZE: usize = 64 * 1024 * 1024;

impl FakeDevice {
    pub fn new(block_count: u64, block_size: u32) -> Self {
        let allocator =
            BufferAllocator::new(block_size as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
        Self {
            allocator,
            inner: Mutex::new(Inner {
                data: vec![0u8; block_count as usize * block_size as usize],
                blocks_written_since_last_barrier: Vec::new(),
                attach_barrier: false,
            }),
            closed: AtomicBool::new(false),
            operation_closure: Box::new(|_: Op| Ok(())),
            read_only: AtomicBool::new(false),
            poisoned: AtomicBool::new(false),
            observer: None,
        }
    }

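    /// Registers an observer that will be notified when barriers are issued.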
    pub fn set_observer(&mut self, observer: Box<dyn Observer>) {
        self.observer = Some(observer);
    }

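    /// Sets a callback that is invoked for every read, write and flush; returning an error
    /// from the callback makes the corresponding operation fail.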
    pub fn set_op_callback(
        &mut self,
        cb: impl Fn(Op) -> Result<(), Error> + Send + Sync + 'static,
    ) {
        self.operation_closure = Box::new(cb);
    }

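    /// Creates a device whose initial contents are read, in full, from `reader`.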
    pub fn from_image(
        mut reader: impl std::io::Read,
        block_size: u32,
    ) -> Result<Self, std::io::Error> {
        let allocator =
            BufferAllocator::new(block_size as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
        let mut data = Vec::new();
        reader.read_to_end(&mut data)?;
        Ok(Self {
            allocator,
            inner: Mutex::new(Inner {
                data,
                blocks_written_since_last_barrier: Vec::new(),
                attach_barrier: false,
            }),
            closed: AtomicBool::new(false),
            operation_closure: Box::new(|_| Ok(())),
            read_only: AtomicBool::new(false),
            poisoned: AtomicBool::new(false),
            observer: None,
        })
    }
}

#[async_trait]
impl Device for FakeDevice {
    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        assert!(!self.closed.load(Ordering::Relaxed));
        self.allocator.allocate_buffer(size)
    }

    fn block_size(&self) -> u32 {
        self.allocator.block_size() as u32
    }

    fn block_count(&self) -> u64 {
        self.inner.lock().data.len() as u64 / self.block_size() as u64
    }

    async fn read_with_opts(
        &self,
        offset: u64,
        mut buffer: MutableBufferRef<'_>,
        _read_opts: ReadOptions,
    ) -> Result<(), Error> {
        ensure!(!self.closed.load(Ordering::Relaxed));
        (self.operation_closure)(Op::Read)?;
        let offset = offset as usize;
        assert_eq!(offset % self.allocator.block_size(), 0);
        let inner = self.inner.lock();
        let size = buffer.len();
        assert!(
            offset + size <= inner.data.len(),
            "offset: {} len: {} data.len: {}",
            offset,
            size,
            inner.data.len()
        );
        buffer.as_mut_slice().copy_from_slice(&inner.data[offset..offset + size]);
        Ok(())
    }

    async fn write_with_opts(
        &self,
        offset: u64,
        buffer: BufferRef<'_>,
        _write_opts: WriteOptions,
    ) -> Result<(), Error> {
        ensure!(!self.closed.load(Ordering::Relaxed));
        ensure!(!self.read_only.load(Ordering::Relaxed));
        let mut inner = self.inner.lock();

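        // A pending barrier means everything written so far must persist before this write
        // does, so those blocks are no longer candidates for a simulated discard.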
        if inner.attach_barrier {
            inner.blocks_written_since_last_barrier.clear();
            inner.attach_barrier = false;
        }

        (self.operation_closure)(Op::Write)?;
        let offset = offset as usize;
        assert_eq!(offset % self.allocator.block_size(), 0);

        let size = buffer.len();
        assert!(
            offset + size <= inner.data.len(),
            "offset: {} len: {} data.len: {}",
            offset,
            size,
            inner.data.len()
        );
        inner.data[offset..offset + size].copy_from_slice(buffer.as_slice());
        let first_block = offset / self.allocator.block_size();
        for block in first_block..first_block + size / self.allocator.block_size() {
            inner.blocks_written_since_last_barrier.push(block)
        }
        Ok(())
    }

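    // Trim fills the range with an arbitrary marker byte (0xab) rather than tracking
    // trimmed extents.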
    async fn trim(&self, range: Range<u64>) -> Result<(), Error> {
        ensure!(!self.closed.load(Ordering::Relaxed));
        ensure!(!self.read_only.load(Ordering::Relaxed));
        assert_eq!(range.start % self.block_size() as u64, 0);
        assert_eq!(range.end % self.block_size() as u64, 0);
        let mut inner = self.inner.lock();
        inner.data[range.start as usize..range.end as usize].fill(0xab);
        Ok(())
    }

    async fn close(&self) -> Result<(), Error> {
        self.closed.store(true, Ordering::Relaxed);
        Ok(())
    }

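    // A flush makes all outstanding writes durable, so nothing remains eligible for
    // discard afterwards.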
    async fn flush(&self) -> Result<(), Error> {
        self.inner.lock().blocks_written_since_last_barrier.clear();
        (self.operation_closure)(Op::Flush)
    }

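    // Records that a barrier was issued.  The next write clears the set of blocks eligible
    // for discard, modelling the guarantee that writes issued before the barrier persist
    // before any write issued after it.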
    fn barrier(&self) {
        if let Some(observer) = &self.observer {
            observer.barrier();
        }
        self.inner.lock().attach_barrier = true;
    }

    fn reopen(&self, read_only: bool) {
        self.closed.store(false, Ordering::Relaxed);
        self.read_only.store(read_only, Ordering::Relaxed);
    }

    fn is_read_only(&self) -> bool {
        self.read_only.load(Ordering::Relaxed)
    }

    fn supports_trim(&self) -> bool {
        true
    }

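    // Returns an independent device backed by a clone of the current contents.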
    fn snapshot(&self) -> Result<DeviceHolder, Error> {
        let allocator =
            BufferAllocator::new(self.block_size() as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
        Ok(DeviceHolder::new(Self {
            allocator,
            inner: Mutex::new(self.inner.lock().clone()),
            closed: AtomicBool::new(false),
            operation_closure: Box::new(|_: Op| Ok(())),
            read_only: AtomicBool::new(false),
            poisoned: AtomicBool::new(false),
            observer: None,
        }))
    }

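    // Simulates power loss: every block still in `blocks_written_since_last_barrier` is
    // independently discarded (overwritten with 0xaf) with probability 1/2.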
    fn discard_random_since_last_flush(&self) -> Result<(), Error> {
        let bs = self.allocator.block_size();
        let mut rng = rand::rng();
        let mut guard = self.inner.lock();
        let Inner { ref mut data, ref mut blocks_written_since_last_barrier, .. } = &mut *guard;
        log::info!("Discarding from {blocks_written_since_last_barrier:?}");
        let mut discarded = Vec::new();
        for block in blocks_written_since_last_barrier.drain(..) {
            if rng.random() {
                data[block * bs..(block + 1) * bs].fill(0xaf);
                discarded.push(block);
            }
        }
        log::info!("Discarded {discarded:?}");
        Ok(())
    }

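    // Marks the device so that dropping it panics; see the `Drop` impl below.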
    fn poison(&self) -> Result<(), Error> {
        self.poisoned.store(true, Ordering::Relaxed);
        Ok(())
    }
}

impl Drop for FakeDevice {
    fn drop(&mut self) {
        if self.poisoned.load(Ordering::Relaxed) {
            panic!("This device was poisoned to crash whoever is holding a reference here.");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::FakeDevice;
    use crate::Device;
    use block_protocol::WriteOptions;

    const TEST_DEVICE_BLOCK_SIZE: usize = 512;

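    // Verifies that random discards respect barrier ordering: if a block written before a
    // barrier was discarded, then every block written after that barrier must have been
    // discarded as well.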
    #[fuchsia::test(threads = 10)]
    async fn test_discard_random_with_barriers() {
        let device = FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE as u32);
        for _ in 0..1000 {
            let mut data = vec![0; 7 * TEST_DEVICE_BLOCK_SIZE];
            rand::fill(&mut data[..]);
            let indices = [1, 2, 3, 4, 3, 5, 6];
            for i in 0..indices.len() {
                let mut buffer = device.allocate_buffer(TEST_DEVICE_BLOCK_SIZE).await;
                if i == 2 || i == 5 {
                    buffer.as_mut_slice().copy_from_slice(
                        &data[indices[i] * TEST_DEVICE_BLOCK_SIZE
                            ..indices[i] * TEST_DEVICE_BLOCK_SIZE + TEST_DEVICE_BLOCK_SIZE],
                    );
                    device.barrier();
                    device
                        .write(i as u64 * TEST_DEVICE_BLOCK_SIZE as u64, buffer.as_ref())
                        .await
                        .expect("Failed to write to FakeDevice");
                } else {
                    buffer.as_mut_slice().copy_from_slice(
                        &data[indices[i] * TEST_DEVICE_BLOCK_SIZE
                            ..indices[i] * TEST_DEVICE_BLOCK_SIZE + TEST_DEVICE_BLOCK_SIZE],
                    );
                    device
                        .write_with_opts(
                            i as u64 * TEST_DEVICE_BLOCK_SIZE as u64,
                            buffer.as_ref(),
                            WriteOptions::default(),
                        )
                        .await
                        .expect("Failed to write to FakeDevice");
                }
            }
            device.discard_random_since_last_flush().expect("failed to randomly discard writes");
            let mut discard = false;
            let mut discard_2 = false;
            for i in 0..7 {
                let mut read_buffer = device.allocate_buffer(TEST_DEVICE_BLOCK_SIZE).await;
                device
                    .read(i as u64 * TEST_DEVICE_BLOCK_SIZE as u64, read_buffer.as_mut())
                    .await
                    .expect("failed to read from FakeDevice");
                let expected_data = &data[indices[i] * TEST_DEVICE_BLOCK_SIZE
                    ..indices[i] * TEST_DEVICE_BLOCK_SIZE + TEST_DEVICE_BLOCK_SIZE];
                if i < 2 {
                    if expected_data != read_buffer.as_slice() {
                        discard = true;
                    }
                } else if i < 5 {
                    if discard {
                        assert_ne!(expected_data, read_buffer.as_slice());
                        discard_2 = true;
                    } else if expected_data != read_buffer.as_slice() {
                        discard_2 = true;
                    }
                } else if discard_2 {
                    assert_ne!(expected_data, read_buffer.as_slice());
                }
            }
        }
    }
}