fxfs/object_store/
caching_object_handle.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::object_handle::{ObjectHandle, ReadObjectHandle};
7use anyhow::{anyhow, ensure, Error};
8use event_listener::{Event, EventListener};
9use fuchsia_sync::Mutex;
10use std::ops::Deref;
11use std::sync::Arc;
12use storage_device::buffer::BufferFuture;
13
/// Size in bytes (128 KiB) of each independently cached chunk.  A read at `offset` is served from
/// chunk number `offset / CHUNK_SIZE`.
pub const CHUNK_SIZE: usize = 128 * 1024;
15
16fn block_aligned_size(source: &impl ReadObjectHandle) -> usize {
17    let block_size = source.block_size() as usize;
18    let source_size = source.get_size() as usize;
19    source_size.checked_next_multiple_of(block_size).unwrap()
20}
21
/// A reference to a chunk of data which is currently in the cache.  The data will be held in the
/// cache as long as a reference to this chunk exists.
///
/// Cloning is cheap: clones share the same reference-counted buffer rather than copying bytes.
#[repr(transparent)]
#[derive(Clone)]
pub struct CachedChunk(Arc<Box<[u8]>>);
27
28impl std::fmt::Debug for CachedChunk {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
30        f.debug_struct("CachedChunk").field("len", &self.len()).finish()
31    }
32}
33
34impl Deref for CachedChunk {
35    type Target = [u8];
36
37    fn deref(&self) -> &Self::Target {
38        &**self.0
39    }
40}
41
// The cache state of a single chunk.
#[derive(Default, Debug)]
enum Chunk {
    // Nothing is cached for the chunk and no load is in progress.  This is the initial state.
    #[default]
    Missing,
    // A reader is currently loading the chunk from the source; other readers of the same chunk
    // wait for that load to finish.
    Pending,
    // The chunk is cached.  Readers are handed clones of the contained `CachedChunk`.
    Present(CachedChunk),
    // The chunk has been marked for eviction, and on the next pass it will be deallocated, unless
    // it is used before then (which returns it to Present).
    Expired(Box<[u8]>),
}
52
53impl Chunk {
54    // If the chunk is Present and has no remaining references, move it to Expired.
55    // If the chunk is Expired, move it to Missing, and return its data (so the caller can
56    // deallocate).
57    fn maybe_purge(&mut self) -> Option<Box<[u8]>> {
58        let this = std::mem::take(self);
59        match this {
60            Chunk::Expired(data) => Some(data),
61            Chunk::Present(chunk) => {
62                match Arc::try_unwrap(chunk.0) {
63                    Ok(data) => *self = Chunk::Expired(data),
64                    Err(chunk) => *self = Chunk::Present(CachedChunk(chunk)),
65                }
66                None
67            }
68            _ => {
69                *self = this;
70                None
71            }
72        }
73    }
74}
75
/// A wrapper handle around a `ReadObjectHandle` which provides a memory cache for its contents.
/// Contents are fetched as needed and can be evicted (for example due to memory pressure).
pub struct CachingObjectHandle<S> {
    // The handle that chunks are loaded from when they are not in the cache.
    source: S,

    // Data is stored in separately accessible chunks.  These can be independently loaded and
    // evicted.  The size of this array never changes.
    chunks: Mutex<Vec<Chunk>>,

