fxfs/object_store/
caching_object_handle.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::object_handle::{ObjectHandle, ReadObjectHandle};
7use anyhow::{Error, anyhow, ensure};
8use event_listener::{Event, EventListener};
9use fuchsia_sync::Mutex;
10use std::ops::Deref;
11use std::sync::Arc;
12use storage_device::buffer::BufferFuture;
13
14pub const CHUNK_SIZE: usize = 128 * 1024;
15
16fn block_aligned_size(source: &impl ReadObjectHandle) -> usize {
17    let block_size = source.block_size() as usize;
18    let source_size = source.get_size() as usize;
19    source_size.checked_next_multiple_of(block_size).unwrap()
20}
21
/// A reference to a chunk of data which is currently in the cache.  The data will be held in the
/// cache as long as a reference to this chunk exists.
///
/// Cloning is cheap: clones share the same underlying bytes via the inner `Arc`.  The cache's
/// purge pass uses the `Arc`'s reference count to tell whether a chunk is still in use.
#[repr(transparent)]
#[derive(Clone)]
pub struct CachedChunk(Arc<Box<[u8]>>);
27
28impl std::fmt::Debug for CachedChunk {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
30        f.debug_struct("CachedChunk").field("len", &self.len()).finish()
31    }
32}
33
34impl Deref for CachedChunk {
35    type Target = [u8];
36
37    fn deref(&self) -> &Self::Target {
38        &**self.0
39    }
40}
41
// Lifecycle state of one CHUNK_SIZE-aligned piece of the source's contents.
//
// Transitions (see `read`, `load` and `maybe_purge`):
//   Missing -> Pending (a read starts loading), Pending -> Present (load succeeds) or
//   -> Missing (load fails / future dropped), Present -> Expired (purge, no outstanding
//   references), Expired -> Present (read revives it) or -> Missing (second purge).
#[derive(Default, Debug)]
enum Chunk {
    // Not cached; must be loaded from the source.
    #[default]
    Missing,
    // A read is currently loading this chunk from the source.
    Pending,
    // Loaded and cached; holds a strong reference to the data.
    Present(CachedChunk),
    // The chunk has been marked for eviction, and on the next pass it will be deallocated, unless
    // it is used before then (which returns it to Present).
    Expired(Box<[u8]>),
}
52
53impl Chunk {
54    // If the chunk is Present and has no remaining references, move it to Expired.
55    // If the chunk is Expired, move it to Missing, and return its data (so the caller can
56    // deallocate).
57    fn maybe_purge(&mut self) -> Option<Box<[u8]>> {
58        let this = std::mem::take(self);
59        match this {
60            Chunk::Expired(data) => Some(data),
61            Chunk::Present(chunk) => {
62                match Arc::try_unwrap(chunk.0) {
63                    Ok(data) => *self = Chunk::Expired(data),
64                    Err(chunk) => *self = Chunk::Present(CachedChunk(chunk)),
65                }
66                None
67            }
68            _ => {
69                *self = this;
70                None
71            }
72        }
73    }
74}
75
/// A wrapper handle around a `ReadObjectHandle` which provides a memory cache for its contents.
/// Contents are fetched as needed and can be evicted (for example due to memory pressure).
///
/// Eviction is driven by calling [`Self::purge`] periodically; chunks with outstanding
/// [`CachedChunk`] references are never evicted.
pub struct CachingObjectHandle<S> {
    // The underlying handle all cache misses are read from.
    source: S,

    // Data is stored in separately accessible chunks.  These can be independently loaded and
    // evicted.  The size of this array never changes.
    chunks: Mutex<Vec<Chunk>>,

