1use crate::errors::FxfsError;
6use crate::object_handle::{ObjectHandle, ReadObjectHandle};
7use anyhow::{anyhow, ensure, Error};
8use event_listener::{Event, EventListener};
9use fuchsia_sync::Mutex;
10use std::ops::Deref;
11use std::sync::Arc;
12use storage_device::buffer::BufferFuture;
13
/// Granularity, in bytes, at which `CachingObjectHandle` loads and caches data.
pub const CHUNK_SIZE: usize = 128 * 1024;
15
16fn block_aligned_size(source: &impl ReadObjectHandle) -> usize {
17 let block_size = source.block_size() as usize;
18 let source_size = source.get_size() as usize;
19 source_size.checked_next_multiple_of(block_size).unwrap()
20}
21
/// A cheaply-cloneable, read-only view of one cached chunk of data.
///
/// Cloning only bumps the inner `Arc`'s refcount; the underlying bytes are shared.
/// Outstanding clones keep a chunk alive across `CachingObjectHandle::purge` calls
/// (see `Chunk::maybe_purge`).
#[repr(transparent)]
#[derive(Clone)]
pub struct CachedChunk(Arc<Box<[u8]>>);
27
28impl std::fmt::Debug for CachedChunk {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
30 f.debug_struct("CachedChunk").field("len", &self.len()).finish()
31 }
32}
33
34impl Deref for CachedChunk {
35 type Target = [u8];
36
37 fn deref(&self) -> &Self::Target {
38 &**self.0
39 }
40}
41
/// Cache state of a single `CHUNK_SIZE`-aligned region of the source.
#[derive(Default, Debug)]
enum Chunk {
    /// Not cached and no load in flight.
    #[default]
    Missing,
    /// One reader is currently loading this chunk; other readers wait on the
    /// handle's `Event` and re-check.
    Pending,
    /// Cached; readers receive `CachedChunk` clones of the shared buffer.
    Present(CachedChunk),
    /// Cached but marked by a previous `purge` call. The next `purge` frees it
    /// unless a `read` resurrects it to `Present` first (two-strike eviction).
    Expired(Box<[u8]>),
}
52
53impl Chunk {
54 fn maybe_purge(&mut self) -> Option<Box<[u8]>> {
58 let this = std::mem::take(self);
59 match this {
60 Chunk::Expired(data) => Some(data),
61 Chunk::Present(chunk) => {
62 match Arc::try_unwrap(chunk.0) {
63 Ok(data) => *self = Chunk::Expired(data),
64 Err(chunk) => *self = Chunk::Present(CachedChunk(chunk)),
65 }
66 None
67 }
68 _ => {
69 *self = this;
70 None
71 }
72 }
73 }
74}
75
/// A read-through cache layered over a `ReadObjectHandle`, caching data in
/// `CHUNK_SIZE` units. Chunks can be released again via `purge`.
pub struct CachingObjectHandle<S> {
    // The underlying handle that cache misses are read from.
    source: S,

    // Per-chunk cache state; index = byte offset / CHUNK_SIZE.
    chunks: Mutex<Vec<Chunk>>,

    // Notified with `usize::MAX` whenever a pending load completes or is abandoned,
    // waking every reader waiting on any chunk.
    event: Event,
}
92
93unsafe impl<S> Sync for CachingObjectHandle<S> {}
95
impl<S: ReadObjectHandle> CachingObjectHandle<S> {
    /// Wraps `source` in a new, empty cache.
    ///
    /// Panics if `CHUNK_SIZE` is not a multiple of the source's block size, since
    /// chunk-aligned reads must also be block-aligned.
    pub fn new(source: S) -> Self {
        let block_size = source.block_size() as usize;
        assert!(CHUNK_SIZE % block_size == 0);
        let aligned_size = block_aligned_size(&source);
        let chunk_count = aligned_size.div_ceil(CHUNK_SIZE);

        // One `Chunk::Missing` slot per chunk of the source.
        let mut chunks = Vec::<Chunk>::new();
        chunks.resize_with(chunk_count, Default::default);
        Self { source, chunks: Mutex::new(chunks), event: Event::new() }
    }

