storage_device/
fake_device.rs
1use crate::buffer::{BufferFuture, BufferRef, MutableBufferRef};
6use crate::buffer_allocator::{BufferAllocator, BufferSource};
7use crate::{Device, DeviceHolder};
8use anyhow::{ensure, Error};
9use async_trait::async_trait;
10use block_protocol::WriteOptions;
11use rand::Rng;
12use std::ops::Range;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Mutex;
15
/// The kind of device operation reported to the callback installed with
/// `FakeDevice::set_op_callback`.
///
/// The callback is invoked with the matching variant before a read or write
/// touches the backing data, and as part of every flush.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Op {
    /// A `read` is about to be serviced.
    Read,
    /// A `write_with_opts` is about to be serviced.
    Write,
    /// A `flush` is being serviced.
    Flush,
}
21
22pub struct FakeDevice {
24 allocator: BufferAllocator,
25 data: Mutex<(Vec<u8>, Vec<usize>)>,
26 closed: AtomicBool,
27 operation_closure: Box<dyn Fn(Op) -> Result<(), Error> + Send + Sync>,
28 read_only: AtomicBool,
29 poisoned: AtomicBool,
30}
31
/// Size in bytes (64 MiB) of the buffer source backing each device's
/// `BufferAllocator`.
const TRANSFER_HEAP_SIZE: usize = 64 << 20;
33
34impl FakeDevice {
35 pub fn new(block_count: u64, block_size: u32) -> Self {
36 let allocator =
37 BufferAllocator::new(block_size as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
38 Self {
39 allocator,
40 data: Mutex::new((
41 vec![0 as u8; block_count as usize * block_size as usize],
42 Vec::new(),
43 )),
44 closed: AtomicBool::new(false),
45 operation_closure: Box::new(|_: Op| Ok(())),
46 read_only: AtomicBool::new(false),
47 poisoned: AtomicBool::new(false),
48 }
49 }
50
51 pub fn set_op_callback(
54 &mut self,
55 cb: impl Fn(Op) -> Result<(), Error> + Send + Sync + 'static,
56 ) {
57 self.operation_closure = Box::new(cb);
58 }
59
60 pub fn from_image(
63 mut reader: impl std::io::Read,
64 block_size: u32,
65 ) -> Result<Self, std::io::Error> {
66 let allocator =
67 BufferAllocator::new(block_size as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
68 let mut data = Vec::new();
69 reader.read_to_end(&mut data)?;
70 Ok(Self {
71 allocator,
72 data: Mutex::new((data, Vec::new())),
73 closed: AtomicBool::new(false),
74 operation_closure: Box::new(|_| Ok(())),
75 read_only: AtomicBool::new(false),
76 poisoned: AtomicBool::new(false),
77 })
78 }
79}
80
81#[async_trait]
82impl Device for FakeDevice {
83 fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
84 assert!(!self.closed.load(Ordering::Relaxed));
85 self.allocator.allocate_buffer(size)
86 }
87
88 fn block_size(&self) -> u32 {
89 self.allocator.block_size() as u32
90 }
91
92 fn block_count(&self) -> u64 {
93 self.data.lock().unwrap().0.len() as u64 / self.block_size() as u64
94 }
95
96 async fn read(&self, offset: u64, mut buffer: MutableBufferRef<'_>) -> Result<(), Error> {
97 ensure!(!self.closed.load(Ordering::Relaxed));
98 (self.operation_closure)(Op::Read)?;
99 let offset = offset as usize;
100 assert_eq!(offset % self.allocator.block_size(), 0);
101 let data = self.data.lock().unwrap();
102 let size = buffer.len();
103 assert!(
104 offset + size <= data.0.len(),
105 "offset: {} len: {} data.len: {}",
106 offset,
107 size,
108 data.0.len()
109 );
110 buffer.as_mut_slice().copy_from_slice(&data.0[offset..offset + size]);
111 Ok(())
112 }
113
114 async fn write_with_opts(
115 &self,
116 offset: u64,
117 buffer: BufferRef<'_>,
118 _opts: WriteOptions,
119 ) -> Result<(), Error> {
120 ensure!(!self.closed.load(Ordering::Relaxed));
121 ensure!(!self.read_only.load(Ordering::Relaxed));
122 (self.operation_closure)(Op::Write)?;
123 let offset = offset as usize;
124 assert_eq!(offset % self.allocator.block_size(), 0);
125 let mut data = self.data.lock().unwrap();
126 let size = buffer.len();
127 assert!(
128 offset + size <= data.0.len(),
129 "offset: {} len: {} data.len: {}",
130 offset,
131 size,
132 data.0.len()
133 );
134 data.0[offset..offset + size].copy_from_slice(buffer.as_slice());
135 let first_block = offset / self.allocator.block_size();
136 for block in first_block..first_block + size / self.allocator.block_size() {
137 data.1.push(block)
138 }
139 Ok(())
140 }
141
142 async fn trim(&self, range: Range<u64>) -> Result<(), Error> {
143 ensure!(!self.closed.load(Ordering::Relaxed));
144 ensure!(!self.read_only.load(Ordering::Relaxed));
145 assert_eq!(range.start % self.block_size() as u64, 0);
146 assert_eq!(range.end % self.block_size() as u64, 0);
147 let mut data = self.data.lock().unwrap();
149 data.0[range.start as usize..range.end as usize].fill(0xab);
150 Ok(())
151 }
152
153 async fn close(&self) -> Result<(), Error> {
154 self.closed.store(true, Ordering::Relaxed);
155 Ok(())
156 }
157
158 async fn flush(&self) -> Result<(), Error> {
159 self.data.lock().unwrap().1.clear();
160 (self.operation_closure)(Op::Flush)
161 }
162
163 fn reopen(&self, read_only: bool) {
164 self.closed.store(false, Ordering::Relaxed);
165 self.read_only.store(read_only, Ordering::Relaxed);
166 }
167
168 fn is_read_only(&self) -> bool {
169 self.read_only.load(Ordering::Relaxed)
170 }
171
172 fn supports_trim(&self) -> bool {
173 true
174 }
175
176 fn snapshot(&self) -> Result<DeviceHolder, Error> {
177 let allocator =
178 BufferAllocator::new(self.block_size() as usize, BufferSource::new(TRANSFER_HEAP_SIZE));
179 Ok(DeviceHolder::new(Self {
180 allocator,
181 data: Mutex::new(self.data.lock().unwrap().clone()),
182 closed: AtomicBool::new(false),
183 operation_closure: Box::new(|_: Op| Ok(())),
184 read_only: AtomicBool::new(false),
185 poisoned: AtomicBool::new(false),
186 }))
187 }
188
189 fn discard_random_since_last_flush(&self) -> Result<(), Error> {
190 let bs = self.allocator.block_size();
191 let mut rng = rand::thread_rng();
192 let mut guard = self.data.lock().unwrap();
193 let (ref mut data, ref mut blocks_written) = &mut *guard;
194 log::info!("Discarding from {blocks_written:?}");
195 let mut discarded = Vec::new();
196 for block in blocks_written.drain(..) {
197 if rng.gen() {
198 data[block * bs..(block + 1) * bs].fill(0xaf);
199 discarded.push(block);
200 }
201 }
202 log::info!("Discarded {discarded:?}");
203 Ok(())
204 }
205
206 fn poison(&self) -> Result<(), Error> {
209 self.poisoned.store(true, Ordering::Relaxed);
210 Ok(())
211 }
212}
213
214impl Drop for FakeDevice {
215 fn drop(&mut self) {
216 if self.poisoned.load(Ordering::Relaxed) {
217 panic!("This device was poisoned to crash whomever is holding a reference here.");
218 }
219 }
220}