    // Reads that are waiting on another read to load a chunk will wait on this event. There's only
    // 1 event for all chunks so a read may receive a notification for a different chunk than the
    // one it's waiting for and need to re-wait. It's rare for multiple chunks to be loading at once
    // and even rarer for a read to be waiting on a chunk to be loaded while another chunk is also
    // loading.
    event: Event,
}
92
// SAFETY: NOTE(review): the previous comment referenced `buffer`/`chunk_states` fields that no
// longer exist on this struct.  Of the current fields, `chunks` (a `Mutex`) and `event` are
// already `Sync`; this impl additionally asserts `Sync` for any `S`.  All methods bound
// `S: ReadObjectHandle`, which presumably implies the source is safe to share — TODO confirm
// that bound (or add `S: Sync`) before relying on this impl with a non-`Sync` source.
unsafe impl<S> Sync for CachingObjectHandle<S> {}
95
#[fxfs_trace::trace]
impl<S: ReadObjectHandle> CachingObjectHandle<S> {
    /// Wraps `source`, creating one (initially `Missing`) chunk slot per `CHUNK_SIZE` of the
    /// source's block-aligned size.
    ///
    /// # Panics
    /// Panics if `CHUNK_SIZE` is not a multiple of the source's block size.
    pub fn new(source: S) -> Self {
        let block_size = source.block_size() as usize;
        assert!(CHUNK_SIZE % block_size == 0);
        let aligned_size = block_aligned_size(&source);
        let chunk_count = aligned_size.div_ceil(CHUNK_SIZE);

        let mut chunks = Vec::<Chunk>::new();
        chunks.resize_with(chunk_count, Default::default);
        Self { source, chunks: Mutex::new(chunks), event: Event::new() }
    }

    /// Returns a reference to the chunk (up to `CHUNK_SIZE` bytes) containing `offset`.  If the
    /// data is already cached, this does not require reading from `source`.
    /// `offset` must be less than the size of `source`.
    pub async fn read(&self, offset: usize) -> Result<CachedChunk, Error> {
        ensure!(offset < self.source.get_size() as usize, FxfsError::OutOfRange);
        let chunk_num = offset / CHUNK_SIZE;

        // Decision made under the lock; acted on after the lock is released.
        enum Action {
            Wait(EventListener),
            Load,
        }
        loop {
            let action = {
                let mut chunks = self.chunks.lock();
                // Take the state out (leaving `Missing`) so its payload can be moved; every
                // arm writes the correct state back before the lock is released.
                match std::mem::take(&mut chunks[chunk_num]) {
                    Chunk::Missing => {
                        // Claim the load; other readers will see `Pending` and wait.
                        chunks[chunk_num] = Chunk::Pending;
                        Action::Load
                    }
                    Chunk::Pending => {
                        // Registering the listener while still holding the lock ensures a
                        // notification sent after the lock is dropped is not missed.
                        chunks[chunk_num] = Chunk::Pending;
                        Action::Wait(self.event.listen())
                    }
                    Chunk::Present(cached_chunk) => {
                        chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
                        return Ok(cached_chunk);
                    }
                    Chunk::Expired(data) => {
                        // Second chance: using an expired chunk revives it to `Present`.
                        let cached_chunk = CachedChunk(Arc::new(data));
                        chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
                        return Ok(cached_chunk);
                    }
                }
            };
            match action {
                Action::Wait(listener) => {
                    // May be woken by a notification for a different chunk; loop and re-check.
                    listener.await;
                }
                Action::Load => {
                    return self.load(chunk_num).await;
                }
            }
        }
    }

    // Reads chunk `chunk_num` from the source and installs it as `Present`.  Must only be
    // called while the chunk is in the `Pending` state (set by the caller in `read`).
    #[trace]
    async fn load(&self, chunk_num: usize) -> Result<CachedChunk, Error> {
        // If this future is dropped or reading fails then put the chunk back into the
        // `Missing` state.
        let drop_guard = scopeguard::guard((), |_| {
            {
                let mut chunks = self.chunks.lock();
                debug_assert!(matches!(chunks[chunk_num], Chunk::Pending));
                chunks[chunk_num] = Chunk::Missing;
            }
            // Wake waiters so one of them can retry the load.
            self.event.notify(usize::MAX);
        });

        let read_start = chunk_num * CHUNK_SIZE;
        // `len` is the number of valid content bytes; `aligned_len` rounds the final chunk up
        // to the source's block alignment, as required for the read below.
        let len =
            std::cmp::min(read_start + CHUNK_SIZE, self.source.get_size() as usize) - read_start;
        let aligned_len =
            std::cmp::min(read_start + CHUNK_SIZE, block_aligned_size(&self.source)) - read_start;

        let mut read_buf = self.source.allocate_buffer(aligned_len).await;
        let amount_read = self.source.read(read_start as u64, read_buf.as_mut()).await?;
        ensure!(amount_read >= len, anyhow!(FxfsError::Internal).context("Short read"));

        log::debug!("COH {}: Read {len}@{read_start}", self.source.object_id());

        // Copy only the valid bytes out of the (block-aligned) transfer buffer.
        let data = Vec::from(&read_buf.as_slice()[..len]).into_boxed_slice();
        let cached_chunk = CachedChunk(Arc::new(data));

        {
            let mut chunks = self.chunks.lock();
            debug_assert!(matches!(chunks[chunk_num], Chunk::Pending));
            chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
        }
        // Wake any readers waiting for this chunk to become `Present`.
        self.event.notify(usize::MAX);

        // Success: defuse the guard so the chunk stays `Present`.
        scopeguard::ScopeGuard::into_inner(drop_guard);
        return Ok(cached_chunk);
    }