    /// Returns the cached chunk containing `offset`, loading it from the source on
    /// a cache miss. Concurrent callers for the same chunk coalesce: one performs
    /// the load while the rest wait on `event` and re-check.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::OutOfRange` if `offset` is at or beyond the source's
    /// size, and propagates read errors from the source.
    pub async fn read(&self, offset: usize) -> Result<CachedChunk, Error> {
        ensure!(offset < self.source.get_size() as usize, FxfsError::OutOfRange);
        let chunk_num = offset / CHUNK_SIZE;

        // What to do after inspecting (and releasing) the chunk-table lock.
        enum Action {
            Wait(EventListener),
            Load,
        }
        loop {
            let action = {
                let mut chunks = self.chunks.lock();
                match std::mem::take(&mut chunks[chunk_num]) {
                    Chunk::Missing => {
                        // Claim the load; other readers will observe `Pending`.
                        chunks[chunk_num] = Chunk::Pending;
                        Action::Load
                    }
                    Chunk::Pending => {
                        // Someone else is loading. Register the listener while the
                        // lock is still held so a completion notification between
                        // unlocking and awaiting cannot be missed.
                        chunks[chunk_num] = Chunk::Pending;
                        Action::Wait(self.event.listen())
                    }
                    Chunk::Present(cached_chunk) => {
                        chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
                        return Ok(cached_chunk);
                    }
                    Chunk::Expired(data) => {
                        // Resurrect a purge-expired chunk without re-reading it.
                        let cached_chunk = CachedChunk(Arc::new(data));
                        chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
                        return Ok(cached_chunk);
                    }
                }
            };
            match action {
                Action::Wait(listener) => {
                    listener.await;
                }
                Action::Load => {
                    // Cancel safety: if the load errors (`?` below) or this future
                    // is dropped mid-read, reset the chunk to `Missing` and wake
                    // waiters so another reader can retry the load.
                    let drop_guard = scopeguard::guard((), |_| {
                        {
                            let mut chunks = self.chunks.lock();
                            debug_assert!(matches!(chunks[chunk_num], Chunk::Pending));
                            chunks[chunk_num] = Chunk::Missing;
                        }
                        self.event.notify(usize::MAX);
                    });

                    // `len` is the logical byte count of this chunk (short for the
                    // final chunk); `aligned_len` rounds it up to the block size for
                    // the actual device read.
                    let read_start = chunk_num * CHUNK_SIZE;
                    let len =
                        std::cmp::min(read_start + CHUNK_SIZE, self.source.get_size() as usize)
                            - read_start;
                    let aligned_len =
                        std::cmp::min(read_start + CHUNK_SIZE, block_aligned_size(&self.source))
                            - read_start;

                    fxfs_trace::duration!(c"CachingObjectHandle::load", "len" => aligned_len);

                    let mut read_buf = self.source.allocate_buffer(aligned_len).await;
                    let amount_read =
                        self.source.read(read_start as u64, read_buf.as_mut()).await?;
                    ensure!(amount_read >= len, anyhow!(FxfsError::Internal).context("Short read"));

                    log::debug!("COH {}: Read {len}@{read_start}", self.source.object_id());

                    // Keep only the logical bytes; block-alignment padding is dropped.
                    let data = Vec::from(&read_buf.as_slice()[..len]).into_boxed_slice();
                    let cached_chunk = CachedChunk(Arc::new(data));

                    {
                        let mut chunks = self.chunks.lock();
                        debug_assert!(matches!(chunks[chunk_num], Chunk::Pending));
                        chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
                    }
                    self.event.notify(usize::MAX);

                    // Success: disarm the cleanup guard without running it.
                    scopeguard::ScopeGuard::into_inner(drop_guard);
                    return Ok(cached_chunk);
                }
            }
        }
    }

