block_client/cache.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use super::{BufferSlice, MutableBufferSlice, RemoteBlockClientSync, VmoId};
use anyhow::{ensure, Error};

use linked_hash_map::LinkedHashMap;
use log::error;
use std::io::{SeekFrom, Write};

const VMO_SIZE: u64 = 262_144;
const BLOCK_SIZE: u64 = 8192;
const BLOCK_COUNT: usize = (VMO_SIZE / BLOCK_SIZE) as usize;

struct CacheEntry {
    vmo_offset: u64,
    dirty: bool,
}

#[derive(Debug, Default, Eq, PartialEq)]
pub struct Stats {
    read_count: u64,
    write_count: u64,
    cache_hits: u64,
}

/// Wraps a RemoteBlockClientSync, providing a simple LRU cache and implementations of the
/// std::io::{Read, Seek, Write} traits. This is unlikely to be performant; the implementation is
/// single-threaded. The cache works by dividing a VMO into BLOCK_COUNT blocks of BLOCK_SIZE
/// bytes, and maintaining mappings from device offsets to offsets in the VMO.
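///
/// # Example
///
/// A minimal usage sketch; `block_client` is assumed to be an already-connected
/// RemoteBlockClientSync:
///
/// ```ignore
/// let mut cache = Cache::new(block_client)?;
/// cache.write_at(b"hello", 11)?;
/// let mut buf = [0u8; 5];
/// cache.read_at(&mut buf, 11)?;
/// assert_eq!(&buf, b"hello");
/// ```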
pub struct Cache {
    device: RemoteBlockClientSync,
    vmo: zx::Vmo,
    vmo_id: VmoId,
    map: LinkedHashMap<u64, CacheEntry>,
    offset: u64, // For std::io::{Read, Seek, Write}
    stats: Stats,
}

impl Cache {
    /// Returns a new Cache wrapping the given RemoteBlockClientSync.
    pub fn new(device: RemoteBlockClientSync) -> Result<Self, Error> {
        ensure!(
            BLOCK_SIZE % device.block_size() as u64 == 0,
            "underlying block size not supported"
        );
        let vmo = zx::Vmo::create(VMO_SIZE)?;
        let vmo_id = device.attach_vmo(&vmo)?;
        Ok(Self {
            device,
            vmo,
            vmo_id,
            map: Default::default(),
            offset: 0,
            stats: Stats::default(),
        })
    }

    fn device_size(&self) -> u64 {
        self.device.block_count() * self.device.block_size() as u64
    }

    // Finds a block that can be used for the given offset, marking dirty if requested. Returns a
    // tuple with the VMO offset and whether it was a cache hit. If not a cache hit, the caller is
    // responsible for initializing the data and inserting a cache entry.
    fn get_block(&mut self, offset: u64, mark_dirty: bool) -> Result<(u64, bool), Error> {
        if let Some(ref mut entry) = self.map.get_refresh(&offset) {
            self.stats.cache_hits += 1;
            if mark_dirty {
                entry.dirty = true;
            }
            Ok((entry.vmo_offset, true))
        } else {
            let vmo_offset = if self.map.len() < BLOCK_COUNT {
                self.map.len() as u64 * BLOCK_SIZE
            } else {
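                // All cache blocks are in use: evict the least-recently-used entry (the front
                // of the LinkedHashMap), writing it back to the device first if it is dirty.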
                let entry = self.map.pop_front().unwrap();
                if entry.1.dirty {
                    self.stats.write_count += 1;
                    self.device.write_at(
                        BufferSlice::new_with_vmo_id(
                            &self.vmo_id,
                            entry.1.vmo_offset,
                            std::cmp::min(BLOCK_SIZE, self.device_size() - entry.0),
                        ),
                        entry.0,
                    )?;
                }
                entry.1.vmo_offset
            };
            Ok((vmo_offset, false))
        }
    }

    // Reads the block at the given offset and marks it dirty if requested. Returns the offset in
    // the VMO.
    fn read_block(&mut self, offset: u64, mark_dirty: bool) -> Result<u64, Error> {
        let (vmo_offset, hit) = self.get_block(offset, mark_dirty)?;
        if !hit {
            self.stats.read_count += 1;
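            // Cache miss: fetch the block from the device. The min() clamps the length for the
            // final block when the device size is not a multiple of BLOCK_SIZE.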
            self.device.read_at(
                MutableBufferSlice::new_with_vmo_id(
                    &self.vmo_id,
                    vmo_offset,
                    std::cmp::min(BLOCK_SIZE, self.device_size() - offset),
                ),
                offset,
            )?;
            self.map.insert(offset, CacheEntry { vmo_offset, dirty: mark_dirty });
        }
        Ok(vmo_offset)
    }

    /// Reads at |offset| into |buf|.
    pub fn read_at(&mut self, mut buf: &mut [u8], offset: u64) -> Result<(), Error> {
        ensure!(
            offset <= self.device_size() && buf.len() as u64 <= self.device_size() - offset,
            "read exceeds device size"
        );

        // Start by reading the head.
        let mut aligned_offset = offset - offset % BLOCK_SIZE;
        let end = offset + buf.len() as u64;
        if aligned_offset < offset {
            let vmo_offset = self.read_block(aligned_offset, false)?;
            let to_copy = std::cmp::min(aligned_offset + BLOCK_SIZE, end) - offset;
            self.vmo.read(&mut buf[..to_copy as usize], vmo_offset + offset - aligned_offset)?;
            aligned_offset += BLOCK_SIZE;
            buf = &mut buf[to_copy as usize..];
        }

        // Now do whole blocks.
        while aligned_offset + BLOCK_SIZE <= end {
            let vmo_offset = self.read_block(aligned_offset, false)?;
            self.vmo.read(&mut buf[..BLOCK_SIZE as usize], vmo_offset)?;
            aligned_offset += BLOCK_SIZE;
            buf = &mut buf[BLOCK_SIZE as usize..];
        }

        // And finally the tail.
        if end > aligned_offset {
            let vmo_offset = self.read_block(aligned_offset, false)?;
            self.vmo.read(buf, vmo_offset)?;
        }
        Ok(())
    }

    /// Writes from |buf| to |offset|.
    pub fn write_at(&mut self, mut buf: &[u8], offset: u64) -> Result<(), Error> {
        ensure!(
            offset <= self.device_size() && buf.len() as u64 <= self.device_size() - offset,
            "write exceeds device size"
        );

        // Start by writing the head.
        let mut aligned_offset = offset - offset % BLOCK_SIZE;
        let end = offset + buf.len() as u64;
        if aligned_offset < offset {
            let vmo_offset = self.read_block(aligned_offset, true)?;
            let to_copy = std::cmp::min(aligned_offset + BLOCK_SIZE, end) - offset;
            self.vmo.write(&buf[..to_copy as usize], vmo_offset + offset - aligned_offset)?;
            aligned_offset += BLOCK_SIZE;
            buf = &buf[to_copy as usize..];
        }

        // Now do whole blocks.
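        // Whole blocks are fully overwritten, so use get_block rather than read_block to avoid
        // fetching the existing contents from the device.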
        while aligned_offset + BLOCK_SIZE <= end {
            let (vmo_offset, hit) = self.get_block(aligned_offset, true)?;
            self.vmo.write(&buf[..BLOCK_SIZE as usize], vmo_offset)?;
            if !hit {
                self.map.insert(aligned_offset, CacheEntry { vmo_offset, dirty: true });
            }
            aligned_offset += BLOCK_SIZE;
            buf = &buf[BLOCK_SIZE as usize..];
        }

        // And finally the tail.
        if end > aligned_offset {
            let vmo_offset = self.read_block(aligned_offset, true)?;
            self.vmo.write(buf, vmo_offset)?;
        }
        Ok(())
    }

    /// Returns statistics.
    pub fn stats(&self) -> &Stats {
        &self.stats
    }

    /// Returns a reference to the underlying device. This can be used for additional control,
    /// such as instructing the device to flush any written data.
    pub fn device(&self) -> &RemoteBlockClientSync {
        &self.device
    }

    /// Issues a flush to the underlying block device.
    pub fn flush_device(&self) -> Result<(), Error> {
        Ok(self.device.flush()?)
    }
}

impl Drop for Cache {
    fn drop(&mut self) {
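        // Write back any dirty blocks before the VMO is detached from the device.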
        if let Err(e) = self.flush() {
            error!("Flush failed: {}", e);
        }
        let _ = self.vmo_id.take().into_id(); // OK to leak the id because the FIFO will be closed.
    }
}

fn into_io_error<E: Into<Box<dyn std::error::Error + Send + Sync>>>(error: E) -> std::io::Error {
    std::io::Error::other(error)
}

impl std::io::Read for Cache {
    fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
        if self.offset > self.device_size() {
            return Ok(0);
        }
        let max_len = self.device_size() - self.offset;
        if buf.len() as u64 > max_len {
            buf = &mut buf[0..max_len as usize];
        }
        self.read_at(buf, self.offset).map_err(into_io_error)?;
        self.offset += buf.len() as u64;
        Ok(buf.len())
    }
}

impl Write for Cache {
    fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
        if self.offset > self.device_size() {
            return Ok(0);
        }
        let max_len = self.device_size() - self.offset;
        if buf.len() as u64 > max_len {
            buf = &buf[0..max_len as usize];
        }
        self.write_at(buf, self.offset).map_err(into_io_error)?;
        self.offset += buf.len() as u64;
        Ok(buf.len())
    }

    /// This does *not* issue a flush to the underlying block device; it only sends the pending
    /// writes. Use flush_device to flush the device itself.
    fn flush(&mut self) -> std::io::Result<()> {
        let max = self.device_size();
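        // Write back every dirty block, clearing its dirty bit; the entries stay in the cache.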
        for mut entry in self.map.entries() {
            if entry.get().dirty {
                self.stats.write_count += 1;
                self.device
                    .write_at(
                        BufferSlice::new_with_vmo_id(
                            &self.vmo_id,
                            entry.get().vmo_offset,
                            std::cmp::min(BLOCK_SIZE, max - *entry.key()),
                        ),
                        *entry.key(),
                    )
                    .map_err(into_io_error)?;
                entry.get_mut().dirty = false;
            }
        }
        Ok(())
    }
}

impl std::io::Seek for Cache {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
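        // Compute the new offset with checked arithmetic so seeks that would overflow or go
        // below zero are rejected rather than wrapping.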
        self.offset = match pos {
            SeekFrom::Start(offset) => Some(offset),
            SeekFrom::End(delta) => {
                if delta >= 0 {
                    self.device_size().checked_add(delta as u64)
                } else {
                    self.device_size().checked_sub(delta.unsigned_abs())
                }
            }
            SeekFrom::Current(delta) => {
                if delta >= 0 {
                    self.offset.checked_add(delta as u64)
                } else {
                    self.offset.checked_sub(delta.unsigned_abs())
                }
            }
        }
        .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "bad delta"))?;
        Ok(self.offset)
    }
}

#[cfg(test)]
mod tests {
    use super::{Cache, Stats};
    use crate::RemoteBlockClientSync;
    use ramdevice_client::RamdiskClient;
    use std::io::{Read as _, Seek as _, SeekFrom, Write as _};

    const RAMDISK_BLOCK_SIZE: u64 = 1024;
    const RAMDISK_BLOCK_COUNT: u64 = 1023; // Deliberately chosen so RAMDISK_SIZE is not a multiple of BLOCK_SIZE, for testing max offset.
    const RAMDISK_SIZE: u64 = RAMDISK_BLOCK_SIZE * RAMDISK_BLOCK_COUNT;

    pub async fn make_ramdisk() -> (RamdiskClient, RemoteBlockClientSync) {
        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
            .await
            .expect("RamdiskClient::create failed");
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let block_client =
            RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
        (ramdisk, block_client)
    }

    #[fuchsia::test]
    async fn test_cache_read_at_and_write_at_with_no_hits() {
        let (_ramdisk, block_client) = make_ramdisk().await;
        let mut cache = Cache::new(block_client).expect("Cache::new failed");
        let mut offset = 5;
        const TEST_COUNT: usize = super::BLOCK_COUNT * 2; // Chosen so there are no cache hits.
        const DATA: &[u8] = b"hello";
        for _ in 0..TEST_COUNT {
            cache.write_at(DATA, offset).expect("cache.write_at failed");
            // The delta here is deliberately chosen to catch mistakes such as returning data from
            // the wrong block.
            offset += super::BLOCK_SIZE + 1;
        }
        assert_eq!(
            cache.stats(),
            &Stats {
                read_count: TEST_COUNT as u64,
                write_count: super::BLOCK_COUNT as u64,
                cache_hits: 0
            }
        );
        offset = 5;
        for _ in 0..TEST_COUNT {
            let mut buf = [0; 5];
            cache.read_at(&mut buf, offset).expect("cache.read_at failed");
            assert_eq!(&buf, DATA);
            offset += super::BLOCK_SIZE + 1;
        }
        assert_eq!(
            cache.stats(),
            &Stats {
                read_count: 2 * TEST_COUNT as u64,
                write_count: TEST_COUNT as u64,
                cache_hits: 0
            }
        );
    }

    #[fuchsia::test]
    async fn test_cache_read_at_and_write_at_with_hit() {
        let (_ramdisk, block_client) = make_ramdisk().await;
        let mut cache = Cache::new(block_client).expect("Cache::new failed");
        const OFFSET: u64 = 11;
        const DATA: &[u8] = b"hello";
        cache.write_at(DATA, OFFSET).expect("cache.write_at failed");
        let mut buf = [0; 5];
        cache.read_at(&mut buf, OFFSET).expect("cache.read_at failed");
        assert_eq!(&buf, DATA);
        assert_eq!(cache.stats(), &Stats { read_count: 1, write_count: 0, cache_hits: 1 });
    }

    #[fuchsia::test]
    async fn test_cache_aligned_read_at_and_write_at() {
        let (_ramdisk, block_client) = make_ramdisk().await;
        let mut cache = Cache::new(block_client).expect("Cache::new failed");
        const OFFSET: u64 = 11;
        const BLOCKS: usize = 3;
        const DATA_LEN: usize = super::BLOCK_SIZE as usize * BLOCKS + 11;
        let data = [0xe2u8; DATA_LEN];
        // This write is unaligned at the head and at the tail, with whole blocks in between.
        cache.write_at(&data, OFFSET).expect("cache.write_at failed");
        let mut buf = [0; DATA_LEN + 2]; // Read an extra byte at the start and at the end.
        cache.read_at(&mut buf, OFFSET - 1).expect("cache.read_at failed");
        assert_eq!(buf[0], 0);
        assert_eq!(buf[DATA_LEN + 1], 0);
        assert_eq!(&buf[1..DATA_LEN + 1], &data[0..DATA_LEN]);
        // We should have only read the first and last blocks. The writes to the whole blocks
        // should not have triggered reads.
        assert_eq!(
            cache.stats(),
            &Stats { read_count: 2, write_count: 0, cache_hits: BLOCKS as u64 + 1 }
        );
    }

    #[fuchsia::test]
    async fn test_cache_aligned_read_at_and_write_at_cold() {
        // The same as the previous test, but tear down the cache after the writes.
        let (ramdisk, block_client) = make_ramdisk().await;
        let mut cache = Cache::new(block_client).expect("Cache::new failed");
        const OFFSET: u64 = 11;
        const BLOCKS: usize = 3;
        const DATA_LEN: usize = super::BLOCK_SIZE as usize * BLOCKS + 11;
        let data = [0xe2u8; DATA_LEN];
        // This write is unaligned at the head and at the tail, with whole blocks in between.
        cache.write_at(&data, OFFSET).expect("cache.write_at failed");
        assert_eq!(cache.stats(), &Stats { read_count: 2, write_count: 0, cache_hits: 0 });

        drop(cache);
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let block_client =
            RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
        let mut cache = Cache::new(block_client).expect("Cache::new failed");

        let mut buf = [0; DATA_LEN + 2]; // Read an extra byte at the start and at the end.
        cache.read_at(&mut buf, OFFSET - 1).expect("cache.read_at failed");
        assert_eq!(buf[0], 0);
        assert_eq!(buf[DATA_LEN + 1], 0);
        assert_eq!(&buf[1..DATA_LEN + 1], &data[0..DATA_LEN]);
        // The new cache is cold, so every touched block must be read from the device and there
        // should be no cache hits.
        assert_eq!(
            cache.stats(),
            &Stats { read_count: BLOCKS as u64 + 1, write_count: 0, cache_hits: 0 }
        );
    }

    #[fuchsia::test]
    async fn test_io_read_write_and_seek() {
        let (_ramdisk, block_client) = make_ramdisk().await;
        let mut cache = Cache::new(block_client).expect("Cache::new failed");
        const OFFSET: u64 = 11;
        const DATA: &[u8] = b"hello";
        assert_eq!(cache.seek(SeekFrom::Start(OFFSET)).expect("seek failed"), OFFSET);
        cache.write_all(DATA).expect("cache.write_all failed");
        assert_eq!(
            cache.seek(SeekFrom::Current(-(DATA.len() as i64))).expect("seek failed"),
            OFFSET
        );
        let mut buf = [0u8; 5];
        assert_eq!(cache.read(&mut buf).expect("cache.read failed"), DATA.len());
        assert_eq!(&buf, DATA);
    }

    #[fuchsia::test]
    async fn test_io_read_write_and_seek_at_max_offset() {
        let (_ramdisk, block_client) = make_ramdisk().await;
        let mut cache = Cache::new(block_client).expect("Cache::new failed");
        const DATA: &[u8] = b"hello";
        assert_eq!(cache.seek(SeekFrom::End(-1)).expect("seek failed"), RAMDISK_SIZE - 1);
        assert_eq!(cache.write(DATA).expect("cache.write failed"), 1);
        assert_eq!(cache.seek(SeekFrom::End(-4)).expect("seek failed"), RAMDISK_SIZE - 4);
        let mut buf = [0x56u8; 5];
        assert_eq!(cache.read(&mut buf).expect("cache.read failed"), 4);
        assert_eq!(&buf, &[0, 0, 0, b'h', 0x56]);
    }

    #[fuchsia::test]
    async fn test_read_beyond_max_offset_returns_error() {
        let (_ramdisk, block_client) = make_ramdisk().await;
        let mut cache = Cache::new(block_client).expect("Cache::new failed");
        let mut buf = [0u8; 2];
        cache.read_at(&mut buf, RAMDISK_SIZE).expect_err("read_at succeeded");
        cache.read_at(&mut buf, RAMDISK_SIZE - 1).expect_err("read_at succeeded");
    }

    #[fuchsia::test]
    async fn test_write_beyond_max_offset_returns_error() {
        let (_ramdisk, block_client) = make_ramdisk().await;
        let mut cache = Cache::new(block_client).expect("Cache::new failed");
        let buf = [0u8; 2];
        cache.write_at(&buf, RAMDISK_SIZE).expect_err("write_at succeeded");
        cache.write_at(&buf, RAMDISK_SIZE - 1).expect_err("write_at succeeded");
    }

    #[fuchsia::test]
    async fn test_read_with_overflow_returns_error() {
        let (_ramdisk, block_client) = make_ramdisk().await;
        let mut cache = Cache::new(block_client).expect("Cache::new failed");
        let mut buf = [0u8; 2];
        cache.read_at(&mut buf, u64::MAX - 1).expect_err("read_at succeeded");
    }

    #[fuchsia::test]
    async fn test_write_with_overflow_returns_error() {
        let (_ramdisk, block_client) = make_ramdisk().await;
        let mut cache = Cache::new(block_client).expect("Cache::new failed");
        let buf = [0u8; 2];
        cache.write_at(&buf, u64::MAX - 1).expect_err("write_at succeeded");
    }

    #[fuchsia::test]
    async fn test_read_and_write_at_max_offset_succeeds() {
        let (_ramdisk, block_client) = make_ramdisk().await;
        let mut cache = Cache::new(block_client).expect("Cache::new failed");
        let buf = [0xd4u8; 2];
        cache.write_at(&buf, RAMDISK_SIZE - buf.len() as u64).expect("write_at failed");
        let mut read_buf = [0xf3u8; 2];
        cache.read_at(&mut read_buf, RAMDISK_SIZE - buf.len() as u64).expect("read_at failed");
        assert_eq!(&buf, &read_buf);
    }

    #[fuchsia::test]
    async fn test_seek_with_bad_delta_returns_error() {
        let (_ramdisk, block_client) = make_ramdisk().await;
        let mut cache = Cache::new(block_client).expect("Cache::new failed");
        cache.seek(SeekFrom::End(-(RAMDISK_SIZE as i64) - 1)).expect_err("seek succeeded");
        cache.seek(SeekFrom::Current(-1)).expect_err("seek succeeded");
    }

    #[fuchsia::test]
    async fn test_ramdisk_with_large_block_size_returns_error() {
        let ramdisk = RamdiskClient::create(super::BLOCK_SIZE * 2, 10)
            .await
            .expect("RamdiskClient::create failed");
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let block_client =
            RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
        Cache::new(block_client).expect_err("Cache::new succeeded");
    }
}