use super::{BufferSlice, MutableBufferSlice, RemoteBlockClientSync, VmoId};
use anyhow::{ensure, Error};
use linked_hash_map::LinkedHashMap;
use std::io::{SeekFrom, Write};
use tracing::error;
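// Cache layout: a single 256 KiB VMO divided into 8 KiB cache blocks.  BLOCK_SIZE must be a
// whole multiple of the underlying device's block size (checked in `Cache::new`).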
const VMO_SIZE: u64 = 262_144;
const BLOCK_SIZE: u64 = 8192;
const BLOCK_COUNT: usize = (VMO_SIZE / BLOCK_SIZE) as usize;
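// A resident cache block: its location within the cache VMO and whether it holds changes
// that have not yet been written back to the device.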
struct CacheEntry {
vmo_offset: u64,
dirty: bool,
}
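/// I/O statistics kept by the cache: reads issued to the device, writes issued to the device,
/// and cache hits.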
#[derive(Debug, Default, Eq, PartialEq)]
pub struct Stats {
read_count: u64,
write_count: u64,
cache_hits: u64,
}
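/// A write-back LRU cache of fixed-size blocks in front of a `RemoteBlockClientSync`.  Data is
/// staged in a VMO attached to the device; dirty blocks are written back when they are evicted,
/// when the cache is flushed, and when the cache is dropped.
///
/// A sketch of typical use (it assumes `client` is an already-connected
/// `RemoteBlockClientSync`):
///
/// ```ignore
/// let mut cache = Cache::new(client).expect("Cache::new failed");
/// cache.write_at(b"hello", 11).expect("write_at failed");
/// let mut buf = [0u8; 5];
/// cache.read_at(&mut buf, 11).expect("read_at failed");
/// assert_eq!(&buf, b"hello");
/// ```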
pub struct Cache {
device: RemoteBlockClientSync,
vmo: zx::Vmo,
vmo_id: VmoId,
map: LinkedHashMap<u64, CacheEntry>,
    // Current position for the `Read`/`Write`/`Seek` implementations.
    offset: u64,
    stats: Stats,
}
impl Cache {
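    /// Creates a cache in front of `device`, creating and attaching the backing VMO.  Fails if
    /// the device's block size does not evenly divide the cache's block size.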
pub fn new(device: RemoteBlockClientSync) -> Result<Self, Error> {
ensure!(
BLOCK_SIZE % device.block_size() as u64 == 0,
"underlying block size not supported"
);
let vmo = zx::Vmo::create(VMO_SIZE)?;
let vmo_id = device.attach_vmo(&vmo)?;
Ok(Self {
device,
vmo,
vmo_id,
map: Default::default(),
offset: 0,
stats: Stats::default(),
})
}
fn device_size(&self) -> u64 {
self.device.block_count() * self.device.block_size() as u64
}
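    // Returns the VMO offset of the cache block for the (block-aligned) device `offset`, along
    // with whether the block was already resident.  If the cache is full, the least recently
    // used block is evicted, and written back first if it is dirty.  On a miss the caller is
    // responsible for populating the block and inserting the map entry.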
fn get_block(&mut self, offset: u64, mark_dirty: bool) -> Result<(u64, bool), Error> {
if let Some(ref mut entry) = self.map.get_refresh(&offset) {
self.stats.cache_hits += 1;
if mark_dirty {
entry.dirty = true;
}
Ok((entry.vmo_offset, true))
} else {
let vmo_offset = if self.map.len() < BLOCK_COUNT {
self.map.len() as u64 * BLOCK_SIZE
} else {
let entry = self.map.pop_front().unwrap();
if entry.1.dirty {
self.stats.write_count += 1;
self.device.write_at(
BufferSlice::new_with_vmo_id(
&self.vmo_id,
entry.1.vmo_offset,
std::cmp::min(BLOCK_SIZE, self.device_size() - entry.0),
),
entry.0,
)?;
}
entry.1.vmo_offset
};
Ok((vmo_offset, false))
}
}
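    // Like `get_block`, but on a miss the block is read from the device and a map entry is
    // inserted.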
fn read_block(&mut self, offset: u64, mark_dirty: bool) -> Result<u64, Error> {
let (vmo_offset, hit) = self.get_block(offset, mark_dirty)?;
if !hit {
self.stats.read_count += 1;
self.device.read_at(
MutableBufferSlice::new_with_vmo_id(
&self.vmo_id,
vmo_offset,
std::cmp::min(BLOCK_SIZE, self.device_size() - offset),
),
offset,
)?;
self.map.insert(offset, CacheEntry { vmo_offset, dirty: mark_dirty });
}
Ok(vmo_offset)
}
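    /// Reads `buf.len()` bytes at `offset` via the cache.  Neither `offset` nor the length needs
    /// to be block-aligned, but the range must lie within the device.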
pub fn read_at(&mut self, mut buf: &mut [u8], offset: u64) -> Result<(), Error> {
ensure!(
offset <= self.device_size() && buf.len() as u64 <= self.device_size() - offset,
"read exceeds device size"
);
let mut aligned_offset = offset - offset % BLOCK_SIZE;
let end = offset + buf.len() as u64;
if aligned_offset < offset {
let vmo_offset = self.read_block(aligned_offset, false)?;
let to_copy = std::cmp::min(aligned_offset + BLOCK_SIZE, end) - offset;
self.vmo.read(&mut buf[..to_copy as usize], vmo_offset + offset - aligned_offset)?;
aligned_offset += BLOCK_SIZE;
buf = &mut buf[to_copy as usize..];
}
while aligned_offset + BLOCK_SIZE <= end {
let vmo_offset = self.read_block(aligned_offset, false)?;
self.vmo.read(&mut buf[..BLOCK_SIZE as usize], vmo_offset)?;
aligned_offset += BLOCK_SIZE;
buf = &mut buf[BLOCK_SIZE as usize..];
}
if end > aligned_offset {
let vmo_offset = self.read_block(aligned_offset, false)?;
self.vmo.read(buf, vmo_offset)?;
}
Ok(())
}
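    /// Writes `buf` at `offset` via the cache.  Blocks that are only partially covered are read
    /// in first; fully covered blocks are overwritten without a read.  Data reaches the device
    /// when a dirty block is evicted, on flush, or when the cache is dropped.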
pub fn write_at(&mut self, mut buf: &[u8], offset: u64) -> Result<(), Error> {
ensure!(
offset <= self.device_size() && buf.len() as u64 <= self.device_size() - offset,
"write exceeds device size"
);
let mut aligned_offset = offset - offset % BLOCK_SIZE;
let end = offset + buf.len() as u64;
if aligned_offset < offset {
let vmo_offset = self.read_block(aligned_offset, true)?;
let to_copy = std::cmp::min(aligned_offset + BLOCK_SIZE, end) - offset;
self.vmo.write(&buf[..to_copy as usize], vmo_offset + offset - aligned_offset)?;
aligned_offset += BLOCK_SIZE;
buf = &buf[to_copy as usize..];
}
while aligned_offset + BLOCK_SIZE <= end {
let (vmo_offset, hit) = self.get_block(aligned_offset, true)?;
self.vmo.write(&buf[..BLOCK_SIZE as usize], vmo_offset)?;
if !hit {
self.map.insert(aligned_offset, CacheEntry { vmo_offset, dirty: true });
}
aligned_offset += BLOCK_SIZE;
buf = &buf[BLOCK_SIZE as usize..];
}
if end > aligned_offset {
let vmo_offset = self.read_block(aligned_offset, true)?;
self.vmo.write(buf, vmo_offset)?;
}
Ok(())
}
pub fn stats(&self) -> &Stats {
&self.stats
}
pub fn device(&self) -> &RemoteBlockClientSync {
&self.device
}
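    /// Issues a flush to the underlying device.  This does not write back dirty blocks held in
    /// the cache; use `std::io::Write::flush` for that.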
pub fn flush_device(&self) -> Result<(), Error> {
Ok(self.device.flush()?)
}
}
impl Drop for Cache {
fn drop(&mut self) {
if let Err(e) = self.flush() {
error!("Flush failed: {}", e);
}
        let _ = self.vmo_id.take().into_id();
    }
}
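// Adapts errors to `std::io::Error` for the `Read`, `Write` and `Seek` implementations below.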
fn into_io_error<E: Into<Box<dyn std::error::Error + Send + Sync>>>(error: E) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::Other, error)
}
impl std::io::Read for Cache {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
if self.offset > self.device_size() {
return Ok(0);
}
let max_len = self.device_size() - self.offset;
if buf.len() as u64 > max_len {
buf = &mut buf[0..max_len as usize];
}
self.read_at(buf, self.offset).map_err(into_io_error)?;
self.offset += buf.len() as u64;
Ok(buf.len())
}
}
impl Write for Cache {
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
if self.offset > self.device_size() {
return Ok(0);
}
let max_len = self.device_size() - self.offset;
if buf.len() as u64 > max_len {
buf = &buf[0..max_len as usize];
}
        self.write_at(buf, self.offset).map_err(into_io_error)?;
self.offset += buf.len() as u64;
Ok(buf.len())
}
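    // Writes all dirty cache blocks back to the device.  This does not issue a flush command to
    // the device itself; see `flush_device` for that.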
fn flush(&mut self) -> std::io::Result<()> {
let max = self.device_size();
for mut entry in self.map.entries() {
if entry.get().dirty {
self.stats.write_count += 1;
self.device
.write_at(
BufferSlice::new_with_vmo_id(
&self.vmo_id,
entry.get().vmo_offset,
std::cmp::min(BLOCK_SIZE, max - *entry.key()),
),
*entry.key(),
)
.map_err(into_io_error)?;
entry.get_mut().dirty = false;
}
}
Ok(())
}
}
impl std::io::Seek for Cache {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
self.offset = match pos {
SeekFrom::Start(offset) => Some(offset),
SeekFrom::End(delta) => {
if delta >= 0 {
self.device_size().checked_add(delta as u64)
} else {
self.device_size().checked_sub(-delta as u64)
}
}
SeekFrom::Current(delta) => {
if delta >= 0 {
self.offset.checked_add(delta as u64)
} else {
self.offset.checked_sub(-delta as u64)
}
}
}
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "bad delta"))?;
Ok(self.offset)
}
}
#[cfg(test)]
mod tests {
use super::{Cache, Stats};
use crate::RemoteBlockClientSync;
use ramdevice_client::RamdiskClient;
use std::io::{Read as _, Seek as _, SeekFrom, Write as _};
const RAMDISK_BLOCK_SIZE: u64 = 1024;
    const RAMDISK_BLOCK_COUNT: u64 = 1023;
    const RAMDISK_SIZE: u64 = RAMDISK_BLOCK_SIZE * RAMDISK_BLOCK_COUNT;
pub async fn make_ramdisk() -> (RamdiskClient, RemoteBlockClientSync) {
let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
.await
.expect("RamdiskClient::create failed");
let client_end = ramdisk.open().expect("ramdisk.open failed");
let block_client =
RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
(ramdisk, block_client)
}
#[fuchsia::test]
async fn test_cache_read_at_and_write_at_with_no_hits() {
let (_ramdisk, block_client) = make_ramdisk().await;
let mut cache = Cache::new(block_client).expect("Cache::new failed");
let mut offset = 5;
        const TEST_COUNT: usize = super::BLOCK_COUNT * 2;
        const DATA: &[u8] = b"hello";
for _ in 0..TEST_COUNT {
cache.write_at(DATA, offset).expect("cache.write failed");
offset += super::BLOCK_SIZE + 1;
}
assert_eq!(
cache.stats(),
&Stats {
read_count: TEST_COUNT as u64,
write_count: super::BLOCK_COUNT as u64,
cache_hits: 0
}
);
offset = 5;
for _ in 0..TEST_COUNT {
let mut buf = [0; 5];
cache.read_at(&mut buf, offset).expect("cache.read_at failed");
assert_eq!(&buf, DATA);
offset += super::BLOCK_SIZE + 1;
}
assert_eq!(
cache.stats(),
&Stats {
read_count: 2 * TEST_COUNT as u64,
write_count: TEST_COUNT as u64,
cache_hits: 0
}
);
}
#[fuchsia::test]
async fn test_cache_read_at_and_write_at_with_hit() {
let (_ramdisk, block_client) = make_ramdisk().await;
let mut cache = Cache::new(block_client).expect("Cache::new failed");
const OFFSET: u64 = 11;
const DATA: &[u8] = b"hello";
cache.write_at(DATA, OFFSET).expect("cache.write failed");
let mut buf = [0; 5];
cache.read_at(&mut buf, OFFSET).expect("cache.read_at failed");
assert_eq!(&buf, DATA);
assert_eq!(cache.stats(), &Stats { read_count: 1, write_count: 0, cache_hits: 1 });
}
#[fuchsia::test]
async fn test_cache_aligned_read_at_and_write_at() {
let (_ramdisk, block_client) = make_ramdisk().await;
let mut cache = Cache::new(block_client).expect("Cache::new failed");
const OFFSET: u64 = 11;
const BLOCKS: usize = 3;
const DATA_LEN: usize = super::BLOCK_SIZE as usize * BLOCKS + 11;
let data = [0xe2u8; DATA_LEN];
cache.write_at(&data, OFFSET).expect("cache.write failed");
        let mut buf = [0; DATA_LEN + 2];
        cache.read_at(&mut buf, OFFSET - 1).expect("cache.read_at failed");
assert_eq!(buf[0], 0);
assert_eq!(buf[DATA_LEN + 1], 0);
assert_eq!(&buf[1..DATA_LEN + 1], &data[0..DATA_LEN]);
assert_eq!(
cache.stats(),
&Stats { read_count: 2, write_count: 0, cache_hits: BLOCKS as u64 + 1 }
);
}
#[fuchsia::test]
async fn test_cache_aligned_read_at_and_write_at_cold() {
let (ramdisk, block_client) = make_ramdisk().await;
let mut cache = Cache::new(block_client).expect("Cache::new failed");
const OFFSET: u64 = 11;
const BLOCKS: usize = 3;
const DATA_LEN: usize = super::BLOCK_SIZE as usize * BLOCKS + 11;
let data = [0xe2u8; DATA_LEN];
cache.write_at(&data, OFFSET).expect("cache.write failed");
assert_eq!(cache.stats(), &Stats { read_count: 2, write_count: 0, cache_hits: 0 });
drop(cache);
let client_end = ramdisk.open().expect("ramdisk.open failed");
let block_client =
RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
let mut cache = Cache::new(block_client).expect("Cache::new failed");
        let mut buf = [0; DATA_LEN + 2];
        cache.read_at(&mut buf, OFFSET - 1).expect("cache.read_at failed");
assert_eq!(buf[0], 0);
assert_eq!(buf[DATA_LEN + 1], 0);
assert_eq!(&buf[1..DATA_LEN + 1], &data[0..DATA_LEN]);
assert_eq!(
cache.stats(),
&Stats { read_count: BLOCKS as u64 + 1, write_count: 0, cache_hits: 0 }
);
}
#[fuchsia::test]
async fn test_io_read_write_and_seek() {
let (_ramdisk, block_client) = make_ramdisk().await;
let mut cache = Cache::new(block_client).expect("Cache::new failed");
const OFFSET: u64 = 11;
const DATA: &[u8] = b"hello";
assert_eq!(cache.seek(SeekFrom::Start(OFFSET)).expect("seek failed"), OFFSET);
cache.write_all(DATA).expect("cache.write failed");
assert_eq!(
cache.seek(SeekFrom::Current(-(DATA.len() as i64))).expect("seek failed"),
OFFSET
);
let mut buf = [0u8; 5];
assert_eq!(cache.read(&mut buf).expect("cache.read failed"), DATA.len());
assert_eq!(&buf, DATA);
}
#[fuchsia::test]
async fn test_io_read_write_and_seek_at_max_offset() {
let (_ramdisk, block_client) = make_ramdisk().await;
let mut cache = Cache::new(block_client).expect("Cache::new failed");
const DATA: &[u8] = b"hello";
assert_eq!(cache.seek(SeekFrom::End(-1)).expect("seek failed"), RAMDISK_SIZE - 1);
assert_eq!(cache.write(DATA).expect("cache.write failed"), 1);
assert_eq!(cache.seek(SeekFrom::End(-4)).expect("seek failed"), RAMDISK_SIZE - 4);
let mut buf = [0x56u8; 5];
assert_eq!(cache.read(&mut buf).expect("cache.read failed"), 4);
assert_eq!(&buf, &[0, 0, 0, b'h', 0x56]);
}
#[fuchsia::test]
async fn test_read_beyond_max_offset_returns_error() {
let (_ramdisk, block_client) = make_ramdisk().await;
let mut cache = Cache::new(block_client).expect("Cache::new failed");
let mut buf = [0u8; 2];
cache.read_at(&mut buf, RAMDISK_SIZE).expect_err("read_at succeeded");
cache.read_at(&mut buf, RAMDISK_SIZE - 1).expect_err("read_at succeeded");
}
#[fuchsia::test]
async fn test_write_beyond_max_offset_returns_error() {
let (_ramdisk, block_client) = make_ramdisk().await;
let mut cache = Cache::new(block_client).expect("Cache::new failed");
let buf = [0u8; 2];
cache.write_at(&buf, RAMDISK_SIZE).expect_err("write_at succeeded");
cache.write_at(&buf, RAMDISK_SIZE - 1).expect_err("write_at succeeded");
}
#[fuchsia::test]
async fn test_read_with_overflow_returns_error() {
let (_ramdisk, block_client) = make_ramdisk().await;
let mut cache = Cache::new(block_client).expect("Cache::new failed");
let mut buf = [0u8; 2];
cache.read_at(&mut buf, u64::MAX - 1).expect_err("read_at succeeded");
}
#[fuchsia::test]
async fn test_write_with_overflow_returns_error() {
let (_ramdisk, block_client) = make_ramdisk().await;
let mut cache = Cache::new(block_client).expect("Cache::new failed");
let buf = [0u8; 2];
cache.write_at(&buf, u64::MAX - 1).expect_err("write_at succeeded");
}
#[fuchsia::test]
    async fn test_read_and_write_at_max_offset_succeeds() {
let (_ramdisk, block_client) = make_ramdisk().await;
let mut cache = Cache::new(block_client).expect("Cache::new failed");
let buf = [0xd4u8; 2];
cache.write_at(&buf, RAMDISK_SIZE - buf.len() as u64).expect("write_at failed");
let mut read_buf = [0xf3u8; 2];
cache.read_at(&mut read_buf, RAMDISK_SIZE - buf.len() as u64).expect("read_at failed");
assert_eq!(&buf, &read_buf);
}
#[fuchsia::test]
async fn test_seek_with_bad_delta_returns_error() {
let (_ramdisk, block_client) = make_ramdisk().await;
let mut cache = Cache::new(block_client).expect("Cache::new failed");
        cache.seek(SeekFrom::End(-(RAMDISK_SIZE as i64) - 1)).expect_err("seek succeeded");
cache.seek(SeekFrom::Current(-1)).expect_err("seek succeeded");
}
#[fuchsia::test]
async fn test_ramdisk_with_large_block_size_returns_error() {
let ramdisk = RamdiskClient::create(super::BLOCK_SIZE * 2, 10)
.await
.expect("RamdiskClient::create failed");
let client_end = ramdisk.open().expect("ramdisk.open failed");
let block_client =
RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
Cache::new(block_client).err().expect("Cache::new succeeded");
}
}