1use super::{BufferSlice, MutableBufferSlice, RemoteBlockClientSync, VmoId};
6use anyhow::{ensure, Error};
7
8use linked_hash_map::LinkedHashMap;
9use log::error;
10use std::io::{SeekFrom, Write};
11
12const VMO_SIZE: u64 = 262_144;
13const BLOCK_SIZE: u64 = 8192;
14const BLOCK_COUNT: usize = (VMO_SIZE / BLOCK_SIZE) as usize;
15
16struct CacheEntry {
17 vmo_offset: u64,
18 dirty: bool,
19}
20
21#[derive(Debug, Default, Eq, PartialEq)]
22pub struct Stats {
23 read_count: u64,
24 write_count: u64,
25 cache_hits: u64,
26}
27
28pub struct Cache {
33 device: RemoteBlockClientSync,
34 vmo: zx::Vmo,
35 vmo_id: VmoId,
36 map: LinkedHashMap<u64, CacheEntry>,
37 offset: u64, stats: Stats,
39}
40
41impl Cache {
42 pub fn new(device: RemoteBlockClientSync) -> Result<Self, Error> {
44 ensure!(
45 BLOCK_SIZE % device.block_size() as u64 == 0,
46 "underlying block size not supported"
47 );
48 let vmo = zx::Vmo::create(VMO_SIZE)?;
49 let vmo_id = device.attach_vmo(&vmo)?;
50 Ok(Self {
51 device,
52 vmo,
53 vmo_id,
54 map: Default::default(),
55 offset: 0,
56 stats: Stats::default(),
57 })
58 }
59
60 fn device_size(&self) -> u64 {
61 self.device.block_count() * self.device.block_size() as u64
62 }
63
64 fn get_block(&mut self, offset: u64, mark_dirty: bool) -> Result<(u64, bool), Error> {
68 if let Some(ref mut entry) = self.map.get_refresh(&offset) {
69 self.stats.cache_hits += 1;
70 if mark_dirty {
71 entry.dirty = true;
72 }
73 Ok((entry.vmo_offset, true))
74 } else {
75 let vmo_offset = if self.map.len() < BLOCK_COUNT {
76 self.map.len() as u64 * BLOCK_SIZE
77 } else {
78 let entry = self.map.pop_front().unwrap();
79 if entry.1.dirty {
80 self.stats.write_count += 1;
81 self.device.write_at(
82 BufferSlice::new_with_vmo_id(
83 &self.vmo_id,
84 entry.1.vmo_offset,
85 std::cmp::min(BLOCK_SIZE, self.device_size() - entry.0),
86 ),
87 entry.0,
88 )?;
89 }
90 entry.1.vmo_offset
91 };
92 Ok((vmo_offset, false))
93 }
94 }
95
96 fn read_block(&mut self, offset: u64, mark_dirty: bool) -> Result<u64, Error> {
99 let (vmo_offset, hit) = self.get_block(offset, mark_dirty)?;
100 if !hit {
101 self.stats.read_count += 1;
102 self.device.read_at(
103 MutableBufferSlice::new_with_vmo_id(
104 &self.vmo_id,
105 vmo_offset,
106 std::cmp::min(BLOCK_SIZE, self.device_size() - offset),
107 ),
108 offset,
109 )?;
110 self.map.insert(offset, CacheEntry { vmo_offset, dirty: mark_dirty });
111 }
112 Ok(vmo_offset)
113 }
114
115 pub fn read_at(&mut self, mut buf: &mut [u8], offset: u64) -> Result<(), Error> {
117 ensure!(
118 offset <= self.device_size() && buf.len() as u64 <= self.device_size() - offset,
119 "read exceeds device size"
120 );
121
122 let mut aligned_offset = offset - offset % BLOCK_SIZE;
124 let end = offset + buf.len() as u64;
125 if aligned_offset < offset {
126 let vmo_offset = self.read_block(aligned_offset, false)?;
127 let to_copy = std::cmp::min(aligned_offset + BLOCK_SIZE, end) - offset;
128 self.vmo.read(&mut buf[..to_copy as usize], vmo_offset + offset - aligned_offset)?;
129 aligned_offset += BLOCK_SIZE;
130 buf = &mut buf[to_copy as usize..];
131 }
132
133 while aligned_offset + BLOCK_SIZE <= end {
135 let vmo_offset = self.read_block(aligned_offset, false)?;
136 self.vmo.read(&mut buf[..BLOCK_SIZE as usize], vmo_offset)?;
137 aligned_offset += BLOCK_SIZE;
138 buf = &mut buf[BLOCK_SIZE as usize..];
139 }
140
141 if end > aligned_offset {
143 let vmo_offset = self.read_block(aligned_offset, false)?;
144 self.vmo.read(buf, vmo_offset)?;
145 }
146 Ok(())
147 }
148
149 pub fn write_at(&mut self, mut buf: &[u8], offset: u64) -> Result<(), Error> {
151 ensure!(
152 offset <= self.device_size() && buf.len() as u64 <= self.device_size() - offset,
153 "write exceeds device size"
154 );
155
156 let mut aligned_offset = offset - offset % BLOCK_SIZE;
158 let end = offset + buf.len() as u64;
159 if aligned_offset < offset {
160 let vmo_offset = self.read_block(aligned_offset, true)?;
161 let to_copy = std::cmp::min(aligned_offset + BLOCK_SIZE, end) - offset;
162 self.vmo.write(&buf[..to_copy as usize], vmo_offset + offset - aligned_offset)?;
163 aligned_offset += BLOCK_SIZE;
164 buf = &buf[to_copy as usize..];
165 }
166
167 while aligned_offset + BLOCK_SIZE <= end {
169 let (vmo_offset, hit) = self.get_block(aligned_offset, true)?;
170 self.vmo.write(&buf[..BLOCK_SIZE as usize], vmo_offset)?;
171 if !hit {
172 self.map.insert(aligned_offset, CacheEntry { vmo_offset, dirty: true });
173 }
174 aligned_offset += BLOCK_SIZE;
175 buf = &buf[BLOCK_SIZE as usize..];
176 }
177
178 if end > aligned_offset {
180 let vmo_offset = self.read_block(aligned_offset, true)?;
181 self.vmo.write(buf, vmo_offset)?;
182 }
183 Ok(())
184 }
185
186 pub fn stats(&self) -> &Stats {
188 &self.stats
189 }
190
191 pub fn device(&self) -> &RemoteBlockClientSync {
194 &self.device
195 }
196
197 pub fn flush_device(&self) -> Result<(), Error> {
198 Ok(self.device.flush()?)
199 }
200}
201
202impl Drop for Cache {
203 fn drop(&mut self) {
204 if let Err(e) = self.flush() {
205 error!("Flush failed: {}", e);
206 }
207 let _ = self.vmo_id.take().into_id(); }
209}
210
211fn into_io_error<E: Into<Box<dyn std::error::Error + Send + Sync>>>(error: E) -> std::io::Error {
212 std::io::Error::other(error)
213}
214
215impl std::io::Read for Cache {
216 fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
217 if self.offset > self.device_size() {
218 return Ok(0);
219 }
220 let max_len = self.device_size() - self.offset;
221 if buf.len() as u64 > max_len {
222 buf = &mut buf[0..max_len as usize];
223 }
224 self.read_at(buf, self.offset).map_err(into_io_error)?;
225 self.offset += buf.len() as u64;
226 Ok(buf.len())
227 }
228}
229
230impl Write for Cache {
231 fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
232 if self.offset > self.device_size() {
233 return Ok(0);
234 }
235 let max_len = self.device_size() - self.offset;
236 if buf.len() as u64 > max_len {
237 buf = &buf[0..max_len as usize];
238 }
239 self.write_at(&buf, self.offset).map_err(into_io_error)?;
240 self.offset += buf.len() as u64;
241 Ok(buf.len())
242 }
243
244 fn flush(&mut self) -> std::io::Result<()> {
247 let max = self.device_size();
248 for mut entry in self.map.entries() {
249 if entry.get().dirty {
250 self.stats.write_count += 1;
251 self.device
252 .write_at(
253 BufferSlice::new_with_vmo_id(
254 &self.vmo_id,
255 entry.get().vmo_offset,
256 std::cmp::min(BLOCK_SIZE, max - *entry.key()),
257 ),
258 *entry.key(),
259 )
260 .map_err(into_io_error)?;
261 entry.get_mut().dirty = false;
262 }
263 }
264 Ok(())
265 }
266}
267
268impl std::io::Seek for Cache {
269 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
270 self.offset = match pos {
271 SeekFrom::Start(offset) => Some(offset),
272 SeekFrom::End(delta) => {
273 if delta >= 0 {
274 self.device_size().checked_add(delta as u64)
275 } else {
276 self.device_size().checked_sub(-delta as u64)
277 }
278 }
279 SeekFrom::Current(delta) => {
280 if delta >= 0 {
281 self.offset.checked_add(delta as u64)
282 } else {
283 self.offset.checked_sub(-delta as u64)
284 }
285 }
286 }
287 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "bad delta"))?;
288 Ok(self.offset)
289 }
290}
291
292#[cfg(test)]
293mod tests {
294 use super::{Cache, Stats};
295 use crate::RemoteBlockClientSync;
296 use ramdevice_client::RamdiskClient;
297 use std::io::{Read as _, Seek as _, SeekFrom, Write as _};
298
299 const RAMDISK_BLOCK_SIZE: u64 = 1024;
300 const RAMDISK_BLOCK_COUNT: u64 = 1023; const RAMDISK_SIZE: u64 = RAMDISK_BLOCK_SIZE * RAMDISK_BLOCK_COUNT;
302
303 pub async fn make_ramdisk() -> (RamdiskClient, RemoteBlockClientSync) {
304 let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
305 .await
306 .expect("RamdiskClient::create failed");
307 let client_end = ramdisk.open().expect("ramdisk.open failed");
308 let block_client =
309 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
310 (ramdisk, block_client)
311 }
312
313 #[fuchsia::test]
314 async fn test_cache_read_at_and_write_at_with_no_hits() {
315 let (_ramdisk, block_client) = make_ramdisk().await;
316 let mut cache = Cache::new(block_client).expect("Cache::new failed");
317 let mut offset = 5;
318 const TEST_COUNT: usize = super::BLOCK_COUNT * 2; const DATA: &[u8] = b"hello";
320 for _ in 0..TEST_COUNT {
321 cache.write_at(DATA, offset).expect("cache.write failed");
322 offset += super::BLOCK_SIZE + 1;
325 }
326 assert_eq!(
327 cache.stats(),
328 &Stats {
329 read_count: TEST_COUNT as u64,
330 write_count: super::BLOCK_COUNT as u64,
331 cache_hits: 0
332 }
333 );
334 offset = 5;
335 for _ in 0..TEST_COUNT {
336 let mut buf = [0; 5];
337 cache.read_at(&mut buf, offset).expect("cache.read_at failed");
338 assert_eq!(&buf, DATA);
339 offset += super::BLOCK_SIZE + 1;
340 }
341 assert_eq!(
342 cache.stats(),
343 &Stats {
344 read_count: 2 * TEST_COUNT as u64,
345 write_count: TEST_COUNT as u64,
346 cache_hits: 0
347 }
348 );
349 }
350
351 #[fuchsia::test]
352 async fn test_cache_read_at_and_write_at_with_hit() {
353 let (_ramdisk, block_client) = make_ramdisk().await;
354 let mut cache = Cache::new(block_client).expect("Cache::new failed");
355 const OFFSET: u64 = 11;
356 const DATA: &[u8] = b"hello";
357 cache.write_at(DATA, OFFSET).expect("cache.write failed");
358 let mut buf = [0; 5];
359 cache.read_at(&mut buf, OFFSET).expect("cache.read_at failed");
360 assert_eq!(&buf, DATA);
361 assert_eq!(cache.stats(), &Stats { read_count: 1, write_count: 0, cache_hits: 1 });
362 }
363
364 #[fuchsia::test]
365 async fn test_cache_aligned_read_at_and_write_at() {
366 let (_ramdisk, block_client) = make_ramdisk().await;
367 let mut cache = Cache::new(block_client).expect("Cache::new failed");
368 const OFFSET: u64 = 11;
369 const BLOCKS: usize = 3;
370 const DATA_LEN: usize = super::BLOCK_SIZE as usize * BLOCKS + 11;
371 let data = [0xe2u8; DATA_LEN];
372 cache.write_at(&data, OFFSET).expect("cache.write failed");
374 let mut buf = [0; DATA_LEN + 2]; cache.read_at(&mut buf, OFFSET - 1).expect("cache.read_at failed");
376 assert_eq!(buf[0], 0);
377 assert_eq!(buf[DATA_LEN + 1], 0);
378 assert_eq!(&buf[1..DATA_LEN + 1], &data[0..DATA_LEN]);
379 assert_eq!(
382 cache.stats(),
383 &Stats { read_count: 2, write_count: 0, cache_hits: BLOCKS as u64 + 1 }
384 );
385 }
386
387 #[fuchsia::test]
388 async fn test_cache_aligned_read_at_and_write_at_cold() {
389 let (ramdisk, block_client) = make_ramdisk().await;
391 let mut cache = Cache::new(block_client).expect("Cache::new failed");
392 const OFFSET: u64 = 11;
393 const BLOCKS: usize = 3;
394 const DATA_LEN: usize = super::BLOCK_SIZE as usize * BLOCKS + 11;
395 let data = [0xe2u8; DATA_LEN];
396 cache.write_at(&data, OFFSET).expect("cache.write failed");
398 assert_eq!(cache.stats(), &Stats { read_count: 2, write_count: 0, cache_hits: 0 });
399
400 drop(cache);
401 let client_end = ramdisk.open().expect("ramdisk.open failed");
402 let block_client =
403 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
404 let mut cache = Cache::new(block_client).expect("Cache::new failed");
405
406 let mut buf = [0; DATA_LEN + 2]; cache.read_at(&mut buf, OFFSET - 1).expect("cache.read_at failed");
408 assert_eq!(buf[0], 0);
409 assert_eq!(buf[DATA_LEN + 1], 0);
410 assert_eq!(&buf[1..DATA_LEN + 1], &data[0..DATA_LEN]);
411 assert_eq!(
414 cache.stats(),
415 &Stats { read_count: BLOCKS as u64 + 1, write_count: 0, cache_hits: 0 }
416 );
417 }
418
419 #[fuchsia::test]
420 async fn test_io_read_write_and_seek() {
421 let (_ramdisk, block_client) = make_ramdisk().await;
422 let mut cache = Cache::new(block_client).expect("Cache::new failed");
423 const OFFSET: u64 = 11;
424 const DATA: &[u8] = b"hello";
425 assert_eq!(cache.seek(SeekFrom::Start(OFFSET)).expect("seek failed"), OFFSET);
426 cache.write_all(DATA).expect("cache.write failed");
427 assert_eq!(
428 cache.seek(SeekFrom::Current(-(DATA.len() as i64))).expect("seek failed"),
429 OFFSET
430 );
431 let mut buf = [0u8; 5];
432 assert_eq!(cache.read(&mut buf).expect("cache.read failed"), DATA.len());
433 assert_eq!(&buf, DATA);
434 }
435
436 #[fuchsia::test]
437 async fn test_io_read_write_and_seek_at_max_offset() {
438 let (_ramdisk, block_client) = make_ramdisk().await;
439 let mut cache = Cache::new(block_client).expect("Cache::new failed");
440 const DATA: &[u8] = b"hello";
441 assert_eq!(cache.seek(SeekFrom::End(-1)).expect("seek failed"), RAMDISK_SIZE - 1);
442 assert_eq!(cache.write(DATA).expect("cache.write failed"), 1);
443 assert_eq!(cache.seek(SeekFrom::End(-4)).expect("seek failed"), RAMDISK_SIZE - 4);
444 let mut buf = [0x56u8; 5];
445 assert_eq!(cache.read(&mut buf).expect("cache.read failed"), 4);
446 assert_eq!(&buf, &[0, 0, 0, b'h', 0x56]);
447 }
448
449 #[fuchsia::test]
450 async fn test_read_beyond_max_offset_returns_error() {
451 let (_ramdisk, block_client) = make_ramdisk().await;
452 let mut cache = Cache::new(block_client).expect("Cache::new failed");
453 let mut buf = [0u8; 2];
454 cache.read_at(&mut buf, RAMDISK_SIZE).expect_err("read_at succeeded");
455 cache.read_at(&mut buf, RAMDISK_SIZE - 1).expect_err("read_at succeeded");
456 }
457
458 #[fuchsia::test]
459 async fn test_write_beyond_max_offset_returns_error() {
460 let (_ramdisk, block_client) = make_ramdisk().await;
461 let mut cache = Cache::new(block_client).expect("Cache::new failed");
462 let buf = [0u8; 2];
463 cache.write_at(&buf, RAMDISK_SIZE).expect_err("write_at succeeded");
464 cache.write_at(&buf, RAMDISK_SIZE - 1).expect_err("write_at succeeded");
465 }
466
467 #[fuchsia::test]
468 async fn test_read_with_overflow_returns_error() {
469 let (_ramdisk, block_client) = make_ramdisk().await;
470 let mut cache = Cache::new(block_client).expect("Cache::new failed");
471 let mut buf = [0u8; 2];
472 cache.read_at(&mut buf, u64::MAX - 1).expect_err("read_at succeeded");
473 }
474
475 #[fuchsia::test]
476 async fn test_write_with_overflow_returns_error() {
477 let (_ramdisk, block_client) = make_ramdisk().await;
478 let mut cache = Cache::new(block_client).expect("Cache::new failed");
479 let buf = [0u8; 2];
480 cache.write_at(&buf, u64::MAX - 1).expect_err("write_at succeeded");
481 }
482
483 #[fuchsia::test]
484 async fn test_read_and_write_at_max_offset_suceeds() {
485 let (_ramdisk, block_client) = make_ramdisk().await;
486 let mut cache = Cache::new(block_client).expect("Cache::new failed");
487 let buf = [0xd4u8; 2];
488 cache.write_at(&buf, RAMDISK_SIZE - buf.len() as u64).expect("write_at failed");
489 let mut read_buf = [0xf3u8; 2];
490 cache.read_at(&mut read_buf, RAMDISK_SIZE - buf.len() as u64).expect("read_at failed");
491 assert_eq!(&buf, &read_buf);
492 }
493
494 #[fuchsia::test]
495 async fn test_seek_with_bad_delta_returns_error() {
496 let (_ramdisk, block_client) = make_ramdisk().await;
497 let mut cache = Cache::new(block_client).expect("Cache::new failed");
498 cache.seek(SeekFrom::End(-(RAMDISK_SIZE as i64) - 1)).expect_err("seek suceeded");
499 cache.seek(SeekFrom::Current(-1)).expect_err("seek succeeded");
500 }
501
502 #[fuchsia::test]
503 async fn test_ramdisk_with_large_block_size_returns_error() {
504 let ramdisk = RamdiskClient::create(super::BLOCK_SIZE * 2, 10)
505 .await
506 .expect("RamdiskClient::create failed");
507 let client_end = ramdisk.open().expect("ramdisk.open failed");
508 let block_client =
509 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
510 Cache::new(block_client).err().expect("Cache::new succeeded");
511 }
512}