    /// Releases cached chunks using a two-strike scheme: one `purge` call
    /// downgrades unreferenced `Present` chunks to `Expired`, and a subsequent call
    /// frees chunks still `Expired` (i.e. not read again in between). Chunks with
    /// outstanding `CachedChunk` clones are never freed.
    pub fn purge(&self) {
        // Declared before the lock guard so the buffers are deallocated after the
        // lock has been released (drop order is the reverse of declaration order).
        let mut to_deallocate = vec![];
        let mut chunks = self.chunks.lock();
        for chunk in chunks.iter_mut() {
            if let Some(data) = chunk.maybe_purge() {
                to_deallocate.push(data);
            }
        }
        // NOTE(review): the byte count assumes every freed chunk is full-sized; the
        // final chunk may be shorter than CHUNK_SIZE, so this is an upper bound.
        log::debug!(
            "COH {}: Purging {} cached chunks ({} bytes)",
            self.source.object_id(),
            to_deallocate.len(),
            to_deallocate.len() * CHUNK_SIZE
        );
    }
}
213
214impl<S: ReadObjectHandle> ObjectHandle for CachingObjectHandle<S> {
215 fn set_trace(&self, v: bool) {
216 self.source.set_trace(v);
217 }
218
219 fn object_id(&self) -> u64 {
220 self.source.object_id()
221 }
222
223 fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
224 self.source.allocate_buffer(size)
225 }
226
227 fn block_size(&self) -> u64 {
228 self.source.block_size()
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::{CachingObjectHandle, CHUNK_SIZE};
    use crate::object_handle::{ObjectHandle, ReadObjectHandle};
    use anyhow::{anyhow, ensure, Error};
    use async_trait::async_trait;
    use event_listener::Event;
    use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
    use std::sync::Arc;
    use storage_device::buffer::{BufferFuture, MutableBufferRef};
    use storage_device::fake_device::FakeDevice;
    use storage_device::Device;

    // Fills `buf` with a recognizable pattern: even offsets hold `counter` (which
    // source read produced the data) and odd offsets hold a wrapping pair index.
    // For odd-length buffers the final byte is left untouched by `chunks_exact_mut`.
    fn fill_buf(buf: &mut [u8], counter: u8) {
        for (i, chunk) in buf.chunks_exact_mut(2).enumerate() {
            chunk[0] = counter;
            chunk[1] = i as u8;
        }
    }

    // Returns a `size`-byte buffer holding the pattern `fill_buf` writes for
    // `counter`; used as the expected value in assertions.
    fn make_buf(counter: u8, size: usize) -> Vec<u8> {
        let mut buf = vec![0; size];
        fill_buf(&mut buf, counter);
        buf
    }

    // A `ReadObjectHandle` whose reads block until `start` is called and can be
    // forbidden entirely via `allow_reads(false)`. Each read stamps its data with
    // an incrementing counter so tests can tell which read produced which bytes.
    struct FakeSource {
        device: Arc<dyn Device>,
        size: usize,
        // While false, `read` parks in `wait_for_start` until `start` is called.
        started: AtomicBool,
        // While false, `read` fails; used to assert that reads hit the cache.
        allow_reads: AtomicBool,
        wake: Event,
        // Stamped into the data of each read, starting at 1.
        counter: AtomicU8,
    }

    impl FakeSource {
        // Creates a `size`-byte source whose reads block until `start` is called.
        fn new(device: Arc<dyn Device>, size: usize) -> Self {
            FakeSource {
                started: AtomicBool::new(false),
                allow_reads: AtomicBool::new(true),
                size,
                wake: Event::new(),
                device,
                counter: AtomicU8::new(1),
            }
        }

        // Unblocks every read currently parked in `wait_for_start`.
        fn start(&self) {
            self.started.store(true, Ordering::SeqCst);
            self.wake.notify(usize::MAX);
        }

        // Enables or disables reads; a disallowed read returns an error.
        fn allow_reads(&self, allow: bool) {
            self.allow_reads.store(allow, Ordering::SeqCst);
        }

        // Waits until `start` has been called. The flag is re-checked after the
        // listener is registered so a notification between the check and the await
        // cannot be missed.
        async fn wait_for_start(&self) {
            while !self.started.load(Ordering::SeqCst) {
                let listener = self.wake.listen();
                if self.started.load(Ordering::SeqCst) {
                    break;
                }
                listener.await;
            }
        }
    }

    #[async_trait]
    impl ReadObjectHandle for FakeSource {
        async fn read(&self, _offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
            ensure!(self.allow_reads.load(Ordering::SeqCst), anyhow!("Received unexpected read"));
            // Claim the counter before blocking so the order in which reads were
            // *issued* (not completed) determines the data pattern.
            let counter = self.counter.fetch_add(1, Ordering::Relaxed);
            self.wait_for_start().await;
            fill_buf(buf.as_mut_slice(), counter);
            Ok(buf.len())
        }

        fn get_size(&self) -> u64 {
            self.size as u64
        }
    }

    impl ObjectHandle for FakeSource {
        fn object_id(&self) -> u64 {
            0
        }

        fn block_size(&self) -> u64 {
            self.device.block_size().into()
        }

        fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
            self.device.allocate_buffer(size)
        }
    }

    // A miss on a `Missing` chunk loads it from the source.
    #[fuchsia::test]
    async fn test_read_with_missing_chunk() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = FakeSource::new(device, 4096);
        source.start();
        let caching_object_handle = CachingObjectHandle::new(source);

        let chunk = caching_object_handle.read(0).await.unwrap();
        assert_eq!(&*chunk, make_buf(1, 4096));
    }

    // A second read of the same chunk is served from the cache: the data still
    // carries counter 1, proving the source was not read again.
    #[fuchsia::test]
    async fn test_read_with_present_chunk() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = FakeSource::new(device, 4096);
        source.start();
        let caching_object_handle = CachingObjectHandle::new(source);

        let expected = make_buf(1, 4096);
        let chunk = caching_object_handle.read(0).await.unwrap();
        assert_eq!(&*chunk, expected);

        let chunk = caching_object_handle.read(0).await.unwrap();
        assert_eq!(&*chunk, expected);
    }

    // Both offsets fall in chunk 0 (8192 < CHUNK_SIZE): the second reader waits on
    // the first reader's pending load and both observe the same (counter 1) data.
    #[fuchsia::test]
    async fn test_read_with_pending_chunk() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = FakeSource::new(device, 8192);
        let caching_object_handle = CachingObjectHandle::new(source);

        let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));
        let mut read_fut2 = std::pin::pin!(caching_object_handle.read(4096));

        assert!(futures::poll!(&mut read_fut1).is_pending());
        assert!(futures::poll!(&mut read_fut2).is_pending());
        caching_object_handle.source.start();
        assert!(futures::poll!(&mut read_fut2).is_pending());
        let expected = make_buf(1, 8192);
        assert_eq!(&*read_fut1.await.unwrap(), expected);
        assert_eq!(&*read_fut2.await.unwrap(), expected);
    }

    // A completion notification for chunk 1 wakes the waiter on chunk 0 (the event
    // is shared), but the waiter stays pending until its own chunk's load finishes.
    #[fuchsia::test]
    async fn test_read_with_notification_for_other_chunk() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = FakeSource::new(device, CHUNK_SIZE + 4096);
        let caching_object_handle = CachingObjectHandle::new(source);

        let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));
        let mut read_fut2 = std::pin::pin!(caching_object_handle.read(CHUNK_SIZE));
        let mut read_fut3 = std::pin::pin!(caching_object_handle.read(0));

        assert!(futures::poll!(&mut read_fut1).is_pending());
        assert!(futures::poll!(&mut read_fut2).is_pending());
        assert!(futures::poll!(&mut read_fut3).is_pending());
        caching_object_handle.source.start();
        assert!(futures::poll!(&mut read_fut3).is_pending());
        assert_eq!(&*read_fut2.await.unwrap(), make_buf(2, 4096));
        assert!(futures::poll!(&mut read_fut3).is_pending());
        let expected = make_buf(1, CHUNK_SIZE);
        assert_eq!(&*read_fut1.await.unwrap(), expected);
        assert_eq!(&*read_fut3.await.unwrap(), expected);
    }

    // Dropping the future that owns a pending load resets the chunk to `Missing`
    // and wakes the waiter, which then retries the load itself; the retried read
    // carries counter 2 because the abandoned read already consumed counter 1.
    #[fuchsia::test]
    async fn test_read_with_dropped_future() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = FakeSource::new(device, 4096);
        let caching_object_handle = CachingObjectHandle::new(source);

        let mut read_fut2 = std::pin::pin!(caching_object_handle.read(0));
        {
            let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));

            assert!(futures::poll!(&mut read_fut1).is_pending());
            assert!(futures::poll!(&mut read_fut2).is_pending());
            caching_object_handle.source.start();
            assert!(futures::poll!(&mut read_fut2).is_pending());
        }
        assert_eq!(&*read_fut2.await.unwrap(), make_buf(2, 4096));
    }

    // An offset at or beyond the source size fails with `OutOfRange`.
    #[fuchsia::test]
    async fn test_read_past_end_of_source() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = FakeSource::new(device, 300);
        source.start();
        let caching_object_handle = CachingObjectHandle::new(source);

        caching_object_handle.read(500).await.expect_err("Read should fail");
    }

    // The final (short) chunk returns only the logical bytes, not the
    // block-aligned padding that was read from the device.
    #[fuchsia::test]
    async fn test_read_to_end_of_source() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        const SOURCE_SIZE: usize = 300;
        let source = FakeSource::new(device, SOURCE_SIZE);
        source.start();
        let caching_object_handle = CachingObjectHandle::new(source);

        let chunk = caching_object_handle.read(0).await.unwrap();
        assert_eq!(&*chunk, make_buf(1, SOURCE_SIZE));
    }

    // Exercises the two-strike purge: chunk 0 is pinned by `_chunk1` and is never
    // evicted; chunk 1 is repeatedly resurrected by reads until two consecutive
    // purges (with no intervening read) finally free it, after which the next read
    // must go to the (now disallowed) source and fails.
    #[fuchsia::test]
    async fn test_chunk_purging() {
        let device = Arc::new(FakeDevice::new(1024, 512));
        let source = Arc::new(FakeSource::new(device, CHUNK_SIZE + 4096));
        source.start();
        let caching_object_handle =
            CachingObjectHandle::new(source.clone() as Arc<dyn ReadObjectHandle>);

        let _chunk1 = caching_object_handle.read(0).await.unwrap();
        caching_object_handle.read(CHUNK_SIZE).await.unwrap();

        // Any further source read would be a caching bug.
        source.allow_reads(false);

        caching_object_handle.purge();
        caching_object_handle.read(0).await.unwrap();
        caching_object_handle.read(CHUNK_SIZE).await.unwrap();

        caching_object_handle.purge();
        caching_object_handle.read(0).await.unwrap();
        caching_object_handle.read(CHUNK_SIZE).await.unwrap();

        // Two purges back-to-back: chunk 1 expires, then is freed.
        caching_object_handle.purge();
        caching_object_handle.purge();
        caching_object_handle.read(0).await.unwrap();
        caching_object_handle.read(CHUNK_SIZE).await.expect_err("Chunk was not purged");
    }
}