    // Reads that are waiting on another read to load a chunk will wait on this event. There's only
    // 1 event for all chunks so a read may receive a notification for a different chunk than the
    // one it's waiting for and need to re-wait. It's rare for multiple chunks to be loading at once
    // and even rarer for a read to be waiting on a chunk to be loaded while another chunk is also
    // loading.
    event: Event,
}
92
93// SAFETY: Only `buffer` isn't Sync. Access to `buffer` is synchronized with `chunk_states`.
94unsafe impl<S> Sync for CachingObjectHandle<S> {}
95
96impl<S: ReadObjectHandle> CachingObjectHandle<S> {
97    pub fn new(source: S) -> Self {
98        let block_size = source.block_size() as usize;
99        assert!(CHUNK_SIZE % block_size == 0);
100        let aligned_size = block_aligned_size(&source);
101        let chunk_count = aligned_size.div_ceil(CHUNK_SIZE);
102
103        let mut chunks = Vec::<Chunk>::new();
104        chunks.resize_with(chunk_count, Default::default);
105        Self { source, chunks: Mutex::new(chunks), event: Event::new() }
106    }
107
    /// Returns a reference to the chunk (up to `CHUNK_SIZE` bytes) containing `offset`.  If the
    /// data is already cached, this does not require reading from `source`.
    /// `offset` must be less than the size of `source`.
    ///
    /// If another reader is already loading the same chunk, this waits for that load rather than
    /// issuing a second read.  If a load fails, or the future performing it is dropped, the chunk
    /// goes back to `Missing` and one of the waiting readers retries the load.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::OutOfRange` if `offset` is not less than the size of `source`,
    /// `FxfsError::Internal` (with context "Short read") if `source` returns fewer bytes than the
    /// chunk needs, and otherwise any error returned by the read from `source`.
    pub async fn read(&self, offset: usize) -> Result<CachedChunk, Error> {
        ensure!(offset < self.source.get_size() as usize, FxfsError::OutOfRange);
        let chunk_num = offset / CHUNK_SIZE;

        // What to do once the lock on `chunks` has been released.
        enum Action {
            Wait(EventListener),
            Load,
        }
        loop {
            // Decide what to do while holding the lock; the lock is released at the end of this
            // block, before anything is awaited.
            let action = {
                let mut chunks = self.chunks.lock();
                match std::mem::take(&mut chunks[chunk_num]) {
                    Chunk::Missing => {
                        // Claim the load so that concurrent readers wait instead of duplicating it.
                        chunks[chunk_num] = Chunk::Pending;
                        Action::Load
                    }
                    Chunk::Pending => {
                        chunks[chunk_num] = Chunk::Pending;
                        // The listener is registered while the lock is still held, i.e. before the
                        // loading reader can change the chunk's state and notify.
                        Action::Wait(self.event.listen())
                    }
                    Chunk::Present(cached_chunk) => {
                        chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
                        return Ok(cached_chunk);
                    }
                    Chunk::Expired(data) => {
                        // The chunk was marked for eviction but is being used again, so it is
                        // returned to `Present`.
                        let cached_chunk = CachedChunk(Arc::new(data));
                        chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
                        return Ok(cached_chunk);
                    }
                }
            };
            match action {
                Action::Wait(listener) => {
                    // The notification may have been for a different chunk, so go around the loop
                    // and re-check this chunk's state.
                    listener.await;
                }
                Action::Load => {
                    // If this future is dropped or reading fails then put the chunk back into the
                    // `Missing` state.
                    let drop_guard = scopeguard::guard((), |_| {
                        {
                            let mut chunks = self.chunks.lock();
                            debug_assert!(matches!(chunks[chunk_num], Chunk::Pending));
                            chunks[chunk_num] = Chunk::Missing;
                        }
                        self.event.notify(usize::MAX);
                    });

                    let read_start = chunk_num * CHUNK_SIZE;
                    // `len` is the number of bytes kept for this chunk; it is clamped to the end of
                    // the source, so the final chunk may be shorter than `CHUNK_SIZE`.
                    let len =
                        std::cmp::min(read_start + CHUNK_SIZE, self.source.get_size() as usize)
                            - read_start;
                    // `aligned_len` is the size of the read buffer; it is clamped to the
                    // block-aligned size of the source instead.
                    let aligned_len =
                        std::cmp::min(read_start + CHUNK_SIZE, block_aligned_size(&self.source))
                            - read_start;

                    fxfs_trace::duration!(c"CachingObjectHandle::load", "len" => aligned_len);

                    let mut read_buf = self.source.allocate_buffer(aligned_len).await;
                    let amount_read =
                        self.source.read(read_start as u64, read_buf.as_mut()).await?;
                    ensure!(amount_read >= len, anyhow!(FxfsError::Internal).context("Short read"));

                    log::debug!("COH {}: Read {len}@{read_start}", self.source.object_id());

                    // Only the first `len` bytes are copied into the cache; any padding up to the
                    // block boundary is discarded.
                    let data = Vec::from(&read_buf.as_slice()[..len]).into_boxed_slice();
                    let cached_chunk = CachedChunk(Arc::new(data));

                    {
                        let mut chunks = self.chunks.lock();
                        debug_assert!(matches!(chunks[chunk_num], Chunk::Pending));
                        chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
                    }
                    // Wake every waiting reader now that the chunk is `Present`.
                    self.event.notify(usize::MAX);

                    // The load succeeded, so defuse the guard without running its closure.
                    scopeguard::ScopeGuard::into_inner(drop_guard);
                    return Ok(cached_chunk);
                }
            }
        }
    }
191
192    /// Purges unused extents, freeing unused memory.  This follows a second-chance algorithm:
193    /// unused extents will be marked for purging the first time this is called, and if they are not
194    /// used again by the next time this is called, they will be deallocated.
195    /// This is intended to be run regularly, e.g. on a timer.
196    pub fn purge(&self) {
197        // Deallocate out of the lock scope, so we don't hold up readers.
198        let mut to_deallocate = vec![];
199        let mut chunks = self.chunks.lock();
200        for chunk in chunks.iter_mut() {
201            if let Some(data) = chunk.maybe_purge() {
202                to_deallocate.push(data);
203            }
204        }
205        log::debug!(
206            "COH {}: Purging {} cached chunks ({} bytes)",
207            self.source.object_id(),
208            to_deallocate.len(),
209            to_deallocate.len() * CHUNK_SIZE
210        );
211    }
212}
213
// Every `ObjectHandle` method is forwarded unchanged to the wrapped `source`; the cache itself
// plays no part in any of them.
impl<S: ReadObjectHandle> ObjectHandle for CachingObjectHandle<S> {
    fn set_trace(&self, v: bool) {
        self.source.set_trace(v);
    }