    /// Purges unused extents, freeing unused memory.  This follows a second-chance algorithm:
    /// unused extents will be marked for purging the first time this is called, and if they are not
    /// used again by the next time this is called, they will be deallocated.
    /// This is intended to be run regularly, e.g. on a timer.
    pub fn purge(&self) {
        // Deallocate out of the lock scope, so we don't hold up readers.
        // (`to_deallocate` is declared before the guard, so on return the lock guard drops
        // first and the buffers are freed after the lock is released.)
        let mut to_deallocate = vec![];
        let mut chunks = self.chunks.lock();
        for chunk in chunks.iter_mut() {
            if let Some(data) = chunk.maybe_purge() {
                to_deallocate.push(data);
            }
        }
        log::debug!(
            "COH {}: Purging {} cached chunks ({} bytes)",
            self.source.object_id(),
            to_deallocate.len(),
            to_deallocate.len() * CHUNK_SIZE
        );
    }
}
214
// `ObjectHandle` is delegated straight through to the wrapped source; caching only applies to
// the `read` path above.
impl<S: ReadObjectHandle> ObjectHandle for CachingObjectHandle<S> {
    fn set_trace(&self, v: bool) {
        self.source.set_trace(v);
    }

    fn object_id(&self) -> u64 {
        self.source.object_id()
    }

    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.source.allocate_buffer(size)
    }

    fn block_size(&self) -> u64 {
        self.source.block_size()
    }
}
232
#[cfg(test)]
mod tests {
    use super::{CHUNK_SIZE, CachingObjectHandle};
    use crate::object_handle::{ObjectHandle, ReadObjectHandle};
    use anyhow::{Error, anyhow, ensure};
    use async_trait::async_trait;
    use event_listener::Event;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
    use storage_device::Device;
    use storage_device::buffer::{BufferFuture, MutableBufferRef};
    use storage_device::fake_device::FakeDevice;

    // Fills a buffer with a pattern seeded by counter.
    fn fill_buf(buf: &mut [u8], counter: u8) {
        for (i, chunk) in buf.chunks_exact_mut(2).enumerate() {
            chunk[0] = counter;
            chunk[1] = i as u8;
        }
    }

    // Returns a buffer filled with fill_buf.
    fn make_buf(counter: u8, size: usize) -> Vec<u8> {
        let mut buf = vec![0; size];
        fill_buf(&mut buf, counter);
        buf
    }

    // A ReadObjectHandle whose reads block until `start` is called and which fills buffers with
    // a deterministic pattern that changes on every read (via `counter`), so tests can tell
    // which read produced a chunk's contents.
    struct FakeSource {
        device: Arc<dyn Device>,
        size: usize,
        started: AtomicBool,
        allow_reads: AtomicBool,
        wake: Event,
        counter: AtomicU8,
    }

    impl FakeSource {
        // `device` is only used to provide allocate_buffer; reads don't go to the device.
        fn new(device: Arc<dyn Device>, size: usize) -> Self {
            FakeSource {
                started: AtomicBool::new(false),
                allow_reads: AtomicBool::new(true),
                size,
                wake: Event::new(),
                device,
                counter: AtomicU8::new(1),
            }
        }

