fxfs/object_store/journal/
bootstrap_handle.rs
1use crate::log::*;
6use crate::object_handle::{ObjectHandle, ReadObjectHandle};
7use crate::object_store::journal::JournalHandle;
8use crate::range::RangeExt;
9use anyhow::Error;
10use async_trait::async_trait;
11use std::cmp::min;
12use std::ops::Range;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::Arc;
15use storage_device::buffer::{BufferFuture, MutableBufferRef};
16use storage_device::Device;
17
18#[derive(Debug, Clone)]
20struct Extent {
21 added_offset: u64,
24 device_range: Range<u64>,
25}
26pub struct BootstrapObjectHandle {
29 object_id: u64,
30 device: Arc<dyn Device>,
31 start_offset: u64,
32 end_offset: u64,
33
34 extents: Vec<Extent>,
37
38 initial_extent: Option<Extent>,
42 trace: AtomicBool,
43}
44
45impl BootstrapObjectHandle {
46 pub fn new(object_id: u64, device: Arc<dyn Device>, initial_extent: Range<u64>) -> Self {
47 Self {
48 object_id,
49 device,
50 start_offset: 0,
51 end_offset: initial_extent.end - initial_extent.start,
52 extents: Vec::new(),
53 initial_extent: Some(Extent { added_offset: 0, device_range: initial_extent }),
54 trace: AtomicBool::new(false),
55 }
56 }
57
58 pub fn new_with_start_offset(
59 object_id: u64,
60 device: Arc<dyn Device>,
61 start_offset: u64,
62 ) -> Self {
63 Self {
64 object_id,
65 device,
66 start_offset,
67 end_offset: start_offset,
68 extents: Vec::new(),
69 initial_extent: None,
70 trace: AtomicBool::new(false),
71 }
72 }
73}
74
75impl ObjectHandle for BootstrapObjectHandle {
76 fn object_id(&self) -> u64 {
77 self.object_id
78 }
79
80 fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
81 self.device.allocate_buffer(size)
82 }
83
84 fn block_size(&self) -> u64 {
85 self.device.block_size().into()
86 }
87
88 fn set_trace(&self, trace: bool) {
89 let old_value = self.trace.swap(trace, Ordering::Relaxed);
90 if trace != old_value {
91 info!(oid = self.object_id, trace; "JH: trace");
92 }
93 }
94}
95
96#[async_trait]
97impl ReadObjectHandle for BootstrapObjectHandle {
98 async fn read(&self, mut offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
99 assert!(offset >= self.start_offset);
100 let trace = self.trace.load(Ordering::Relaxed);
101 if trace {
102 info!(len = buf.len(), offset; "JH: read");
103 }
104 let len = buf.len();
105 let mut buf_offset = 0;
106 let mut file_offset = self.start_offset;
107 let extents = if let Some(initial_extent) = &self.initial_extent {
108 std::slice::from_ref(initial_extent)
109 } else {
110 &self.extents
111 };
112 for extent in extents {
113 let device_range = &extent.device_range;
114 let extent_len = device_range.end - device_range.start;
115 if offset < file_offset + extent_len {
116 if trace {
117 info!(device_range:?; "JH: matching extent");
118 }
119 let device_offset = device_range.start + offset - file_offset;
120 let to_read =
121 min(device_range.end - device_offset, (len - buf_offset) as u64) as usize;
122 assert!(buf_offset % self.device.block_size() as usize == 0);
123 self.device
124 .read(
125 device_offset,
126 buf.reborrow().subslice_mut(buf_offset..buf_offset + to_read),
127 )
128 .await?;
129 buf_offset += to_read;
130 if buf_offset == len {
131 break;
132 }
133 offset += to_read as u64;
134 }
135 file_offset += extent_len;
136 }
137 Ok(buf_offset)
138 }
139
140 fn get_size(&self) -> u64 {
141 self.end_offset
142 }
143}
144
145impl JournalHandle for BootstrapObjectHandle {
146 fn end_offset(&self) -> Option<u64> {
147 Some(self.end_offset)
148 }
149
150 fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>) {
151 if self.initial_extent.is_some() {
152 self.initial_extent = None;
153 self.end_offset = 0;
154 }
155 self.end_offset += device_range.length().unwrap();
156 debug_assert!(
157 self.extents.last().map_or(true, |e| e.added_offset <= added_offset),
158 "last extent added at {}; this added at {added_offset}",
159 self.extents.last().unwrap().added_offset
160 );
161 self.extents.push(Extent { added_offset, device_range });
162 }
163
164 fn discard_extents(&mut self, discard_offset: u64) {
165 let index = self.extents.partition_point(|extent| extent.added_offset < discard_offset);
166 if index == self.extents.len() {
167 return;
168 }
169 let discarded = self.extents.drain(index..);
170 let trace = self.trace.load(Ordering::Relaxed);
171 for extent in discarded {
172 self.end_offset -= extent.device_range.length().unwrap();
173 if trace {
174 info!(discard_offset, extent:?; "JH: Discarded extent");
175 }
176 }
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use super::BootstrapObjectHandle;
183 use crate::object_handle::ReadObjectHandle as _;
184 use crate::object_store::journal::JournalHandle as _;
185 use std::sync::Arc;
186 use storage_device::fake_device::FakeDevice;
187 use storage_device::Device as _;
188
189 #[fuchsia::test]
190 async fn test_initial_extent() {
191 let device = Arc::new(FakeDevice::new(64, 512));
192 let mut buffer = device.allocate_buffer(1024).await;
193 buffer.as_mut_slice().fill(1);
194 device.write(0, buffer.as_ref()).await.unwrap();
195 buffer.as_mut_slice().fill(2);
196 device.write(1024, buffer.as_ref()).await.unwrap();
197 buffer.as_mut_slice().fill(0);
198
199 let handle = BootstrapObjectHandle::new(1, device.clone(), 0..0);
200 assert_eq!(handle.get_size(), 0);
201 assert_eq!(handle.read(0, buffer.as_mut()).await.expect("no initial data"), 0);
202
203 let mut handle = BootstrapObjectHandle::new(1, device.clone(), 1024..2048);
204 assert_eq!(handle.get_size(), 1024);
205 handle.read(0, buffer.as_mut()).await.expect("read implicit extent");
206 assert_eq!(buffer.as_slice(), &[2u8; 1024]);
207 handle.push_extent(0, 0..1024);
208 handle.read(0, buffer.as_mut()).await.expect("read first explicit extent");
209 assert_eq!(buffer.as_slice(), &[1u8; 1024]);
210 }
211
212 #[fuchsia::test]
213 async fn test_discard_extents() {
214 let device = Arc::new(FakeDevice::new(64, 512));
215 let mut handle = BootstrapObjectHandle::new(1, device.clone(), 0..0);
216
217 let mut buffer = device.allocate_buffer(1024).await;
218 buffer.as_mut_slice().fill(1);
219 device.write(0, buffer.as_ref()).await.unwrap();
220 buffer.as_mut_slice().fill(2);
221 device.write(1024, buffer.as_ref()).await.unwrap();
222 buffer.as_mut_slice().fill(0);
223
224 handle.push_extent(0, 1024..2048);
225 handle.push_extent(131072, 0..1024);
226
227 assert_eq!(handle.get_size(), 2048);
228 handle.read(0, buffer.as_mut()).await.unwrap();
229 assert_eq!(buffer.as_slice(), &[2u8; 1024]);
230 handle.read(1024, buffer.as_mut()).await.unwrap();
231 assert_eq!(buffer.as_slice(), &[1u8; 1024]);
232
233 handle.discard_extents(131073);
235
236 assert_eq!(handle.get_size(), 2048);
237 assert_eq!(handle.read(0, buffer.as_mut()).await.unwrap(), 1024);
238 assert_eq!(buffer.as_slice(), &[2u8; 1024]);
239 assert_eq!(handle.read(1024, buffer.as_mut()).await.unwrap(), 1024);
240 assert_eq!(buffer.as_slice(), &[1u8; 1024]);
241
242 handle.discard_extents(131072);
244
245 assert_eq!(handle.get_size(), 1024);
246 assert_eq!(handle.read(0, buffer.as_mut()).await.unwrap(), 1024);
247 assert_eq!(buffer.as_slice(), &[2u8; 1024]);
248 assert_eq!(handle.read(1024, buffer.as_mut()).await.unwrap(), 0);
249 }
250}