1use crate::fuchsia::file::FxFile;
6use crate::fuchsia::fxblob::BlobDirectory;
7use crate::fuchsia::fxblob::blob::FxBlob;
8use crate::fuchsia::node::{FxNode, OpenedNode};
9use crate::fuchsia::pager::PagerBacked;
10use crate::fuchsia::volume::FxVolume;
11use anyhow::{Context as _, Error, anyhow, ensure};
12use arrayref::{array_refs, mut_array_refs};
13use async_trait::async_trait;
14use fuchsia_async as fasync;
15use fuchsia_hash::Hash;
16use futures::future::{self, BoxFuture, RemoteHandle, join_all};
17use futures::lock::Mutex;
18use futures::{FutureExt, select};
19use fxfs::errors::FxfsError;
20use fxfs::log::*;
21use fxfs::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle};
22use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
23use fxfs::object_store::{
24 HandleOptions, ObjectDescriptor, ObjectStore, Timestamp, VOLUME_DATA_KEY_ID, directory,
25};
26use linked_hash_map::LinkedHashMap;
27use scopeguard::ScopeGuard;
28use std::cmp::{Eq, PartialEq};
29use std::collections::btree_map::{BTreeMap, Entry};
30use std::marker::PhantomData;
31use std::mem::size_of;
32use std::pin::pin;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicU64, Ordering};
35use vfs::execution_scope::ActiveGuard;
36
37const FILE_OPEN_MARKER: u64 = u64::MAX;
38const REPLAY_THREADS: usize = 2;
39const MESSAGE_CHUNK_SIZE: usize = 64;
42const IO_SIZE: usize = 1 << 17; pub static RECORDED: AtomicU64 = AtomicU64::new(0);
45
46trait RecordedVolume: Send + Sync + Sized + Unpin {
47 type IdType: std::fmt::Display + Ord + Send + Sized;
48 type NodeType: PagerBacked;
49 type MessageType: Message<IdType = Self::IdType>;
50
51 fn new(volume: Arc<FxVolume>) -> Self;
52
53 fn volume(&self) -> &Arc<FxVolume>;
54
55 fn open(
56 &self,
57 id: Self::IdType,
58 ) -> impl std::future::Future<Output = Result<OpenedNode<Self::NodeType>, Error>> + Send;
59
60 fn file_is_replayable(
62 &self,
63 id: &Self::IdType,
64 ) -> impl std::future::Future<Output = bool> + Send;
65
66 fn read_and_queue(
67 &self,
68 handle: Box<dyn ReadObjectHandle>,
69 sender: &async_channel::Sender<Request<Self::NodeType>>,
70 local_cache: &mut BTreeMap<Self::IdType, Option<OpenedNode<Self::NodeType>>>,
71 ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
72 async move {
73 let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
74 let block_size = handle.block_size() as usize;
75 let file_size = handle.get_size() as usize;
76 let mut offset = 0;
77 while offset < file_size {
78 let actual = handle
79 .read(offset as u64, io_buf.as_mut())
80 .await
81 .map_err(|e| e.context(format!("Failed to read at offset: {}", offset)))?;
82 offset += actual;
83 let mut local_offset = 0;
84 let mut next_block = block_size;
85 let mut next_offset = size_of::<Self::MessageType>();
86 while next_offset <= actual {
87 let msg = Self::MessageType::decode_from(
88 &io_buf.as_slice()[local_offset..next_offset],
89 );
90
91 local_offset = next_offset;
92 next_offset = local_offset + size_of::<Self::MessageType>();
93 if next_offset > next_block {
95 local_offset = next_block;
96 next_offset = local_offset + size_of::<Self::MessageType>();
97 next_block += block_size;
98 }
99
100 if msg.is_zeroes() {
103 break;
104 }
105
106 let file = match local_cache.entry(msg.id()) {
107 Entry::Occupied(entry) => match entry.get() {
108 Some(opened_file) => (*opened_file).clone(),
109 None => continue,
111 },
112 Entry::Vacant(entry) => match self.open(msg.id()).await {
113 Err(e) => {
114 debug!("Failed to open object {} from profile: {:?}", msg.id(), e);
115 entry.insert(None);
117 continue;
118 }
119 Ok(opened_file) => {
120 let file_clone = (*opened_file).clone();
121 entry.insert(Some(opened_file));
122 file_clone
123 }
124 },
125 };
126
127 sender.send(Request { file, offset: msg.offset() }).await?;
128 }
129 }
130 Ok(())
131 }
132 }
133
134 fn record(
135 &self,
136 name: &str,
137 receiver: async_channel::Receiver<Vec<Self::MessageType>>,
138 ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
139 async move {
140 let mut recorded_offsets = LinkedHashMap::<Self::MessageType, ()>::new();
141 let mut recorded_opens = BTreeMap::<Self::IdType, bool>::new();
142 while let Ok(buffer) = receiver.recv().await {
143 for message in buffer {
144 if message.is_open_marker() {
145 if let Entry::Vacant(entry) = recorded_opens.entry(message.id()) {
146 let usable = self.file_is_replayable(entry.key()).await;
147 entry.insert(usable);
148 }
149 } else {
150 recorded_offsets.insert(message, ());
151 }
152 }
153 }
154
155 let store = self.volume().store();
158 let fs = store.filesystem();
159 let mut transaction =
160 fs.clone().new_transaction(lock_keys![], Options::default()).await?;
161 let handle = ObjectStore::create_object(
162 self.volume(),
163 &mut transaction,
164 HandleOptions::default(),
165 None,
166 )
167 .await?;
168 store.add_to_graveyard(&mut transaction, handle.object_id());
169 transaction.commit().await?;
170
171 let clean_up = scopeguard::guard((), |_| {
172 fs.graveyard().queue_tombstone_object(store.store_object_id(), handle.object_id())
173 });
174
175 let block_size = handle.block_size() as usize;
176 let mut offset = 0;
177 let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
178 let mut next_block = block_size;
179 while let Some((message, _)) = recorded_offsets.pop_front() {
180 if !recorded_opens.get(&message.id()).copied().unwrap_or(false) {
182 continue;
183 }
184
185 let mut next_offset = offset + size_of::<Self::MessageType>();
186 if next_offset > next_block {
187 io_buf.as_mut_slice()[offset..next_block].fill(0);
190 if next_block >= IO_SIZE {
191 handle
193 .write_or_append(None, io_buf.as_ref())
194 .await
195 .context("Failed to write profile block")?;
196 offset = 0;
197 next_offset = size_of::<Self::MessageType>();
198 next_block = block_size;
199 } else {
200 offset = next_block;
201 next_offset = offset + size_of::<Self::MessageType>();
202 next_block += block_size;
203 }
204 }
205 message.encode_to(&mut io_buf.as_mut_slice()[offset..next_offset]);
206 offset = next_offset;
207 }
208 if offset > 0 {
209 io_buf.as_mut_slice()[offset..next_block].fill(0);
210 handle
211 .write_or_append(None, io_buf.subslice(0..next_block))
212 .await
213 .context("Failed to write profile block")?;
214 }
215
216 let profile_dir = self.volume().get_profile_directory().await?;
217
218 let mut lock_keys =
219 lock_keys![LockKey::object(store.store_object_id(), profile_dir.object_id())];
220 let mut old_id = INVALID_OBJECT_ID;
221 let mut transaction = loop {
222 let transaction = fs.clone().new_transaction(lock_keys, Options::default()).await?;
223 if let Some((id, descriptor, _)) = profile_dir.lookup(name).await? {
224 ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
225 if id == old_id {
226 break transaction;
227 }
228 lock_keys = lock_keys![
229 LockKey::object(store.store_object_id(), profile_dir.object_id()),
230 LockKey::object(store.store_object_id(), id)
231 ];
232 old_id = id;
233 } else {
234 old_id = INVALID_OBJECT_ID;
235 break transaction;
236 }
237 };
238
239 store.remove_from_graveyard(&mut transaction, handle.object_id());
240 directory::replace_child_with_object(
241 &mut transaction,
242 Some((handle.object_id(), ObjectDescriptor::File)),
243 (&profile_dir, name),
244 0,
245 Timestamp::now(),
246 )
247 .await?;
248 transaction.commit().await?;
249
250 ScopeGuard::into_inner(clean_up);
251 if old_id != INVALID_OBJECT_ID {
252 fs.graveyard().queue_tombstone_object(store.store_object_id(), old_id);
253 }
254
255 Ok(())
256 }
257 }
258}
259
260struct BlobVolume {
261 volume: Arc<FxVolume>,
262 root_dir: Mutex<Option<Arc<BlobDirectory>>>,
265}
266
267impl RecordedVolume for BlobVolume {
268 type IdType = Hash;
269 type NodeType = FxBlob;
270 type MessageType = BlobMessage;
271
272 fn new(volume: Arc<FxVolume>) -> Self {
273 Self { volume, root_dir: Mutex::new(None) }
274 }
275
276 fn volume(&self) -> &Arc<FxVolume> {
277 &self.volume
278 }
279
280 async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
281 let mut root_dir = self.root_dir.lock().await;
282 if root_dir.is_none() {
283 *root_dir = Some(
284 self.volume
285 .get_or_load_node(
286 self.volume.store().root_directory_object_id(),
287 ObjectDescriptor::Directory,
288 None,
289 )
290 .await?
291 .into_any()
292 .downcast::<BlobDirectory>()
293 .map_err(|_| FxfsError::Inconsistent)?,
294 );
295 };
296 root_dir
297 .as_ref()
298 .unwrap()
299 .open_blob(&id.into())
300 .await?
301 .ok_or_else(|| FxfsError::NotFound.into())
302 }
303
304 async fn file_is_replayable(&self, _id: &Self::IdType) -> bool {
305 true
307 }
308}
309
310struct FileVolume {
311 volume: Arc<FxVolume>,
312}
313
314impl RecordedVolume for FileVolume {
315 type IdType = u64;
316 type NodeType = FxFile;
317 type MessageType = FileMessage;
318
319 fn new(volume: Arc<FxVolume>) -> Self {
320 Self { volume }
321 }
322
323 fn volume(&self) -> &Arc<FxVolume> {
324 &self.volume
325 }
326
327 async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
328 self.volume
329 .get_or_load_node(id, ObjectDescriptor::File, None)
330 .await?
331 .into_any()
332 .downcast::<FxFile>()
333 .map_err(|_| anyhow!("Non-file opened"))?
334 .into_opened_node()
335 .ok_or_else(|| anyhow!("File being purged"))
336 }
337
338 async fn file_is_replayable(&self, id: &Self::IdType) -> bool {
339 match self.volume.store().get_keys(*id).await {
340 Ok(keys)
343 if keys.is_empty()
344 || (keys.len() == 1 && keys.first().unwrap().0 == VOLUME_DATA_KEY_ID) =>
345 {
346 true
347 }
348 _ => false,
349 }
350 }
351}
352
353trait Message: Eq + PartialEq + Sized + Send + Sync + std::hash::Hash + 'static {
354 type IdType: std::fmt::Display + Ord + Send + Sized;
355
356 fn id(&self) -> Self::IdType;
357 fn offset(&self) -> u64;
358 fn encode_to(&self, dest: &mut [u8]);
359 fn decode_from(src: &[u8]) -> Self;
360 fn is_zeroes(&self) -> bool;
361 fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error>;
362 fn is_open_marker(&self) -> bool;
363}
364
365#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
366struct BlobMessage {
367 id: Hash,
368 offset: u64,
371}
372
373impl BlobMessage {
374 fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
375 let (first, second) = mut_array_refs![dest, size_of::<Hash>(), size_of::<u64>()];
376 *first = self.id.into();
377 *second = self.offset.to_le_bytes();
378 }
379
380 fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
381 let (first, second) = array_refs!(src, size_of::<Hash>(), size_of::<u64>());
382 Self { id: Hash::from_array(*first), offset: u64::from_le_bytes(*second) }
383 }
384}
385
386impl Message for BlobMessage {
387 type IdType = Hash;
388
389 fn id(&self) -> Self::IdType {
390 self.id
391 }
392
393 fn offset(&self) -> u64 {
394 self.offset
395 }
396
397 fn encode_to(&self, dest: &mut [u8]) {
398 self.encode_to_impl(dest.try_into().unwrap());
399 }
400
401 fn decode_from(src: &[u8]) -> Self {
402 Self::decode_from_impl(src.try_into().unwrap())
403 }
404
405 fn is_zeroes(&self) -> bool {
406 self.id == Hash::from_array([0u8; size_of::<Hash>()]) && self.offset == 0
407 }
408
409 fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
410 match node.into_any().downcast::<FxBlob>() {
411 Ok(blob) => Ok(Self { id: blob.root(), offset }),
412 Err(_) => Err(anyhow!("Cannot record non-blob entry.")),
413 }
414 }
415
416 fn is_open_marker(&self) -> bool {
417 self.offset == FILE_OPEN_MARKER
418 }
419}
420
421#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
422struct FileMessage {
423 id: u64,
424 offset: u64,
427}
428
429impl FileMessage {
430 fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
431 let (first, second) = mut_array_refs![dest, size_of::<u64>(), size_of::<u64>()];
432 *first = self.id.to_le_bytes();
433 *second = self.offset.to_le_bytes();
434 }
435
436 fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
437 let (first, second) = array_refs!(src, size_of::<u64>(), size_of::<u64>());
438 Self { id: u64::from_le_bytes(*first), offset: u64::from_le_bytes(*second) }
439 }
440}
441
442impl Message for FileMessage {
443 type IdType = u64;
444
445 fn id(&self) -> Self::IdType {
446 self.id
447 }
448
449 fn offset(&self) -> u64 {
450 self.offset
451 }
452
453 fn encode_to(&self, dest: &mut [u8]) {
454 self.encode_to_impl(dest.try_into().unwrap())
455 }
456
457 fn decode_from(src: &[u8]) -> Self {
458 Self::decode_from_impl(src.try_into().unwrap())
459 }
460
461 fn is_zeroes(&self) -> bool {
462 self.id == 0 && self.offset == 0
463 }
464
465 fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
466 match node.into_any().downcast::<FxFile>() {
467 Ok(file) => Ok(Self { id: file.object_id(), offset }),
468 Err(_) => Err(anyhow!("Cannot record non-file entry")),
469 }
470 }
471
472 fn is_open_marker(&self) -> bool {
473 self.offset == FILE_OPEN_MARKER
474 }
475}
476
477pub trait Recorder: Send + Sync {
480 fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error>;
482
483 fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error>;
485}
486
487struct RecorderImpl<T: Message> {
488 sender: async_channel::Sender<Vec<T>>,
489 buffer: Vec<T>,
490}
491
492impl<T: Message> RecorderImpl<T> {
493 fn new(sender: async_channel::Sender<Vec<T>>) -> Self {
494 Self { sender, buffer: Vec::with_capacity(MESSAGE_CHUNK_SIZE) }
495 }
496}
497
498impl<T: Message> Recorder for RecorderImpl<T> {
499 fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error> {
500 self.buffer.push(T::from_node_request(node, offset)?);
501 if self.buffer.len() >= MESSAGE_CHUNK_SIZE {
502 self.sender.try_send(std::mem::replace(
505 &mut self.buffer,
506 Vec::with_capacity(MESSAGE_CHUNK_SIZE),
507 ))?;
508 }
509 RECORDED.fetch_add(1, Ordering::Relaxed);
510 Ok(())
511 }
512
513 fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error> {
514 self.record(node, FILE_OPEN_MARKER)
515 }
516}
517
518impl<T: Message> Drop for RecorderImpl<T> {
519 fn drop(&mut self) {
520 if self.buffer.len() > 0 {
522 let buffer = std::mem::take(&mut self.buffer);
523 let _ = self.sender.try_send(buffer);
524 }
525 }
526}
527
528struct Request<P: PagerBacked> {
529 file: Arc<P>,
530 offset: u64,
531}
532
533struct ReplayState<T> {
534 replay_threads: future::Shared<BoxFuture<'static, ()>>,
535 _cache_task: fasync::Task<()>,
536 _phantom: PhantomData<T>,
537}
538
539impl<T: RecordedVolume> ReplayState<T> {
540 fn new(handle: Box<dyn ReadObjectHandle>, volume: Arc<FxVolume>, guard: ActiveGuard) -> Self {
541 let (sender, receiver) = async_channel::unbounded::<Request<T::NodeType>>();
542
543 let mut replay_threads = Vec::with_capacity(REPLAY_THREADS);
546 for _ in 0..REPLAY_THREADS {
547 let receiver = receiver.clone();
548 let guard = guard.clone();
551 replay_threads.push(fasync::unblock(move || {
552 let _guard = guard;
553 Self::page_in_thread(receiver);
554 }));
555 }
556 let replay_threads = (Box::pin(async {
557 join_all(replay_threads).await;
558 }) as BoxFuture<'static, ()>)
559 .shared();
560
561 let scope = volume.scope().clone();
562 let cache_task = scope
563 .spawn({
564 async move {
568 let mut task = pin!(
569 async {
570 let mut local_cache: BTreeMap<
574 T::IdType,
575 Option<OpenedNode<T::NodeType>>,
576 > = BTreeMap::new();
577
578 let volume_id = volume.id();
579
580 if let Err(error) = T::new(volume)
581 .read_and_queue(handle, &sender, &mut local_cache)
582 .await
583 {
584 error!(error:?; "Failed to read back profile");
585 }
586 sender.close();
587
588 info!(
589 "Replay for volume {} opened {} of {} objects.",
590 volume_id,
591 local_cache.iter().filter(|(_, e)| e.is_some()).count(),
592 local_cache.len()
593 );
594
595 let () = std::future::pending().await;
597 }
598 .fuse()
599 );
600
601 select! {
602 _ = task => {}
603 _ = guard.on_cancel().fuse() => {}
604 }
605 }
606 })
607 .into();
608
609 Self { replay_threads, _cache_task: cache_task, _phantom: PhantomData }
610 }
611
612 fn page_in_thread(queue: async_channel::Receiver<Request<T::NodeType>>) {
613 while let Ok(request) = queue.recv_blocking() {
614 let res = request.file.vmo().op_range(
615 zx::VmoOp::PREFETCH,
616 request.offset,
617 zx::system_get_page_size() as u64,
618 );
619 if let Err(e) = res {
620 warn!("Failed to prefetch page: {:?}", e);
621 }
622 if queue.sender_count() == 0 {
624 return;
625 }
626 }
627 }
628}
629
630#[async_trait]
633pub trait ProfileState: Send + Sync {
634 fn record_new(&mut self, volume: &Arc<FxVolume>, name: &str) -> Box<dyn Recorder>;
638
639 fn replay_profile(
642 &mut self,
643 handle: Box<dyn ReadObjectHandle>,
644 volume: Arc<FxVolume>,
645 guard: ActiveGuard,
646 );
647
648 async fn wait_for_replay_to_finish(&mut self);
651
652 async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error>;
654}
655
656pub fn new_profile_state(is_blob: bool) -> Box<dyn ProfileState> {
657 if is_blob {
658 Box::new(ProfileStateImpl::<BlobVolume>::new())
659 } else {
660 Box::new(ProfileStateImpl::<FileVolume>::new())
661 }
662}
663
664struct ProfileStateImpl<T> {
665 recording: Option<RemoteHandle<Result<(), Error>>>,
666 replay: Option<ReplayState<T>>,
667}
668
669impl<T> ProfileStateImpl<T> {
670 fn new() -> Self {
671 Self { recording: None, replay: None }
672 }
673}
674
675#[async_trait]
676impl<T: RecordedVolume> ProfileState for ProfileStateImpl<T> {
677 fn record_new(&mut self, volume: &Arc<FxVolume>, name: &str) -> Box<dyn Recorder> {
678 let (sender, receiver) = async_channel::unbounded();
679 let volume = volume.clone();
680 let name = name.to_string();
681 self.recording = None;
683 let scope = volume.scope().clone();
684 let (task, remote_handle) = async move {
685 let recording = T::new(volume);
686 recording
687 .record(&name, receiver)
688 .await
689 .inspect_err(|error| warn!(error:?; "Profile recording '{name}' failed"))
690 }
691 .remote_handle();
692 self.recording = Some(remote_handle);
693 scope.spawn(task);
694 Box::new(RecorderImpl::new(sender))
695 }
696
697 fn replay_profile(
698 &mut self,
699 handle: Box<dyn ReadObjectHandle>,
700 volume: Arc<FxVolume>,
701 guard: ActiveGuard,
702 ) {
703 self.replay = Some(ReplayState::new(handle, volume, guard));
704 }
705
706 async fn wait_for_replay_to_finish(&mut self) {
707 if let Some(replay) = &mut self.replay {
708 replay.replay_threads.clone().await;
709 }
710 }
711
712 async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error> {
713 if let Some(recording) = self.recording.take() { recording.await } else { Ok(()) }
714 }
715}
716
717#[cfg(test)]
718mod tests {
719 use super::{
720 BlobMessage, BlobVolume, FileMessage, FileVolume, IO_SIZE, Message, RecordedVolume,
721 Request, new_profile_state,
722 };
723 use crate::fuchsia::file::FxFile;
724 use crate::fuchsia::fxblob::blob::FxBlob;
725 use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
726 use crate::fuchsia::node::{FxNode, OpenedNode};
727 use crate::fuchsia::pager::PagerBacked;
728 use crate::fuchsia::testing::{TestFixture, TestFixtureOptions, open_file_checked};
729 use crate::fuchsia::volume::FxVolume;
730 use anyhow::Error;
731 use async_trait::async_trait;
732 use delivery_blob::CompressionMode;
733 use event_listener::{Event, EventListener};
734 use fidl_fuchsia_io as fio;
735 use fuchsia_async as fasync;
736 use fuchsia_hash::Hash;
737 use fuchsia_sync::Mutex;
738 use fxfs::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
739 use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
740 use fxfs::object_store::{DataObjectHandle, HandleOptions, ObjectDescriptor, ObjectStore};
741 use std::collections::BTreeMap;
742 use std::mem::size_of;
743 use std::sync::Arc;
744 use std::time::Duration;
745 use storage_device::buffer::{BufferRef, MutableBufferRef};
746 use storage_device::buffer_allocator::{BufferAllocator, BufferFuture, BufferSource};
747
748 struct FakeReaderWriterInner {
749 data: Vec<u8>,
750 delays: Vec<EventListener>,
751 }
752
753 struct FakeReaderWriter {
754 allocator: BufferAllocator,
755 inner: Arc<Mutex<FakeReaderWriterInner>>,
756 }
757
758 const BLOCK_SIZE: usize = 4096;
759
760 impl FakeReaderWriter {
761 fn new() -> Self {
762 Self {
763 allocator: BufferAllocator::new(BLOCK_SIZE, BufferSource::new(IO_SIZE * 2)),
764 inner: Arc::new(Mutex::new(FakeReaderWriterInner {
765 data: Vec::new(),
766 delays: Vec::new(),
767 })),
768 }
769 }
770
771 fn push_delay(&self, delay: EventListener) {
772 self.inner.lock().delays.insert(0, delay);
773 }
774 }
775
776 impl ObjectHandle for FakeReaderWriter {
777 fn object_id(&self) -> u64 {
778 0
779 }
780
781 fn block_size(&self) -> u64 {
782 self.allocator.block_size() as u64
783 }
784
785 fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
786 self.allocator.allocate_buffer(size)
787 }
788 }
789
790 impl WriteObjectHandle for FakeReaderWriter {
791 async fn write_or_append(
792 &self,
793 offset: Option<u64>,
794 buf: BufferRef<'_>,
795 ) -> Result<u64, Error> {
796 assert!(offset.is_none());
798 let delay = self.inner.lock().delays.pop();
799 if let Some(delay) = delay {
800 delay.await;
801 }
802 self.inner.lock().data.extend_from_slice(buf.as_slice());
804 Ok(buf.len() as u64)
805 }
806
807 async fn truncate(&self, _size: u64) -> Result<(), Error> {
808 unreachable!();
809 }
810
811 async fn flush(&self) -> Result<(), Error> {
812 unreachable!();
813 }
814 }
815
816 async fn write_file(fixture: &TestFixture, name: &str, data: &[u8]) -> u64 {
817 let root_dir = fixture.volume().root_dir();
818 let mut transaction = fixture
819 .volume()
820 .volume()
821 .store()
822 .filesystem()
823 .new_transaction(
824 lock_keys![LockKey::object(
825 fixture.volume().volume().store().store_object_id(),
826 root_dir.object_id()
827 )],
828 Options::default(),
829 )
830 .await
831 .expect("Creating transaction for new file");
832 let id = root_dir
833 .directory()
834 .create_child_file(&mut transaction, name)
835 .await
836 .expect("Creating new_file")
837 .object_id();
838 transaction.commit().await.unwrap();
839 let file = open_file_checked(
840 fixture.root(),
841 name,
842 fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_FILE,
843 &Default::default(),
844 )
845 .await;
846 file.write(data).await.unwrap().expect("Writing file");
847 id
848 }
849
850 #[async_trait]
851 impl ReadObjectHandle for FakeReaderWriter {
852 async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
853 let delay = self.inner.lock().delays.pop();
854 if let Some(delay) = delay {
855 delay.await;
856 }
857 let inner = self.inner.lock();
859 assert!(offset as usize <= inner.data.len());
860 let offset_end = std::cmp::min(offset as usize + buf.len(), inner.data.len());
861 let size = offset_end - offset as usize;
862 buf.as_mut_slice()[..size].clone_from_slice(&inner.data[offset as usize..offset_end]);
863 Ok(size)
864 }
865
866 fn get_size(&self) -> u64 {
867 self.inner.lock().data.len() as u64
868 }
869 }
870
871 #[fuchsia::test]
872 async fn test_encode_decode_blob() {
873 let mut buf = [0u8; size_of::<BlobMessage>()];
874 let m = BlobMessage { id: [88u8; 32].into(), offset: 77 };
875 m.encode_to(&mut buf.as_mut_slice());
876 let m2 = BlobMessage::decode_from(&buf);
877 assert_eq!(m, m2);
878 }
879
880 #[fuchsia::test]
881 async fn test_encode_decode_file() {
882 let mut buf = [0u8; size_of::<FileMessage>()];
883 let m = FileMessage { id: 88, offset: 77 };
884 m.encode_to(&mut buf.as_mut_slice());
885 let m2 = FileMessage::decode_from(&buf);
886 assert!(!m2.is_zeroes());
887 assert_eq!(m, m2);
888 }
889
890 const TEST_PROFILE_NAME: &str = "test_profile";
891
892 async fn get_test_profile_handle(volume: &Arc<FxVolume>) -> DataObjectHandle<FxVolume> {
893 let profile_dir = volume.get_profile_directory().await.unwrap();
894 ObjectStore::open_object(
895 volume,
896 profile_dir
897 .lookup(TEST_PROFILE_NAME)
898 .await
899 .expect("lookup failed")
900 .expect("not found")
901 .0,
902 HandleOptions::default(),
903 None,
904 )
905 .await
906 .unwrap()
907 }
908
909 async fn get_test_profile_contents(volume: &Arc<FxVolume>) -> Vec<u8> {
910 get_test_profile_handle(volume).await.contents(1024 * 1024).await.unwrap().to_vec()
911 }
912
913 #[fuchsia::test]
914 async fn test_recording_basic_blob() {
915 let fixture = new_blob_fixture().await;
916 {
917 let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
918 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
919
920 let mut state = new_profile_state(true);
921 let volume = fixture.volume().volume();
922
923 {
924 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
926 recorder.record(blob.clone(), 0).unwrap();
927 recorder.record_open(blob).unwrap();
928 }
929
930 state.wait_for_recording_to_finish().await.unwrap();
931
932 assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
933 }
934 fixture.close().await;
935 }
936
937 #[fuchsia::test]
938 async fn test_recording_basic_file() {
939 let fixture = TestFixture::new().await;
940 {
941 let id = write_file(&fixture, "foo", &[88u8]).await;
942 let node = fixture
943 .volume()
944 .volume()
945 .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
946 .await
947 .unwrap();
948
949 let mut state = new_profile_state(false);
950 let volume = fixture.volume().volume();
951
952 {
953 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
955 recorder.record(node.clone(), 0).unwrap();
956 recorder.record_open(node).unwrap();
957 }
958 state.wait_for_recording_to_finish().await.unwrap();
959
960 assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
961 }
962 fixture.close().await;
963 }
964
965 #[fuchsia::test]
966 async fn test_recording_filtered_without_open() {
967 let fixture = new_blob_fixture().await;
968 {
969 let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
970 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
971
972 let mut state = new_profile_state(true);
973 let volume = fixture.volume().volume();
974
975 {
976 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
978 recorder.record(blob.clone(), 0).unwrap();
979 }
980 state.wait_for_recording_to_finish().await.unwrap();
981
982 assert_eq!(get_test_profile_contents(volume).await.len(), 0);
983 }
984 fixture.close().await;
985 }
986
987 #[fuchsia::test]
988 async fn test_recording_blob_more_than_block() {
989 let mut state = new_profile_state(true);
990
991 let fixture = new_blob_fixture().await;
992 assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
993 let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;
994 let hash;
995 let volume = fixture.volume().volume();
996
997 {
998 hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
999 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1000 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1002 recorder.record_open(blob.clone()).unwrap();
1003 for i in 0..message_count {
1004 recorder.record(blob.clone(), 4096 * i as u64).unwrap();
1005 }
1006 }
1007 state.wait_for_recording_to_finish().await.unwrap();
1008
1009 assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);
1010
1011 let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
1012 let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();
1013
1014 let volume = fixture.volume().volume().clone();
1015 let task = fasync::Task::spawn(async move {
1016 let handle = Box::new(get_test_profile_handle(&volume).await);
1017 let blob = BlobVolume::new(volume);
1018 blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
1019 });
1020
1021 let mut recv_count = 0;
1022 while let Ok(msg) = receiver.recv().await {
1023 assert_eq!(msg.file.root(), hash);
1024 assert_eq!(msg.offset, 4096 * recv_count);
1025 recv_count += 1;
1026 }
1027 task.await;
1028 assert_eq!(recv_count, message_count as u64);
1029
1030 fixture.close().await;
1031 }
1032
1033 #[fuchsia::test]
1034 async fn test_recording_file_more_than_block() {
1035 let mut state = new_profile_state(false);
1036
1037 let fixture = TestFixture::new().await;
1038 assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
1039 let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;
1040 let id;
1041 let volume = fixture.volume().volume();
1042 {
1043 id = write_file(&fixture, "foo", &[88u8]).await;
1044 let node = volume
1045 .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
1046 .await
1047 .unwrap();
1048 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1050 recorder.record_open(node.clone()).unwrap();
1051 for i in 0..message_count {
1052 recorder.record(node.clone(), 4096 * i as u64).unwrap();
1053 }
1054 }
1055 state.wait_for_recording_to_finish().await.unwrap();
1056
1057 assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);
1058
1059 let mut local_cache: BTreeMap<u64, Option<OpenedNode<FxFile>>> = BTreeMap::new();
1060 let (sender, receiver) = async_channel::unbounded::<Request<FxFile>>();
1061
1062 let volume = fixture.volume().volume().clone();
1063 let task = fasync::Task::spawn(async move {
1064 let handle = Box::new(get_test_profile_handle(&volume).await);
1065 let file = FileVolume::new(volume);
1066 file.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
1067 });
1068
1069 let mut recv_count = 0;
1070 while let Ok(msg) = receiver.recv().await {
1071 assert_eq!(msg.file.object_id(), id);
1072 assert_eq!(msg.offset, 4096 * recv_count);
1073 recv_count += 1;
1074 }
1075 task.await;
1076 assert_eq!(recv_count, message_count as u64);
1077
1078 fixture.close().await;
1079 }
1080
1081 #[fuchsia::test]
1082 async fn test_recording_more_than_io_size() {
1083 let fixture = new_blob_fixture().await;
1084
1085 {
1086 let mut state = new_profile_state(true);
1087 let message_count = (IO_SIZE as usize / size_of::<BlobMessage>()) + 1;
1088 let hash;
1089 let volume = fixture.volume().volume();
1090 {
1091 hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
1092 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1093 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1095 recorder.record_open(blob.clone()).unwrap();
1096 for i in 0..message_count {
1097 recorder.record(blob.clone(), 4096 * i as u64).unwrap();
1098 }
1099 }
1100 state.wait_for_recording_to_finish().await.unwrap();
1101 assert_eq!(get_test_profile_contents(volume).await.len(), IO_SIZE + BLOCK_SIZE);
1102
1103 let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
1104 let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();
1105
1106 let volume = volume.clone();
1107 let task = fasync::Task::spawn(async move {
1108 let handle = Box::new(get_test_profile_handle(&volume).await);
1109 let blob = BlobVolume::new(volume);
1110 blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
1111 });
1112
1113 let mut recv_count = 0;
1114 while let Ok(msg) = receiver.recv().await {
1115 assert_eq!(msg.file.root(), hash);
1116 assert_eq!(msg.offset, 4096 * recv_count);
1117 recv_count += 1;
1118 }
1119 task.await;
1120 assert_eq!(recv_count, message_count as u64);
1121 }
1122
1123 fixture.close().await;
1124 }
1125
1126 #[fuchsia::test]
1127 async fn test_replay_profile_blob() {
1128 let mut state = new_profile_state(true);
1130
1131 let mut hashes = Vec::new();
1132
1133 let fixture = new_blob_fixture().await;
1134 {
1135 assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
1136 let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;
1137
1138 let volume = fixture.volume().volume();
1139 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1140 for i in 0..message_count {
1142 let hash =
1143 fixture.write_blob(i.to_string().as_bytes(), CompressionMode::Never).await;
1144 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1145 recorder.record_open(blob.clone()).unwrap();
1146 hashes.push(hash);
1147 recorder.record(blob.clone(), 0).unwrap();
1148 }
1149 };
1150 let device = fixture.close().await;
1151 device.ensure_unique();
1152 state.wait_for_recording_to_finish().await.unwrap();
1153
1154 device.reopen(false);
1155 let fixture = open_blob_fixture(device).await;
1156 {
1157 for hash in &hashes {
1160 let blob = fixture.get_blob(*hash).await.expect("Opening blob");
1161 assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
1162 }
1163
1164 let volume = fixture.volume().volume();
1165 state.replay_profile(
1166 Box::new(get_test_profile_handle(volume).await),
1167 volume.clone(),
1168 volume.scope().try_active_guard().unwrap(),
1169 );
1170
1171 for hash in &hashes {
1173 let blob = fixture.get_blob(*hash).await.expect("Opening blob");
1174 while blob.vmo().info().unwrap().committed_bytes == 0 {
1175 fasync::Timer::new(Duration::from_millis(25)).await;
1176 }
1177 }
1178 }
1179 fixture.close().await;
1180 }
1181
1182 #[fuchsia::test]
1183 async fn test_replay_profile_file() {
1184 let mut state = new_profile_state(false);
1186
1187 let mut ids = Vec::new();
1188
1189 let fixture = TestFixture::new().await;
1190 {
1191 assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
1192 let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;
1193
1194 let volume = fixture.volume().volume();
1195 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1196 for i in 0..message_count {
1198 let id = write_file(&fixture, &i.to_string(), &[88u8]).await;
1199 let node = fixture
1200 .volume()
1201 .volume()
1202 .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
1203 .await
1204 .unwrap();
1205 recorder.record_open(node.clone()).unwrap();
1206 ids.push(id);
1207 recorder.record(node.clone(), 0).unwrap();
1208 }
1209 };
1210 let device = fixture.close().await;
1211 device.ensure_unique();
1212 state.wait_for_recording_to_finish().await.unwrap();
1213
1214 device.reopen(false);
1215 let fixture = TestFixture::open(
1216 device,
1217 TestFixtureOptions { encrypted: true, format: false, ..Default::default() },
1218 )
1219 .await;
1220 {
1221 for id in &ids {
1223 let file = fixture
1224 .volume()
1225 .volume()
1226 .get_or_load_node(
1227 *id,
1228 ObjectDescriptor::File,
1229 Some(fixture.volume().root_dir()),
1230 )
1231 .await
1232 .unwrap()
1233 .into_any()
1234 .downcast::<FxFile>()
1235 .unwrap();
1236 assert_eq!(file.vmo().info().unwrap().committed_bytes, 0);
1237 }
1238
1239 let volume = fixture.volume().volume();
1240 state.replay_profile(
1241 Box::new(get_test_profile_handle(volume).await),
1242 volume.clone(),
1243 volume.scope().try_active_guard().unwrap(),
1244 );
1245
1246 for id in &ids {
1248 let file = fixture
1249 .volume()
1250 .volume()
1251 .get_or_load_node(
1252 *id,
1253 ObjectDescriptor::File,
1254 Some(fixture.volume().root_dir()),
1255 )
1256 .await
1257 .unwrap()
1258 .into_any()
1259 .downcast::<FxFile>()
1260 .unwrap();
1261 while file.vmo().info().unwrap().committed_bytes == 0 {
1262 fasync::Timer::new(Duration::from_millis(25)).await;
1263 }
1264 }
1265 state.wait_for_recording_to_finish().await.unwrap();
1266 }
1267 fixture.close().await;
1268 }
1269
1270 #[fuchsia::test]
1271 async fn test_recording_during_replay() {
1272 let mut state = new_profile_state(true);
1273
1274 let hash;
1275 let first_recording;
1276 let fixture = new_blob_fixture().await;
1277 let volume = fixture.volume().volume();
1278
1279 {
1281 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1282 hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1283 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1284 recorder.record_open(blob.clone()).unwrap();
1285 recorder.record(blob.clone(), 0).unwrap();
1286 }
1287
1288 state.wait_for_recording_to_finish().await.unwrap();
1289 first_recording = get_test_profile_contents(volume).await;
1290 assert_ne!(first_recording.len(), 0);
1291 let device = fixture.close().await;
1292 device.ensure_unique();
1293
1294 device.reopen(false);
1295 let fixture = open_blob_fixture(device).await;
1296
1297 {
1298 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1301 assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
1302
1303 let volume = fixture.volume().volume();
1305 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1306 recorder.record(blob.clone(), 4096).unwrap();
1307
1308 let volume = fixture.volume().volume();
1310 state.replay_profile(
1311 Box::new(get_test_profile_handle(volume).await),
1312 volume.clone(),
1313 volume.scope().try_active_guard().unwrap(),
1314 );
1315
1316 {
1318 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1319 while blob.vmo().info().unwrap().committed_bytes == 0 {
1320 fasync::Timer::new(Duration::from_millis(25)).await;
1321 }
1322 }
1323
1324 recorder.record_open(blob.clone()).unwrap();
1327 }
1328
1329 state.wait_for_recording_to_finish().await.unwrap();
1330
1331 let volume = fixture.volume().volume();
1332 let second_recording = get_test_profile_contents(volume).await;
1333 assert_ne!(second_recording.len(), 0);
1334 assert_ne!(&second_recording, &first_recording);
1335
1336 fixture.close().await;
1337 }
1338
1339 #[fuchsia::test]
1342 async fn test_replay_profile_stop_reading_early() {
1343 let mut state = new_profile_state(true);
1344 let fixture = new_blob_fixture().await;
1345
1346 {
1347 let volume = fixture.volume().volume();
1348
1349 let message;
1351 {
1352 let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1353 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1354 message = BlobMessage { id: blob.root(), offset: 0 };
1355 }
1356 state.wait_for_recording_to_finish().await.unwrap();
1357
1358 let replay_handle = Box::new(FakeReaderWriter::new());
1360 let mut buff = vec![0u8; IO_SIZE * 2];
1361 message.encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
1362 message.encode_to_impl(
1363 (&mut buff[IO_SIZE..IO_SIZE + size_of::<BlobMessage>()]).try_into().unwrap(),
1364 );
1365
1366 replay_handle.inner.lock().data = buff;
1367 let delay1 = Event::new();
1368 replay_handle.push_delay(delay1.listen());
1369 let delay2 = Event::new();
1370 replay_handle.push_delay(delay2.listen());
1371
1372 state.replay_profile(
1373 replay_handle,
1374 volume.clone(),
1375 volume.scope().try_active_guard().unwrap(),
1376 );
1377
1378 fasync::Task::spawn(async move {
1380 fasync::Timer::new(Duration::from_millis(100)).await;
1382 delay1.notify(usize::MAX);
1383 })
1384 .detach();
1385 }
1386
1387 fixture.close().await;
1390 }
1391
1392 #[fuchsia::test]
1393 async fn test_replay_blob_missing() {
1394 let fixture = new_blob_fixture().await;
1395 let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1398 let mut buff = vec![0u8; IO_SIZE];
1399 {
1400 {
1403 let message = BlobMessage { id: [42u8; 32].into(), offset: 0 };
1404 message
1405 .encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
1406 }
1407
1408 {
1410 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1411 let message = BlobMessage { id: blob.root(), offset: 0 };
1412 message.encode_to_impl(
1413 (&mut buff[size_of::<BlobMessage>()..(size_of::<BlobMessage>() * 2)])
1414 .try_into()
1415 .unwrap(),
1416 );
1417 }
1418 }
1419 let device = fixture.close().await;
1420 device.ensure_unique();
1421
1422 device.reopen(false);
1423 let fixture = open_blob_fixture(device).await;
1424 {
1425 let mut state = new_profile_state(true);
1426 let volume = fixture.volume().volume();
1427
1428 let replay_handle = Box::new(FakeReaderWriter::new());
1429 replay_handle.inner.lock().data = buff;
1430
1431 state.replay_profile(
1432 replay_handle,
1433 volume.clone(),
1434 volume.scope().try_active_guard().unwrap(),
1435 );
1436
1437 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1439 while blob.vmo().info().unwrap().committed_bytes == 0 {
1440 fasync::Timer::new(Duration::from_millis(25)).await;
1441 }
1442 }
1443 fixture.close().await;
1444 }
1445
1446 #[fuchsia::test]
1447 async fn test_replay_file_missing_or_tombstoned() {
1448 let fixture = TestFixture::new().await;
1449 let mut buff = vec![0u8; IO_SIZE];
1450 let remaining_file_id;
1453 let tombstoned_file_id;
1454 {
1456 let id = write_file(&fixture, "foo", &[1, 2, 3, 4]).await;
1457 let message = FileMessage { id, offset: 0 };
1458 message.encode_to_impl((&mut buff[0..size_of::<FileMessage>()]).try_into().unwrap());
1459 }
1460 fixture
1462 .root()
1463 .unlink("foo", &fio::UnlinkOptions::default())
1464 .await
1465 .unwrap()
1466 .expect("Unlinking");
1467
1468 {
1470 tombstoned_file_id = write_file(&fixture, "bar", &[1, 2, 3, 4]).await;
1471 let message = FileMessage { id: tombstoned_file_id, offset: 0 };
1472 message.encode_to_impl(
1473 (&mut buff[size_of::<FileMessage>()..(size_of::<FileMessage>() * 2)])
1474 .try_into()
1475 .unwrap(),
1476 );
1477 }
1478
1479 {
1481 remaining_file_id = write_file(&fixture, "baz", &[1, 2, 3, 4]).await;
1482 let message = FileMessage { id: remaining_file_id, offset: 0 };
1483 message.encode_to_impl(
1484 (&mut buff[(size_of::<FileMessage>() * 2)..(size_of::<FileMessage>() * 3)])
1485 .try_into()
1486 .unwrap(),
1487 );
1488 }
1489 let device = fixture.close().await;
1490 device.ensure_unique();
1491
1492 device.reopen(false);
1493 let fixture =
1494 TestFixture::open(device, TestFixtureOptions { format: false, ..Default::default() })
1495 .await;
1496 {
1497 let tombstoned_file = fixture
1500 .volume()
1501 .volume()
1502 .get_or_load_node(tombstoned_file_id, ObjectDescriptor::File, None)
1503 .await
1504 .expect("Opening file object")
1505 .into_any()
1506 .downcast::<FxFile>()
1507 .unwrap();
1508 fixture
1509 .root()
1510 .unlink("bar", &fio::UnlinkOptions::default())
1511 .await
1512 .unwrap()
1513 .expect("Unlinking");
1514
1515 let mut state = new_profile_state(false);
1516 let volume = fixture.volume().volume();
1517
1518 let replay_handle = Box::new(FakeReaderWriter::new());
1519 replay_handle.inner.lock().data = buff;
1520
1521 state.replay_profile(
1522 replay_handle,
1523 volume.clone(),
1524 volume.scope().try_active_guard().unwrap(),
1525 );
1526
1527 let remaining_file = fixture
1529 .volume()
1530 .volume()
1531 .get_or_load_node(remaining_file_id, ObjectDescriptor::File, None)
1532 .await
1533 .expect("Opening file object")
1534 .into_any()
1535 .downcast::<FxFile>()
1536 .unwrap();
1537 while remaining_file.vmo().info().unwrap().committed_bytes == 0 {
1538 fasync::Timer::new(Duration::from_millis(25)).await;
1539 }
1540
1541 assert_eq!(tombstoned_file.vmo().info().unwrap().committed_bytes, 0);
1544 }
1545 fixture.close().await;
1546 }
1547}