        fn start(&self) {
            self.started.store(true, Ordering::SeqCst);
            self.wake.notify(usize::MAX);
        }

        // Toggle whether reads from source are allowed.  If an uncached read occurs while this was
        // set to false, the test panics.
        fn allow_reads(&self, allow: bool) {
            self.allow_reads.store(allow, Ordering::SeqCst);
        }

        // Waits until `start` has been called, registering the listener before re-checking the
        // flag so a concurrent `start` cannot be missed.
        async fn wait_for_start(&self) {
            while !self.started.load(Ordering::SeqCst) {
                let listener = self.wake.listen();
                if self.started.load(Ordering::SeqCst) {
                    break;
                }
                listener.await;
            }
        }
    }

    #[async_trait]
    impl ReadObjectHandle for FakeSource {
        async fn read(&self, _offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
            ensure!(self.allow_reads.load(Ordering::SeqCst), anyhow!("Received unexpected read"));
            // Bump the counter first so each read gets a distinct pattern, then block until the
            // source is started.
            let counter = self.counter.fetch_add(1, Ordering::Relaxed);
            self.wait_for_start().await;
            fill_buf(buf.as_mut_slice(), counter);
            Ok(buf.len())
        }

        fn get_size(&self) -> u64 {
            self.size as u64
        }
    }

    impl ObjectHandle for FakeSource {
        fn object_id(&self) -> u64 {
            0
        }

        fn block_size(&self) -> u64 {
            self.device.block_size().into()
        }

        fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
            self.device.allocate_buffer(size)
        }
    }

    #[fuchsia::test]
    async fn test_read_with_missing_chunk() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = FakeSource::new(device, 4096);
        source.start();
        let caching_object_handle = CachingObjectHandle::new(source);

        let chunk = caching_object_handle.read(0).await.unwrap();
        assert_eq!(&*chunk, make_buf(1, 4096));
    }

    #[fuchsia::test]
    async fn test_read_with_present_chunk() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = FakeSource::new(device, 4096);
        source.start();
        let caching_object_handle = CachingObjectHandle::new(source);

        let expected = make_buf(1, 4096);
        let chunk = caching_object_handle.read(0).await.unwrap();
        assert_eq!(&*chunk, expected);

        // The chunk was already populated so this read receives the same value as the above read.
        // (A source read would have bumped the counter and produced a different pattern.)
        let chunk = caching_object_handle.read(0).await.unwrap();
        assert_eq!(&*chunk, expected);
    }

    #[fuchsia::test]
    async fn test_read_with_pending_chunk() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = FakeSource::new(device, 8192);
        let caching_object_handle = CachingObjectHandle::new(source);

        // The two reads should target the same chunk.
        let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));
        let mut read_fut2 = std::pin::pin!(caching_object_handle.read(4096));

        // The first future will transition the chunk from `Missing` to `Pending` and then wait
        // on the source.
        assert!(futures::poll!(&mut read_fut1).is_pending());
        // The second future will wait on the event.
        assert!(futures::poll!(&mut read_fut2).is_pending());
        caching_object_handle.source.start();
        // Even though the source is ready the second future can't make progress.
        assert!(futures::poll!(&mut read_fut2).is_pending());
        // The first future reads from the source, transition the chunk from `Pending` to
        // `Present`, and then notifies the event.
        let expected = make_buf(1, 8192);
        assert_eq!(&*read_fut1.await.unwrap(), expected);
        // The event has been notified and the second future can now complete.
        assert_eq!(&*read_fut2.await.unwrap(), expected);
    }

    #[fuchsia::test]
    async fn test_read_with_notification_for_other_chunk() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = FakeSource::new(device, CHUNK_SIZE + 4096);
        let caching_object_handle = CachingObjectHandle::new(source);

        let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));
        let mut read_fut2 = std::pin::pin!(caching_object_handle.read(CHUNK_SIZE));
        let mut read_fut3 = std::pin::pin!(caching_object_handle.read(0));

        // The first and second futures will transition their chunks from `Missing` to `Pending`
        // and then wait on the source.
        assert!(futures::poll!(&mut read_fut1).is_pending());
        assert!(futures::poll!(&mut read_fut2).is_pending());
        // The third future will wait on the event.
        assert!(futures::poll!(&mut read_fut3).is_pending());
        caching_object_handle.source.start();
        // Even though the source is ready the third future can't make progress.
        assert!(futures::poll!(&mut read_fut3).is_pending());
        // The second future will read from the source and notify the event.
        assert_eq!(&*read_fut2.await.unwrap(), make_buf(2, 4096));
        // The event was notified but the first chunk is still `Pending` so the third future
        // resumes waiting.  (This exercises the re-wait loop in `read`.)
        assert!(futures::poll!(&mut read_fut3).is_pending());
        // The first future will read from the source, transition the first chunk to `Present`,
        // and notify the event.
        let expected = make_buf(1, CHUNK_SIZE);
        assert_eq!(&*read_fut1.await.unwrap(), expected);
        // The first chunk is now present so the third future can complete.
        assert_eq!(&*read_fut3.await.unwrap(), expected);
    }

    #[fuchsia::test]
    async fn test_read_with_dropped_future() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = FakeSource::new(device, 4096);
        let caching_object_handle = CachingObjectHandle::new(source);

        let mut read_fut2 = std::pin::pin!(caching_object_handle.read(0));
        {
            let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));

            // The first future will transition the chunk from `Missing` to `Pending` and then
            // wait on the source.
            assert!(futures::poll!(&mut read_fut1).is_pending());
            // The second future will wait on the event.
            assert!(futures::poll!(&mut read_fut2).is_pending());
            caching_object_handle.source.start();
            // Even though the source is ready the second future can't make progress.
            assert!(futures::poll!(&mut read_fut2).is_pending());
        }
        // The first future was dropped which transitioned the chunk from `Pending` to `Missing`
        // and notified the event. When the second future is polled it transitions the chunk
        // from `Missing` back to `Pending`, reads from the source, and then transitions the
        // chunk to `Present`.  (Counter 2 proves the data came from the second source read.)
        assert_eq!(&*read_fut2.await.unwrap(), make_buf(2, 4096));
    }

    #[fuchsia::test]
    async fn test_read_past_end_of_source() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = FakeSource::new(device, 300);
        source.start();
        let caching_object_handle = CachingObjectHandle::new(source);

        caching_object_handle.read(500).await.expect_err("Read should fail");
    }

    #[fuchsia::test]
    async fn test_read_to_end_of_source() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        const SOURCE_SIZE: usize = 300;
        let source = FakeSource::new(device, SOURCE_SIZE);
        source.start();
        let caching_object_handle = CachingObjectHandle::new(source);

        // The returned chunk is trimmed to the source size, not block- or chunk-padded.
        let chunk = caching_object_handle.read(0).await.unwrap();
        assert_eq!(&*chunk, make_buf(1, SOURCE_SIZE));
    }

    #[fuchsia::test]
    async fn test_chunk_purging() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = Arc::new(FakeSource::new(device, CHUNK_SIZE + 4096));
        source.start();
        let caching_object_handle =
            CachingObjectHandle::new(source.clone() as Arc<dyn ReadObjectHandle>);

        // Hold a reference to the first chunk so it can never be purged.
        let _chunk1 = caching_object_handle.read(0).await.unwrap();
        // Immediately drop the second chunk.
        caching_object_handle.read(CHUNK_SIZE).await.unwrap();

        // From here on, any read that goes to the source panics the test.
        source.allow_reads(false);

        // The first purge should not evict the second chunk yet, and the read should save the chunk
        // from being evicted by the next purge too.
        caching_object_handle.purge();
        caching_object_handle.read(0).await.unwrap();
        caching_object_handle.read(CHUNK_SIZE).await.unwrap();

        caching_object_handle.purge();
        caching_object_handle.read(0).await.unwrap();
        caching_object_handle.read(CHUNK_SIZE).await.unwrap();

        // Purging twice should result in evicting the second chunk.
        caching_object_handle.purge();
        caching_object_handle.purge();
        caching_object_handle.read(0).await.unwrap();
        caching_object_handle.read(CHUNK_SIZE).await.expect_err("Chunk was not purged");
    }
}