    fn object_id(&self) -> u64 {
        self.source.object_id()
    }

    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.source.allocate_buffer(size)
    }

    fn block_size(&self) -> u64 {
        self.source.block_size()
    }
}
231
232#[cfg(test)]
233mod tests {
234    use super::{CachingObjectHandle, CHUNK_SIZE};
235    use crate::object_handle::{ObjectHandle, ReadObjectHandle};
236    use anyhow::{anyhow, ensure, Error};
237    use async_trait::async_trait;
238    use event_listener::Event;
239    use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
240    use std::sync::Arc;
241    use storage_device::buffer::{BufferFuture, MutableBufferRef};
242    use storage_device::fake_device::FakeDevice;
243    use storage_device::Device;
244
245    // Fills a buffer with a pattern seeded by counter.
246    fn fill_buf(buf: &mut [u8], counter: u8) {
247        for (i, chunk) in buf.chunks_exact_mut(2).enumerate() {
248            chunk[0] = counter;
249            chunk[1] = i as u8;
250        }
251    }
252
253    // Returns a buffer filled with fill_buf.
254    fn make_buf(counter: u8, size: usize) -> Vec<u8> {
255        let mut buf = vec![0; size];
256        fill_buf(&mut buf, counter);
257        buf
258    }
259
    // A fake read-only source whose reads return a generated pattern instead of device data.
    struct FakeSource {
        // Only used to provide the block size and to allocate buffers.
        device: Arc<dyn Device>,
        // The size reported by `get_size`.
        size: usize,
        // Set by `start`; reads do not complete until this is true.
        started: AtomicBool,
        // When false, reads fail with an error instead of returning data.
        allow_reads: AtomicBool,
        // Notified by `start` to wake reads that are waiting for `started`.
        wake: Event,
        // Seed for the next read's pattern.  Starts at 1 and is incremented by every read.
        counter: AtomicU8,
    }
268
269    impl FakeSource {
270        // `device` is only used to provide allocate_buffer; reads don't go to the device.
271        fn new(device: Arc<dyn Device>, size: usize) -> Self {
272            FakeSource {
273                started: AtomicBool::new(false),
274                allow_reads: AtomicBool::new(true),
275                size,
276                wake: Event::new(),
277                device,
278                counter: AtomicU8::new(1),
279            }
280        }
281
282        fn start(&self) {
283            self.started.store(true, Ordering::SeqCst);
284            self.wake.notify(usize::MAX);
285        }
286
287        // Toggle whether reads from source are allowed.  If an uncached read occurs while this was
288        // set to false, the test panics.
289        fn allow_reads(&self, allow: bool) {
290            self.allow_reads.store(allow, Ordering::SeqCst);
291        }
292
293        async fn wait_for_start(&self) {
294            while !self.started.load(Ordering::SeqCst) {
295                let listener = self.wake.listen();
296                if self.started.load(Ordering::SeqCst) {
297                    break;
298                }
299                listener.await;
300            }
301        }
302    }
303
304    #[async_trait]
305    impl ReadObjectHandle for FakeSource {
306        async fn read(&self, _offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
307            ensure!(self.allow_reads.load(Ordering::SeqCst), anyhow!("Received unexpected read"));
308            let counter = self.counter.fetch_add(1, Ordering::Relaxed);
309            self.wait_for_start().await;
310            fill_buf(buf.as_mut_slice(), counter);
311            Ok(buf.len())
312        }
313
314        fn get_size(&self) -> u64 {
315            self.size as u64
316        }
317    }
318
319    impl ObjectHandle for FakeSource {
320        fn object_id(&self) -> u64 {
321            0
322        }
323
324        fn block_size(&self) -> u64 {
325            self.device.block_size().into()
326        }
327
328        fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
329            self.device.allocate_buffer(size)
330        }
331    }
332
333    #[fuchsia::test]
334    async fn test_read_with_missing_chunk() {
335        let device = Arc::new(FakeDevice::new(1024, 512));
336        let source = FakeSource::new(device, 4096);
337        source.start();
338        let caching_object_handle = CachingObjectHandle::new(source);
339
340        let chunk = caching_object_handle.read(0).await.unwrap();
341        assert_eq!(&*chunk, make_buf(1, 4096));
342    }
343
344    #[fuchsia::test]
345    async fn test_read_with_present_chunk() {
346        let device = Arc::new(FakeDevice::new(1024, 512));
347        let source = FakeSource::new(device, 4096);
348        source.start();
349        let caching_object_handle = CachingObjectHandle::new(source);
350
351        let expected = make_buf(1, 4096);
352        let chunk = caching_object_handle.read(0).await.unwrap();
353        assert_eq!(&*chunk, expected);
354
355        // The chunk was already populated so this read receives the same value as the above read.
356        let chunk = caching_object_handle.read(0).await.unwrap();
357        assert_eq!(&*chunk, expected);
358    }
359
360    #[fuchsia::test]
361    async fn test_read_with_pending_chunk() {
362        let device = Arc::new(FakeDevice::new(1024, 512));
363        let source = FakeSource::new(device, 8192);
364        let caching_object_handle = CachingObjectHandle::new(source);
365
366        // The two reads should target the same chunk.
367        let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));
368        let mut read_fut2 = std::pin::pin!(caching_object_handle.read(4096));
369
370        // The first future will transition the chunk from `Missing` to `Pending` and then wait
371        // on the source.
372        assert!(futures::poll!(&mut read_fut1).is_pending());
373        // The second future will wait on the event.
374        assert!(futures::poll!(&mut read_fut2).is_pending());
375        caching_object_handle.source.start();
376        // Even though the source is ready the second future can't make progress.
377        assert!(futures::poll!(&mut read_fut2).is_pending());
378        // The first future reads from the source, transition the chunk from `Pending` to
379        // `Present`, and then notifies the event.
380        let expected = make_buf(1, 8192);
381        assert_eq!(&*read_fut1.await.unwrap(), expected);
382        // The event has been notified and the second future can now complete.
383        assert_eq!(&*read_fut2.await.unwrap(), expected);
384    }
385
386    #[fuchsia::test]
387    async fn test_read_with_notification_for_other_chunk() {
388        let device = Arc::new(FakeDevice::new(1024, 512));
389        let source = FakeSource::new(device, CHUNK_SIZE + 4096);
390        let caching_object_handle = CachingObjectHandle::new(source);
391
392        let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));
393        let mut read_fut2 = std::pin::pin!(caching_object_handle.read(CHUNK_SIZE));
394        let mut read_fut3 = std::pin::pin!(caching_object_handle.read(0));
395
396        // The first and second futures will transition their chunks from `Missing` to `Pending`
397        // and then wait on the source.
398        assert!(futures::poll!(&mut read_fut1).is_pending());
399        assert!(futures::poll!(&mut read_fut2).is_pending());
400        // The third future will wait on the event.
401        assert!(futures::poll!(&mut read_fut3).is_pending());
402        caching_object_handle.source.start();
403        // Even though the source is ready the third future can't make progress.
404        assert!(futures::poll!(&mut read_fut3).is_pending());
405        // The second future will read from the source and notify the event.
406        assert_eq!(&*read_fut2.await.unwrap(), make_buf(2, 4096));
407        // The event was notified but the first chunk is still `Pending` so the third future
408        // resumes waiting.
409        assert!(futures::poll!(&mut read_fut3).is_pending());
410        // The first future will read from the source, transition the first chunk to `Present`,
411        // and notify the event.
412        let expected = make_buf(1, CHUNK_SIZE);
413        assert_eq!(&*read_fut1.await.unwrap(), expected);
414        // The first chunk is now present so the third future can complete.
415        assert_eq!(&*read_fut3.await.unwrap(), expected);
416    }
417
418    #[fuchsia::test]
419    async fn test_read_with_dropped_future() {
420        let device = Arc::new(FakeDevice::new(1024, 512));
421        let source = FakeSource::new(device, 4096);
422        let caching_object_handle = CachingObjectHandle::new(source);
423
424        let mut read_fut2 = std::pin::pin!(caching_object_handle.read(0));
425        {
426            let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));
427
428            // The first future will transition the chunk from `Missing` to `Pending` and then
429            // wait on the source.
430            assert!(futures::poll!(&mut read_fut1).is_pending());
431            // The second future will wait on the event.
432            assert!(futures::poll!(&mut read_fut2).is_pending());
433            caching_object_handle.source.start();
434            // Even though the source is ready the second future can't make progress.
435            assert!(futures::poll!(&mut read_fut2).is_pending());
436        }
437        // The first future was dropped which transitioned the chunk from `Pending` to `Missing`
438        // and notified the event. When the second future is polled it transitions the chunk
439        // from `Missing` back to `Pending`, reads from the source, and then transitions the
440        // chunk to `Present`.
441        assert_eq!(&*read_fut2.await.unwrap(), make_buf(2, 4096));
442    }
443
444    #[fuchsia::test]
445    async fn test_read_past_end_of_source() {
446        let device = Arc::new(FakeDevice::new(1024, 512));
447        let source = FakeSource::new(device, 300);
448        source.start();
449        let caching_object_handle = CachingObjectHandle::new(source);
450
451        caching_object_handle.read(500).await.expect_err("Read should fail");
452    }
453
454    #[fuchsia::test]
455    async fn test_read_to_end_of_source() {
456        let device = Arc::new(FakeDevice::new(1024, 512));
457        const SOURCE_SIZE: usize = 300;
458        let source = FakeSource::new(device, SOURCE_SIZE);
459        source.start();
460        let caching_object_handle = CachingObjectHandle::new(source);
461
462        let chunk = caching_object_handle.read(0).await.unwrap();
463        assert_eq!(&*chunk, make_buf(1, SOURCE_SIZE));
464    }
465
466    #[fuchsia::test]
467    async fn test_chunk_purging() {
468        let device = Arc::new(FakeDevice::new(1024, 512));
469        let source = Arc::new(FakeSource::new(device, CHUNK_SIZE + 4096));
470        source.start();
471        let caching_object_handle =
472            CachingObjectHandle::new(source.clone() as Arc<dyn ReadObjectHandle>);
473
474        let _chunk1 = caching_object_handle.read(0).await.unwrap();
475        // Immediately drop the second chunk.
476        caching_object_handle.read(CHUNK_SIZE).await.unwrap();
477
478        source.allow_reads(false);
479
480        // The first purge should not evict the second chunk yet, and the read should save the chunk
481        // from being evicted by the next purge too.
482        caching_object_handle.purge();
483        caching_object_handle.read(0).await.unwrap();
484        caching_object_handle.read(CHUNK_SIZE).await.unwrap();
485
486        caching_object_handle.purge();
487        caching_object_handle.read(0).await.unwrap();
488        caching_object_handle.read(CHUNK_SIZE).await.unwrap();
489
490        // Purging twice should result in evicting the second chunk.
491        caching_object_handle.purge();
492        caching_object_handle.purge();
493        caching_object_handle.read(0).await.unwrap();
494        caching_object_handle.read(CHUNK_SIZE).await.expect_err("Chunk was not purged");
495    }
496}