1use crate::errors::FxfsError;
6use crate::object_handle::{ObjectHandle, ReadObjectHandle};
7use anyhow::{Error, anyhow, ensure};
8use event_listener::{Event, EventListener};
9use fuchsia_sync::Mutex;
10use std::ops::Deref;
11use std::sync::Arc;
12use storage_device::buffer::BufferFuture;
13
14pub const CHUNK_SIZE: usize = 128 * 1024;
15
16fn block_aligned_size(source: &impl ReadObjectHandle) -> usize {
17 let block_size = source.block_size() as usize;
18 let source_size = source.get_size() as usize;
19 source_size.checked_next_multiple_of(block_size).unwrap()
20}
21
22#[repr(transparent)]
25#[derive(Clone)]
26pub struct CachedChunk(Arc<Box<[u8]>>);
27
28impl std::fmt::Debug for CachedChunk {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
30 f.debug_struct("CachedChunk").field("len", &self.len()).finish()
31 }
32}
33
34impl Deref for CachedChunk {
35 type Target = [u8];
36
37 fn deref(&self) -> &Self::Target {
38 &**self.0
39 }
40}
41
42#[derive(Default, Debug)]
43enum Chunk {
44 #[default]
45 Missing,
46 Pending,
47 Present(CachedChunk),
48 Expired(Box<[u8]>),
51}
52
53impl Chunk {
54 fn maybe_purge(&mut self) -> Option<Box<[u8]>> {
58 let this = std::mem::take(self);
59 match this {
60 Chunk::Expired(data) => Some(data),
61 Chunk::Present(chunk) => {
62 match Arc::try_unwrap(chunk.0) {
63 Ok(data) => *self = Chunk::Expired(data),
64 Err(chunk) => *self = Chunk::Present(CachedChunk(chunk)),
65 }
66 None
67 }
68 _ => {
69 *self = this;
70 None
71 }
72 }
73 }
74}
75
76pub struct CachingObjectHandle<S> {
79 source: S,
80
81 chunks: Mutex<Vec<Chunk>>,
84
85 event: Event,
91}
92
93unsafe impl<S> Sync for CachingObjectHandle<S> {}
95
96#[fxfs_trace::trace]
97impl<S: ReadObjectHandle> CachingObjectHandle<S> {
98 pub fn new(source: S) -> Self {
99 let block_size = source.block_size() as usize;
100 assert!(CHUNK_SIZE % block_size == 0);
101 let aligned_size = block_aligned_size(&source);
102 let chunk_count = aligned_size.div_ceil(CHUNK_SIZE);
103
104 let mut chunks = Vec::<Chunk>::new();
105 chunks.resize_with(chunk_count, Default::default);
106 Self { source, chunks: Mutex::new(chunks), event: Event::new() }
107 }
108
109 pub async fn read(&self, offset: usize) -> Result<CachedChunk, Error> {
113 ensure!(offset < self.source.get_size() as usize, FxfsError::OutOfRange);
114 let chunk_num = offset / CHUNK_SIZE;
115
116 enum Action {
117 Wait(EventListener),
118 Load,
119 }
120 loop {
121 let action = {
122 let mut chunks = self.chunks.lock();
123 match std::mem::take(&mut chunks[chunk_num]) {
124 Chunk::Missing => {
125 chunks[chunk_num] = Chunk::Pending;
126 Action::Load
127 }
128 Chunk::Pending => {
129 chunks[chunk_num] = Chunk::Pending;
130 Action::Wait(self.event.listen())
131 }
132 Chunk::Present(cached_chunk) => {
133 chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
134 return Ok(cached_chunk);
135 }
136 Chunk::Expired(data) => {
137 let cached_chunk = CachedChunk(Arc::new(data));
138 chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
139 return Ok(cached_chunk);
140 }
141 }
142 };
143 match action {
144 Action::Wait(listener) => {
145 listener.await;
146 }
147 Action::Load => {
148 return self.load(chunk_num).await;
149 }
150 }
151 }
152 }
153
154 #[trace]
155 async fn load(&self, chunk_num: usize) -> Result<CachedChunk, Error> {
156 let drop_guard = scopeguard::guard((), |_| {
159 {
160 let mut chunks = self.chunks.lock();
161 debug_assert!(matches!(chunks[chunk_num], Chunk::Pending));
162 chunks[chunk_num] = Chunk::Missing;
163 }
164 self.event.notify(usize::MAX);
165 });
166
167 let read_start = chunk_num * CHUNK_SIZE;
168 let len =
169 std::cmp::min(read_start + CHUNK_SIZE, self.source.get_size() as usize) - read_start;
170 let aligned_len =
171 std::cmp::min(read_start + CHUNK_SIZE, block_aligned_size(&self.source)) - read_start;
172
173 let mut read_buf = self.source.allocate_buffer(aligned_len).await;
174 let amount_read = self.source.read(read_start as u64, read_buf.as_mut()).await?;
175 ensure!(amount_read >= len, anyhow!(FxfsError::Internal).context("Short read"));
176
177 log::debug!("COH {}: Read {len}@{read_start}", self.source.object_id());
178
179 let data = Vec::from(&read_buf.as_slice()[..len]).into_boxed_slice();
180 let cached_chunk = CachedChunk(Arc::new(data));
181
182 {
183 let mut chunks = self.chunks.lock();
184 debug_assert!(matches!(chunks[chunk_num], Chunk::Pending));
185 chunks[chunk_num] = Chunk::Present(cached_chunk.clone());
186 }
187 self.event.notify(usize::MAX);
188
189 scopeguard::ScopeGuard::into_inner(drop_guard);
190 return Ok(cached_chunk);
191 }
192
193 pub fn purge(&self) {
198 let mut to_deallocate = vec![];
200 let mut chunks = self.chunks.lock();
201 for chunk in chunks.iter_mut() {
202 if let Some(data) = chunk.maybe_purge() {
203 to_deallocate.push(data);
204 }
205 }
206 log::debug!(
207 "COH {}: Purging {} cached chunks ({} bytes)",
208 self.source.object_id(),
209 to_deallocate.len(),
210 to_deallocate.len() * CHUNK_SIZE
211 );
212 }
213}
214
215impl<S: ReadObjectHandle> ObjectHandle for CachingObjectHandle<S> {
216 fn set_trace(&self, v: bool) {
217 self.source.set_trace(v);
218 }
219
220 fn object_id(&self) -> u64 {
221 self.source.object_id()
222 }
223
224 fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
225 self.source.allocate_buffer(size)
226 }
227
228 fn block_size(&self) -> u64 {
229 self.source.block_size()
230 }
231}
232
233#[cfg(test)]
234mod tests {
235 use super::{CHUNK_SIZE, CachingObjectHandle};
236 use crate::object_handle::{ObjectHandle, ReadObjectHandle};
237 use anyhow::{Error, anyhow, ensure};
238 use async_trait::async_trait;
239 use event_listener::Event;
240 use std::sync::Arc;
241 use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
242 use storage_device::Device;
243 use storage_device::buffer::{BufferFuture, MutableBufferRef};
244 use storage_device::fake_device::FakeDevice;
245
246 fn fill_buf(buf: &mut [u8], counter: u8) {
248 for (i, chunk) in buf.chunks_exact_mut(2).enumerate() {
249 chunk[0] = counter;
250 chunk[1] = i as u8;
251 }
252 }
253
254 fn make_buf(counter: u8, size: usize) -> Vec<u8> {
256 let mut buf = vec![0; size];
257 fill_buf(&mut buf, counter);
258 buf
259 }
260
261 struct FakeSource {
262 device: Arc<dyn Device>,
263 size: usize,
264 started: AtomicBool,
265 allow_reads: AtomicBool,
266 wake: Event,
267 counter: AtomicU8,
268 }
269
270 impl FakeSource {
271 fn new(device: Arc<dyn Device>, size: usize) -> Self {
273 FakeSource {
274 started: AtomicBool::new(false),
275 allow_reads: AtomicBool::new(true),
276 size,
277 wake: Event::new(),
278 device,
279 counter: AtomicU8::new(1),
280 }
281 }
282
283 fn start(&self) {
284 self.started.store(true, Ordering::SeqCst);
285 self.wake.notify(usize::MAX);
286 }
287
288 fn allow_reads(&self, allow: bool) {
291 self.allow_reads.store(allow, Ordering::SeqCst);
292 }
293
294 async fn wait_for_start(&self) {
295 while !self.started.load(Ordering::SeqCst) {
296 let listener = self.wake.listen();
297 if self.started.load(Ordering::SeqCst) {
298 break;
299 }
300 listener.await;
301 }
302 }
303 }
304
305 #[async_trait]
306 impl ReadObjectHandle for FakeSource {
307 async fn read(&self, _offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
308 ensure!(self.allow_reads.load(Ordering::SeqCst), anyhow!("Received unexpected read"));
309 let counter = self.counter.fetch_add(1, Ordering::Relaxed);
310 self.wait_for_start().await;
311 fill_buf(buf.as_mut_slice(), counter);
312 Ok(buf.len())
313 }
314
315 fn get_size(&self) -> u64 {
316 self.size as u64
317 }
318 }
319
320 impl ObjectHandle for FakeSource {
321 fn object_id(&self) -> u64 {
322 0
323 }
324
325 fn block_size(&self) -> u64 {
326 self.device.block_size().into()
327 }
328
329 fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
330 self.device.allocate_buffer(size)
331 }
332 }
333
334 #[fuchsia::test]
335 async fn test_read_with_missing_chunk() {
336 let device = Arc::new(FakeDevice::new(1024, 512));
337 let source = FakeSource::new(device, 4096);
338 source.start();
339 let caching_object_handle = CachingObjectHandle::new(source);
340
341 let chunk = caching_object_handle.read(0).await.unwrap();
342 assert_eq!(&*chunk, make_buf(1, 4096));
343 }
344
345 #[fuchsia::test]
346 async fn test_read_with_present_chunk() {
347 let device = Arc::new(FakeDevice::new(1024, 512));
348 let source = FakeSource::new(device, 4096);
349 source.start();
350 let caching_object_handle = CachingObjectHandle::new(source);
351
352 let expected = make_buf(1, 4096);
353 let chunk = caching_object_handle.read(0).await.unwrap();
354 assert_eq!(&*chunk, expected);
355
356 let chunk = caching_object_handle.read(0).await.unwrap();
358 assert_eq!(&*chunk, expected);
359 }
360
361 #[fuchsia::test]
362 async fn test_read_with_pending_chunk() {
363 let device = Arc::new(FakeDevice::new(1024, 512));
364 let source = FakeSource::new(device, 8192);
365 let caching_object_handle = CachingObjectHandle::new(source);
366
367 let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));
369 let mut read_fut2 = std::pin::pin!(caching_object_handle.read(4096));
370
371 assert!(futures::poll!(&mut read_fut1).is_pending());
374 assert!(futures::poll!(&mut read_fut2).is_pending());
376 caching_object_handle.source.start();
377 assert!(futures::poll!(&mut read_fut2).is_pending());
379 let expected = make_buf(1, 8192);
382 assert_eq!(&*read_fut1.await.unwrap(), expected);
383 assert_eq!(&*read_fut2.await.unwrap(), expected);
385 }
386
387 #[fuchsia::test]
388 async fn test_read_with_notification_for_other_chunk() {
389 let device = Arc::new(FakeDevice::new(1024, 512));
390 let source = FakeSource::new(device, CHUNK_SIZE + 4096);
391 let caching_object_handle = CachingObjectHandle::new(source);
392
393 let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));
394 let mut read_fut2 = std::pin::pin!(caching_object_handle.read(CHUNK_SIZE));
395 let mut read_fut3 = std::pin::pin!(caching_object_handle.read(0));
396
397 assert!(futures::poll!(&mut read_fut1).is_pending());
400 assert!(futures::poll!(&mut read_fut2).is_pending());
401 assert!(futures::poll!(&mut read_fut3).is_pending());
403 caching_object_handle.source.start();
404 assert!(futures::poll!(&mut read_fut3).is_pending());
406 assert_eq!(&*read_fut2.await.unwrap(), make_buf(2, 4096));
408 assert!(futures::poll!(&mut read_fut3).is_pending());
411 let expected = make_buf(1, CHUNK_SIZE);
414 assert_eq!(&*read_fut1.await.unwrap(), expected);
415 assert_eq!(&*read_fut3.await.unwrap(), expected);
417 }
418
419 #[fuchsia::test]
420 async fn test_read_with_dropped_future() {
421 let device = Arc::new(FakeDevice::new(1024, 512));
422 let source = FakeSource::new(device, 4096);
423 let caching_object_handle = CachingObjectHandle::new(source);
424
425 let mut read_fut2 = std::pin::pin!(caching_object_handle.read(0));
426 {
427 let mut read_fut1 = std::pin::pin!(caching_object_handle.read(0));
428
429 assert!(futures::poll!(&mut read_fut1).is_pending());
432 assert!(futures::poll!(&mut read_fut2).is_pending());
434 caching_object_handle.source.start();
435 assert!(futures::poll!(&mut read_fut2).is_pending());
437 }
438 assert_eq!(&*read_fut2.await.unwrap(), make_buf(2, 4096));
443 }
444
445 #[fuchsia::test]
446 async fn test_read_past_end_of_source() {
447 let device = Arc::new(FakeDevice::new(1024, 512));
448 let source = FakeSource::new(device, 300);
449 source.start();
450 let caching_object_handle = CachingObjectHandle::new(source);
451
452 caching_object_handle.read(500).await.expect_err("Read should fail");
453 }
454
455 #[fuchsia::test]
456 async fn test_read_to_end_of_source() {
457 let device = Arc::new(FakeDevice::new(1024, 512));
458 const SOURCE_SIZE: usize = 300;
459 let source = FakeSource::new(device, SOURCE_SIZE);
460 source.start();
461 let caching_object_handle = CachingObjectHandle::new(source);
462
463 let chunk = caching_object_handle.read(0).await.unwrap();
464 assert_eq!(&*chunk, make_buf(1, SOURCE_SIZE));
465 }
466
467 #[fuchsia::test]
468 async fn test_chunk_purging() {
469 let device = Arc::new(FakeDevice::new(1024, 512));
470 let source = Arc::new(FakeSource::new(device, CHUNK_SIZE + 4096));
471 source.start();
472 let caching_object_handle =
473 CachingObjectHandle::new(source.clone() as Arc<dyn ReadObjectHandle>);
474
475 let _chunk1 = caching_object_handle.read(0).await.unwrap();
476 caching_object_handle.read(CHUNK_SIZE).await.unwrap();
478
479 source.allow_reads(false);
480
481 caching_object_handle.purge();
484 caching_object_handle.read(0).await.unwrap();
485 caching_object_handle.read(CHUNK_SIZE).await.unwrap();
486
487 caching_object_handle.purge();
488 caching_object_handle.read(0).await.unwrap();
489 caching_object_handle.read(CHUNK_SIZE).await.unwrap();
490
491 caching_object_handle.purge();
493 caching_object_handle.purge();
494 caching_object_handle.read(0).await.unwrap();
495 caching_object_handle.read(CHUNK_SIZE).await.expect_err("Chunk was not purged");
496 }
497}