1use crate::fuchsia::file::FxFile;
6use crate::fuchsia::fxblob::BlobDirectory;
7use crate::fuchsia::fxblob::blob::FxBlob;
8use crate::fuchsia::node::{FxNode, OpenedNode};
9use crate::fuchsia::pager::PagerBacked;
10use crate::fuchsia::volume::FxVolume;
11use anyhow::{Context as _, Error, anyhow, ensure};
12use arrayref::{array_refs, mut_array_refs};
13use async_trait::async_trait;
14use fuchsia_async as fasync;
15use fuchsia_hash::Hash;
16use futures::future::{self, BoxFuture, RemoteHandle, join_all};
17use futures::lock::Mutex;
18use futures::{FutureExt, select};
19use fxfs::errors::FxfsError;
20use fxfs::log::*;
21use fxfs::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle};
22use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
23use fxfs::object_store::{
24 HandleOptions, ObjectDescriptor, ObjectStore, Timestamp, VOLUME_DATA_KEY_ID, directory,
25};
26use linked_hash_map::LinkedHashMap;
27use scopeguard::ScopeGuard;
28use std::cmp::{Eq, PartialEq};
29use std::collections::btree_map::{BTreeMap, Entry};
30use std::marker::PhantomData;
31use std::mem::size_of;
32use std::pin::pin;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicU64, Ordering};
35use vfs::execution_scope::ActiveGuard;
36
/// Sentinel offset marking a message as an "open" event instead of a page access; written by
/// `Recorder::record_open` and detected by `Message::is_open_marker`.
const FILE_OPEN_MARKER: u64 = u64::MAX;
/// Number of blocking page-in workers started by `ReplayState::new`.
const REPLAY_THREADS: usize = 2;
/// Number of messages `RecorderImpl` buffers before sending the batch over its channel.
const MESSAGE_CHUNK_SIZE: usize = 64;
// `IO_SIZE` is the size in bytes (1 << 17) of the buffer allocated for reading and writing
// profile files. `RECORDED` is incremented each time `RecorderImpl::record` completes
// successfully.
const IO_SIZE: usize = 1 << 17; pub static RECORDED: AtomicU64 = AtomicU64::new(0);
45
46trait RecordedVolume: Send + Sync + Sized + Unpin {
47 type IdType: std::fmt::Display + Ord + Send + Sized;
48 type NodeType: PagerBacked;
49 type MessageType: Message<IdType = Self::IdType>;
50
51 fn new(volume: Arc<FxVolume>) -> Self;
52
53 fn volume(&self) -> &Arc<FxVolume>;
54
55 fn open(
56 &self,
57 id: Self::IdType,
58 ) -> impl std::future::Future<Output = Result<OpenedNode<Self::NodeType>, Error>> + Send;
59
60 fn file_is_replayable(
62 &self,
63 id: &Self::IdType,
64 ) -> impl std::future::Future<Output = bool> + Send;
65
66 fn read_and_queue(
67 &self,
68 handle: Box<dyn ReadObjectHandle>,
69 sender: &async_channel::Sender<Request<Self::NodeType>>,
70 local_cache: &mut BTreeMap<Self::IdType, Option<OpenedNode<Self::NodeType>>>,
71 ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
72 async move {
73 let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
74 let block_size = handle.block_size() as usize;
75 let file_size = handle.get_size() as usize;
76 let mut offset = 0;
77 while offset < file_size {
78 let actual = handle
79 .read(offset as u64, io_buf.as_mut())
80 .await
81 .map_err(|e| e.context(format!("Failed to read at offset: {}", offset)))?;
82 offset += actual;
83 let mut local_offset = 0;
84 let mut next_block = block_size;
85 let mut next_offset = size_of::<Self::MessageType>();
86 while next_offset <= actual {
87 let msg = Self::MessageType::decode_from(
88 &io_buf.as_slice()[local_offset..next_offset],
89 );
90
91 local_offset = next_offset;
92 next_offset = local_offset + size_of::<Self::MessageType>();
93 if next_offset > next_block {
95 local_offset = next_block;
96 next_offset = local_offset + size_of::<Self::MessageType>();
97 next_block += block_size;
98 }
99
100 if msg.is_zeroes() {
103 break;
104 }
105
106 let file = match local_cache.entry(msg.id()) {
107 Entry::Occupied(entry) => match entry.get() {
108 Some(opened_file) => (*opened_file).clone(),
109 None => continue,
111 },
112 Entry::Vacant(entry) => match self.open(msg.id()).await {
113 Err(e) => {
114 debug!("Failed to open object {} from profile: {:?}", msg.id(), e);
115 entry.insert(None);
117 continue;
118 }
119 Ok(opened_file) => {
120 let file_clone = (*opened_file).clone();
121 entry.insert(Some(opened_file));
122 file_clone
123 }
124 },
125 };
126
127 sender.send(Request { file, offset: msg.offset() }).await?;
128 }
129 }
130 Ok(())
131 }
132 }
133
134 fn record(
135 &self,
136 name: &str,
137 receiver: async_channel::Receiver<Vec<Self::MessageType>>,
138 ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
139 async move {
140 let mut recorded_offsets = LinkedHashMap::<Self::MessageType, ()>::new();
141 let mut recorded_opens = BTreeMap::<Self::IdType, bool>::new();
142 while let Ok(buffer) = receiver.recv().await {
143 for message in buffer {
144 if message.is_open_marker() {
145 if let Entry::Vacant(entry) = recorded_opens.entry(message.id()) {
146 let usable = self.file_is_replayable(entry.key()).await;
147 entry.insert(usable);
148 }
149 } else {
150 recorded_offsets.insert(message, ());
151 }
152 }
153 }
154
155 let store = self.volume().store();
158 let fs = store.filesystem();
159 let mut transaction =
160 fs.clone().new_transaction(lock_keys![], Options::default()).await?;
161 let handle = ObjectStore::create_object(
162 self.volume(),
163 &mut transaction,
164 HandleOptions::default(),
165 None,
166 )
167 .await?;
168 store.add_to_graveyard(&mut transaction, handle.object_id());
169 transaction.commit().await?;
170
171 let clean_up = scopeguard::guard((), |_| {
172 fs.graveyard().queue_tombstone_object(store.store_object_id(), handle.object_id())
173 });
174
175 let block_size = handle.block_size() as usize;
176 let mut offset = 0;
177 let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
178 let mut next_block = block_size;
179 while let Some((message, _)) = recorded_offsets.pop_front() {
180 if !recorded_opens.get(&message.id()).copied().unwrap_or(false) {
182 continue;
183 }
184
185 let mut next_offset = offset + size_of::<Self::MessageType>();
186 if next_offset > next_block {
187 io_buf.as_mut_slice()[offset..next_block].fill(0);
190 if next_block >= IO_SIZE {
191 handle
193 .write_or_append(None, io_buf.as_ref())
194 .await
195 .context("Failed to write profile block")?;
196 offset = 0;
197 next_offset = size_of::<Self::MessageType>();
198 next_block = block_size;
199 } else {
200 offset = next_block;
201 next_offset = offset + size_of::<Self::MessageType>();
202 next_block += block_size;
203 }
204 }
205 message.encode_to(&mut io_buf.as_mut_slice()[offset..next_offset]);
206 offset = next_offset;
207 }
208 if offset > 0 {
209 io_buf.as_mut_slice()[offset..next_block].fill(0);
210 handle
211 .write_or_append(None, io_buf.subslice(0..next_block))
212 .await
213 .context("Failed to write profile block")?;
214 }
215
216 let profile_dir = self.volume().get_profile_directory().await?;
217
218 let mut lock_keys =
219 lock_keys![LockKey::object(store.store_object_id(), profile_dir.object_id())];
220 let mut old_id = INVALID_OBJECT_ID;
221 let mut transaction = loop {
222 let transaction = fs.clone().new_transaction(lock_keys, Options::default()).await?;
223 if let Some((id, descriptor, _)) = profile_dir.lookup(name).await? {
224 ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
225 if id == old_id {
226 break transaction;
227 }
228 lock_keys = lock_keys![
229 LockKey::object(store.store_object_id(), profile_dir.object_id()),
230 LockKey::object(store.store_object_id(), id)
231 ];
232 old_id = id;
233 } else {
234 old_id = INVALID_OBJECT_ID;
235 break transaction;
236 }
237 };
238
239 store.remove_from_graveyard(&mut transaction, handle.object_id());
240 directory::replace_child_with_object(
241 &mut transaction,
242 Some((handle.object_id(), ObjectDescriptor::File)),
243 (&profile_dir, name),
244 0,
245 Timestamp::now(),
246 )
247 .await?;
248 transaction.commit().await?;
249
250 ScopeGuard::into_inner(clean_up);
251 if old_id != INVALID_OBJECT_ID {
252 fs.graveyard().queue_tombstone_object(store.store_object_id(), old_id);
253 }
254
255 Ok(())
256 }
257 }
258}
259
/// `RecordedVolume` implementation for blob volumes, where objects are identified by `Hash`.
struct BlobVolume {
    /// The volume being recorded or replayed.
    volume: Arc<FxVolume>,
    /// The volume's root blob directory; starts as `None` and is loaded by the first `open`.
    root_dir: Mutex<Option<Arc<BlobDirectory>>>,
}
266
267impl RecordedVolume for BlobVolume {
268 type IdType = Hash;
269 type NodeType = FxBlob;
270 type MessageType = BlobMessage;
271
272 fn new(volume: Arc<FxVolume>) -> Self {
273 Self { volume, root_dir: Mutex::new(None) }
274 }
275
276 fn volume(&self) -> &Arc<FxVolume> {
277 &self.volume
278 }
279
280 async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
281 let mut root_dir = self.root_dir.lock().await;
282 if root_dir.is_none() {
283 *root_dir = Some(
284 self.volume
285 .get_or_load_node(
286 self.volume.store().root_directory_object_id(),
287 ObjectDescriptor::Directory,
288 None,
289 )
290 .await?
291 .into_any()
292 .downcast::<BlobDirectory>()
293 .map_err(|_| FxfsError::Inconsistent)?,
294 );
295 };
296 root_dir
297 .as_ref()
298 .unwrap()
299 .open_blob(&id.into())
300 .await?
301 .ok_or_else(|| FxfsError::NotFound.into())
302 }
303
304 async fn file_is_replayable(&self, _id: &Self::IdType) -> bool {
305 true
307 }
308}
309
/// `RecordedVolume` implementation for regular file volumes, where objects are identified by
/// their `u64` object id.
struct FileVolume {
    /// The volume being recorded or replayed.
    volume: Arc<FxVolume>,
}
313
314impl RecordedVolume for FileVolume {
315 type IdType = u64;
316 type NodeType = FxFile;
317 type MessageType = FileMessage;
318
319 fn new(volume: Arc<FxVolume>) -> Self {
320 Self { volume }
321 }
322
323 fn volume(&self) -> &Arc<FxVolume> {
324 &self.volume
325 }
326
327 async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
328 self.volume
329 .get_or_load_node(id, ObjectDescriptor::File, None)
330 .await?
331 .into_any()
332 .downcast::<FxFile>()
333 .map_err(|_| anyhow!("Non-file opened"))?
334 .into_opened_node()
335 .ok_or_else(|| anyhow!("File being purged"))
336 }
337
338 async fn file_is_replayable(&self, id: &Self::IdType) -> bool {
339 match self.volume.store().get_keys(*id).await {
340 Ok(keys)
343 if keys.is_empty()
344 || (keys.len() == 1 && keys.first().unwrap().0 == VOLUME_DATA_KEY_ID) =>
345 {
346 true
347 }
348 _ => false,
349 }
350 }
351}
352
/// A single fixed-size profile entry: an object id plus an offset within that object.
trait Message: Eq + PartialEq + Sized + Send + Sync + std::hash::Hash + 'static {
    /// Identifier type of the object the message refers to.
    type IdType: std::fmt::Display + Ord + Send + Sized;

    /// Returns the id of the object this message refers to.
    fn id(&self) -> Self::IdType;
    /// Returns the recorded offset (or `FILE_OPEN_MARKER` for an open marker).
    fn offset(&self) -> u64;
    /// Serializes the message into `dest`.
    fn encode_to(&self, dest: &mut [u8]);
    /// Deserializes a message from `src`.
    fn decode_from(src: &[u8]) -> Self;
    /// Returns true if every field of the message is zero.
    fn is_zeroes(&self) -> bool;
    /// Builds a message for an access to `node` at `offset`; fails if `node` is not of the type
    /// this message kind records.
    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error>;
    /// Returns true if this message marks an open rather than a page access.
    fn is_open_marker(&self) -> bool;
}
364
/// Profile entry for a blob volume. Encoded as the hash bytes followed by the little-endian
/// offset; the encoded length is `size_of::<BlobMessage>()`.
#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
struct BlobMessage {
    /// Hash identifying the blob.
    id: Hash,
    /// Offset of the access, or `FILE_OPEN_MARKER` for an open marker.
    offset: u64,
}
372
373impl BlobMessage {
374 fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
375 let (first, second) = mut_array_refs![dest, size_of::<Hash>(), size_of::<u64>()];
376 *first = self.id.into();
377 *second = self.offset.to_le_bytes();
378 }
379
380 fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
381 let (first, second) = array_refs!(src, size_of::<Hash>(), size_of::<u64>());
382 Self { id: Hash::from_array(*first), offset: u64::from_le_bytes(*second) }
383 }
384}
385
386impl Message for BlobMessage {
387 type IdType = Hash;
388
389 fn id(&self) -> Self::IdType {
390 self.id
391 }
392
393 fn offset(&self) -> u64 {
394 self.offset
395 }
396
397 fn encode_to(&self, dest: &mut [u8]) {
398 self.encode_to_impl(dest.try_into().unwrap());
399 }
400
401 fn decode_from(src: &[u8]) -> Self {
402 Self::decode_from_impl(src.try_into().unwrap())
403 }
404
405 fn is_zeroes(&self) -> bool {
406 self.id == Hash::from_array([0u8; size_of::<Hash>()]) && self.offset == 0
407 }
408
409 fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
410 match node.into_any().downcast::<FxBlob>() {
411 Ok(blob) => Ok(Self { id: blob.root(), offset }),
412 Err(_) => Err(anyhow!("Cannot record non-blob entry.")),
413 }
414 }
415
416 fn is_open_marker(&self) -> bool {
417 self.offset == FILE_OPEN_MARKER
418 }
419}
420
/// Profile entry for a file volume. Encoded as the little-endian object id followed by the
/// little-endian offset; the encoded length is `size_of::<FileMessage>()`.
#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
struct FileMessage {
    /// Object id of the file.
    id: u64,
    /// Offset of the access, or `FILE_OPEN_MARKER` for an open marker.
    offset: u64,
}
428
429impl FileMessage {
430 fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
431 let (first, second) = mut_array_refs![dest, size_of::<u64>(), size_of::<u64>()];
432 *first = self.id.to_le_bytes();
433 *second = self.offset.to_le_bytes();
434 }
435
436 fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
437 let (first, second) = array_refs!(src, size_of::<u64>(), size_of::<u64>());
438 Self { id: u64::from_le_bytes(*first), offset: u64::from_le_bytes(*second) }
439 }
440}
441
442impl Message for FileMessage {
443 type IdType = u64;
444
445 fn id(&self) -> Self::IdType {
446 self.id
447 }
448
449 fn offset(&self) -> u64 {
450 self.offset
451 }
452
453 fn encode_to(&self, dest: &mut [u8]) {
454 self.encode_to_impl(dest.try_into().unwrap())
455 }
456
457 fn decode_from(src: &[u8]) -> Self {
458 Self::decode_from_impl(src.try_into().unwrap())
459 }
460
461 fn is_zeroes(&self) -> bool {
462 self.id == 0 && self.offset == 0
463 }
464
465 fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
466 match node.into_any().downcast::<FxFile>() {
467 Ok(file) => Ok(Self { id: file.object_id(), offset }),
468 Err(_) => Err(anyhow!("Cannot record non-file entry")),
469 }
470 }
471
472 fn is_open_marker(&self) -> bool {
473 self.offset == FILE_OPEN_MARKER
474 }
475}
476
/// Sink for access events that should end up in a profile.
pub trait Recorder: Send + Sync {
    /// Records an access to `node` at `offset`.
    fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error>;

    /// Records that `node` was opened.
    fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error>;
}
486
/// `Recorder` that batches messages of type `T` and forwards full batches over a channel.
struct RecorderImpl<T: Message> {
    /// Channel on which batches of messages are sent.
    sender: async_channel::Sender<Vec<T>>,
    /// Messages accumulated since the last batch was sent.
    buffer: Vec<T>,
}
491
492impl<T: Message> RecorderImpl<T> {
493 fn new(sender: async_channel::Sender<Vec<T>>) -> Self {
494 Self { sender, buffer: Vec::with_capacity(MESSAGE_CHUNK_SIZE) }
495 }
496}
497
498impl<T: Message> Recorder for RecorderImpl<T> {
499 fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error> {
500 self.buffer.push(T::from_node_request(node, offset)?);
501 if self.buffer.len() >= MESSAGE_CHUNK_SIZE {
502 self.sender.try_send(std::mem::replace(
505 &mut self.buffer,
506 Vec::with_capacity(MESSAGE_CHUNK_SIZE),
507 ))?;
508 }
509 RECORDED.fetch_add(1, Ordering::Relaxed);
510 Ok(())
511 }
512
513 fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error> {
514 self.record(node, FILE_OPEN_MARKER)
515 }
516}
517
518impl<T: Message> Drop for RecorderImpl<T> {
519 fn drop(&mut self) {
520 if self.buffer.len() > 0 {
522 let buffer = std::mem::take(&mut self.buffer);
523 let _ = self.sender.try_send(buffer);
524 }
525 }
526}
527
/// A single prefetch request handed to the page-in workers.
struct Request<P: PagerBacked> {
    /// The pager-backed object to prefetch from.
    file: Arc<P>,
    /// Byte offset at which the prefetch starts.
    offset: u64,
}
532
/// State of an in-progress profile replay for a volume of kind `T`.
struct ReplayState<T> {
    /// Completes once all page-in workers have returned; shared so it can be awaited repeatedly.
    replay_threads: future::Shared<BoxFuture<'static, ()>>,
    /// Task that reads the profile and queues requests; held only to keep it alive.
    _cache_task: fasync::Task<()>,
    _phantom: PhantomData<T>,
}
538
impl<T: RecordedVolume> ReplayState<T> {
    /// Starts replaying the profile behind `handle` against `volume`.
    ///
    /// Spawns `REPLAY_THREADS` blocking workers that service prefetch requests, plus a task on
    /// the volume's scope that reads the profile and feeds those workers.
    fn new(handle: Box<dyn ReadObjectHandle>, volume: Arc<FxVolume>, guard: ActiveGuard) -> Self {
        let (sender, receiver) = async_channel::unbounded::<Request<T::NodeType>>();

        let mut replay_threads = Vec::with_capacity(REPLAY_THREADS);
        for _ in 0..REPLAY_THREADS {
            let receiver = receiver.clone();
            let guard = guard.clone();
            replay_threads.push(fasync::unblock(move || {
                // Hold a clone of the guard for as long as this worker runs.
                let _guard = guard;
                Self::page_in_thread(receiver);
            }));
        }
        let replay_threads = (Box::pin(async {
            join_all(replay_threads).await;
        }) as BoxFuture<'static, ()>)
            .shared();

        let scope = volume.scope().clone();
        let cache_task = scope
            .spawn({
                async move {
                    let mut task = pin!(
                        async {
                            // Keeps every successfully opened node alive while this future
                            // exists.
                            let mut local_cache: BTreeMap<
                                T::IdType,
                                Option<OpenedNode<T::NodeType>>,
                            > = BTreeMap::new();

                            let volume_id = volume.id();

                            if let Err(error) = T::new(volume)
                                .read_and_queue(handle, &sender, &mut local_cache)
                                .await
                            {
                                error!(error:?; "Failed to read back profile");
                            }
                            // No more requests will be queued.
                            sender.close();

                            info!(
                                "Replay for volume {} opened {} of {} objects.",
                                volume_id,
                                local_cache.iter().filter(|(_, e)| e.is_some()).count(),
                                local_cache.len()
                            );

                            // Never completes, so this future (and `local_cache` with it) only
                            // goes away when the `select!` below takes the cancellation branch.
                            let () = std::future::pending().await;
                        }
                        .fuse()
                    );

                    select! {
                        _ = task => {}
                        _ = guard.on_cancel().fuse() => {}
                    }
                }
            })
            .into();

        Self { replay_threads, _cache_task: cache_task, _phantom: PhantomData }
    }

    /// Worker loop: issues a one-page `PREFETCH` for each request received from `queue`.
    ///
    /// Returns when the channel is closed and drained, or right after handling a request if no
    /// senders remain at that point (any requests still queued are then left unprocessed).
    fn page_in_thread(queue: async_channel::Receiver<Request<T::NodeType>>) {
        while let Ok(request) = queue.recv_blocking() {
            let res = request.file.vmo().op_range(
                zx::VmoOp::PREFETCH,
                request.offset,
                zx::system_get_page_size() as u64,
            );
            // A failed prefetch is only logged; replay carries on.
            if let Err(e) = res {
                warn!("Failed to prefetch page: {:?}", e);
            }
            if queue.sender_count() == 0 {
                return;
            }
        }
    }
}
629
/// Type-erased handle on the recording and replay state of one volume.
#[async_trait]
pub trait ProfileState: Send + Sync {
    /// Starts recording a new profile called `name` for `volume` and returns the `Recorder`
    /// that feeds it.
    fn record_new(&mut self, volume: &Arc<FxVolume>, name: &str) -> Box<dyn Recorder>;

    /// Starts replaying the profile readable through `handle` against `volume`.
    fn replay_profile(
        &mut self,
        handle: Box<dyn ReadObjectHandle>,
        volume: Arc<FxVolume>,
        guard: ActiveGuard,
    );

    /// Waits for the replay workers, if any were started, to finish.
    async fn wait_for_replay_to_finish(&mut self);

    /// Waits for the current recording, if any, to finish and returns its result.
    async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error>;
}
655
656pub fn new_profile_state(is_blob: bool) -> Box<dyn ProfileState> {
657 if is_blob {
658 Box::new(ProfileStateImpl::<BlobVolume>::new())
659 } else {
660 Box::new(ProfileStateImpl::<FileVolume>::new())
661 }
662}
663
/// `ProfileState` implementation for a volume of kind `T`.
struct ProfileStateImpl<T> {
    /// Handle on the result of the current recording task, if one was started.
    recording: Option<RemoteHandle<Result<(), Error>>>,
    /// State of the most recently started replay, if any.
    replay: Option<ReplayState<T>>,
}
668
669impl<T> ProfileStateImpl<T> {
670 fn new() -> Self {
671 Self { recording: None, replay: None }
672 }
673}
674
675#[async_trait]
676impl<T: RecordedVolume> ProfileState for ProfileStateImpl<T> {
677 fn record_new(&mut self, volume: &Arc<FxVolume>, name: &str) -> Box<dyn Recorder> {
678 let (sender, receiver) = async_channel::unbounded();
679 let volume = volume.clone();
680 let name = name.to_string();
681 self.recording = None;
683 let scope = volume.scope().clone();
684 let (task, remote_handle) = async move {
685 let recording = T::new(volume);
686 recording
687 .record(&name, receiver)
688 .await
689 .inspect_err(|error| warn!(error:?; "Profile recording '{name}' failed"))
690 }
691 .remote_handle();
692 self.recording = Some(remote_handle);
693 scope.spawn(task);
694 Box::new(RecorderImpl::new(sender))
695 }
696
697 fn replay_profile(
698 &mut self,
699 handle: Box<dyn ReadObjectHandle>,
700 volume: Arc<FxVolume>,
701 guard: ActiveGuard,
702 ) {
703 self.replay = Some(ReplayState::new(handle, volume, guard));
704 }
705
706 async fn wait_for_replay_to_finish(&mut self) {
707 if let Some(replay) = &mut self.replay {
708 replay.replay_threads.clone().await;
709 }
710 }
711
712 async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error> {
713 if let Some(recording) = self.recording.take() { recording.await } else { Ok(()) }
714 }
715}
716
717#[cfg(test)]
718mod tests {
719 use super::{
720 BlobMessage, BlobVolume, FileMessage, FileVolume, IO_SIZE, Message, RecordedVolume,
721 Request, new_profile_state,
722 };
723 use crate::fuchsia::file::FxFile;
724 use crate::fuchsia::fxblob::blob::FxBlob;
725 use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
726 use crate::fuchsia::node::{FxNode, OpenedNode};
727 use crate::fuchsia::pager::PagerBacked;
728 use crate::fuchsia::testing::{TestFixture, TestFixtureOptions, open_file_checked};
729 use crate::fuchsia::volume::FxVolume;
730 use anyhow::Error;
731 use async_trait::async_trait;
732 use delivery_blob::CompressionMode;
733 use event_listener::{Event, EventListener};
734 use fuchsia_hash::Hash;
735 use fuchsia_sync::Mutex;
736 use fxfs::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
737 use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
738 use fxfs::object_store::{DataObjectHandle, HandleOptions, ObjectDescriptor, ObjectStore};
739 use std::collections::BTreeMap;
740 use std::mem::size_of;
741 use std::sync::Arc;
742 use std::time::Duration;
743 use storage_device::buffer::{BufferRef, MutableBufferRef};
744 use storage_device::buffer_allocator::{BufferAllocator, BufferFuture, BufferSource};
745 use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
746
    /// Mutable state of `FakeReaderWriter`.
    struct FakeReaderWriterInner {
        /// Bytes written so far; also the contents served by reads.
        data: Vec<u8>,
        /// Listeners awaited, one per read or write, before the operation proceeds.
        delays: Vec<EventListener>,
    }
751
    /// In-memory object handle for tests; reads and writes go to a byte vector.
    struct FakeReaderWriter {
        /// Allocator backing `allocate_buffer`.
        allocator: BufferAllocator,
        /// Stored data and pending delays.
        inner: Arc<Mutex<FakeReaderWriterInner>>,
    }
756
    /// Block size used by `FakeReaderWriter`; the tests also assert the fixture filesystem's
    /// block size equals it.
    const BLOCK_SIZE: usize = 4096;
758
759 impl FakeReaderWriter {
760 fn new() -> Self {
761 Self {
762 allocator: BufferAllocator::new(BLOCK_SIZE, BufferSource::new(IO_SIZE * 2)),
763 inner: Arc::new(Mutex::new(FakeReaderWriterInner {
764 data: Vec::new(),
765 delays: Vec::new(),
766 })),
767 }
768 }
769
770 fn push_delay(&self, delay: EventListener) {
771 self.inner.lock().delays.insert(0, delay);
772 }
773 }
774
775 impl ObjectHandle for FakeReaderWriter {
776 fn object_id(&self) -> u64 {
777 0
778 }
779
780 fn block_size(&self) -> u64 {
781 self.allocator.block_size() as u64
782 }
783
784 fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
785 self.allocator.allocate_buffer(size)
786 }
787 }
788
789 impl WriteObjectHandle for FakeReaderWriter {
790 async fn write_or_append(
791 &self,
792 offset: Option<u64>,
793 buf: BufferRef<'_>,
794 ) -> Result<u64, Error> {
795 assert!(offset.is_none());
797 let delay = self.inner.lock().delays.pop();
798 if let Some(delay) = delay {
799 delay.await;
800 }
801 self.inner.lock().data.extend_from_slice(buf.as_slice());
803 Ok(buf.len() as u64)
804 }
805
806 async fn truncate(&self, _size: u64) -> Result<(), Error> {
807 unreachable!();
808 }
809
810 async fn flush(&self) -> Result<(), Error> {
811 unreachable!();
812 }
813 }
814
815 async fn write_file(fixture: &TestFixture, name: &str, data: &[u8]) -> u64 {
816 let root_dir = fixture.volume().root_dir();
817 let mut transaction = fixture
818 .volume()
819 .volume()
820 .store()
821 .filesystem()
822 .new_transaction(
823 lock_keys![LockKey::object(
824 fixture.volume().volume().store().store_object_id(),
825 root_dir.object_id()
826 )],
827 Options::default(),
828 )
829 .await
830 .expect("Creating transaction for new file");
831 let id = root_dir
832 .directory()
833 .create_child_file(&mut transaction, name)
834 .await
835 .expect("Creating new_file")
836 .object_id();
837 transaction.commit().await.unwrap();
838 let file = open_file_checked(
839 fixture.root(),
840 name,
841 fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_FILE,
842 &Default::default(),
843 )
844 .await;
845 file.write(data).await.unwrap().expect("Writing file");
846 id
847 }
848
849 #[async_trait]
850 impl ReadObjectHandle for FakeReaderWriter {
851 async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
852 let delay = self.inner.lock().delays.pop();
853 if let Some(delay) = delay {
854 delay.await;
855 }
856 let inner = self.inner.lock();
858 assert!(offset as usize <= inner.data.len());
859 let offset_end = std::cmp::min(offset as usize + buf.len(), inner.data.len());
860 let size = offset_end - offset as usize;
861 buf.as_mut_slice()[..size].clone_from_slice(&inner.data[offset as usize..offset_end]);
862 Ok(size)
863 }
864
865 fn get_size(&self) -> u64 {
866 self.inner.lock().data.len() as u64
867 }
868 }
869
870 #[fuchsia::test]
871 async fn test_encode_decode_blob() {
872 let mut buf = [0u8; size_of::<BlobMessage>()];
873 let m = BlobMessage { id: [88u8; 32].into(), offset: 77 };
874 m.encode_to(&mut buf.as_mut_slice());
875 let m2 = BlobMessage::decode_from(&buf);
876 assert_eq!(m, m2);
877 }
878
879 #[fuchsia::test]
880 async fn test_encode_decode_file() {
881 let mut buf = [0u8; size_of::<FileMessage>()];
882 let m = FileMessage { id: 88, offset: 77 };
883 m.encode_to(&mut buf.as_mut_slice());
884 let m2 = FileMessage::decode_from(&buf);
885 assert!(!m2.is_zeroes());
886 assert_eq!(m, m2);
887 }
888
    /// Profile name used by every recording test in this module.
    const TEST_PROFILE_NAME: &str = "test_profile";
890
891 async fn get_test_profile_handle(volume: &Arc<FxVolume>) -> DataObjectHandle<FxVolume> {
892 let profile_dir = volume.get_profile_directory().await.unwrap();
893 ObjectStore::open_object(
894 volume,
895 profile_dir
896 .lookup(TEST_PROFILE_NAME)
897 .await
898 .expect("lookup failed")
899 .expect("not found")
900 .0,
901 HandleOptions::default(),
902 None,
903 )
904 .await
905 .unwrap()
906 }
907
908 async fn get_test_profile_contents(volume: &Arc<FxVolume>) -> Vec<u8> {
909 get_test_profile_handle(volume).await.contents(1024 * 1024).await.unwrap().to_vec()
910 }
911
    /// Recording one access plus an open marker for a blob yields a profile of exactly one
    /// block.
    #[fuchsia::test]
    async fn test_recording_basic_blob() {
        let fixture = new_blob_fixture().await;
        {
            let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");

            let mut state = new_profile_state(true);
            let volume = fixture.volume().volume();

            {
                // The recorder is dropped at the end of this scope, which flushes its buffer.
                let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
                recorder.record(blob.clone(), 0).unwrap();
                recorder.record_open(blob).unwrap();
            }

            state.wait_for_recording_to_finish().await.unwrap();

            assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
        }
        fixture.close().await;
    }
935
    /// Recording one access plus an open marker for a file yields a profile of exactly one
    /// block.
    #[fuchsia::test]
    async fn test_recording_basic_file() {
        let fixture = TestFixture::new().await;
        {
            let id = write_file(&fixture, "foo", &[88u8]).await;
            let node = fixture
                .volume()
                .volume()
                .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
                .await
                .unwrap();

            let mut state = new_profile_state(false);
            let volume = fixture.volume().volume();

            {
                // The recorder is dropped at the end of this scope, which flushes its buffer.
                let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
                recorder.record(node.clone(), 0).unwrap();
                recorder.record_open(node).unwrap();
            }
            state.wait_for_recording_to_finish().await.unwrap();

            assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
        }
        fixture.close().await;
    }
963
    /// An access recorded without a matching open marker is filtered out, leaving an empty
    /// profile.
    #[fuchsia::test]
    async fn test_recording_filtered_without_open() {
        let fixture = new_blob_fixture().await;
        {
            let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");

            let mut state = new_profile_state(true);
            let volume = fixture.volume().volume();

            {
                // Only an access is recorded; `record_open` is deliberately never called.
                let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
                recorder.record(blob.clone(), 0).unwrap();
            }
            state.wait_for_recording_to_finish().await.unwrap();

            assert_eq!(get_test_profile_contents(volume).await.len(), 0);
        }
        fixture.close().await;
    }
985
    /// Records one more blob message than fits in a block, checks the profile spans two blocks,
    /// and checks that reading it back yields every message in order.
    #[fuchsia::test]
    async fn test_recording_blob_more_than_block() {
        let mut state = new_profile_state(true);

        let fixture = new_blob_fixture().await;
        assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
        // One message more than a single block can hold.
        let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;
        let hash;
        let volume = fixture.volume().volume();

        {
            hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            recorder.record_open(blob.clone()).unwrap();
            for i in 0..message_count {
                recorder.record(blob.clone(), 4096 * i as u64).unwrap();
            }
        }
        state.wait_for_recording_to_finish().await.unwrap();

        assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);

        let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
        let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();

        let volume = fixture.volume().volume().clone();
        // The task owns `sender`; when it finishes the channel closes and the loop below ends.
        let task = fasync::Task::spawn(async move {
            let handle = Box::new(get_test_profile_handle(&volume).await);
            let blob = BlobVolume::new(volume);
            blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
        });

        let mut recv_count = 0;
        while let Ok(msg) = receiver.recv().await {
            assert_eq!(msg.file.root(), hash);
            assert_eq!(msg.offset, 4096 * recv_count);
            recv_count += 1;
        }
        task.await;
        assert_eq!(recv_count, message_count as u64);

        fixture.close().await;
    }
1031
    /// Records one more file message than fits in a block, checks the profile spans two blocks,
    /// and checks that reading it back yields every message in order.
    #[fuchsia::test]
    async fn test_recording_file_more_than_block() {
        let mut state = new_profile_state(false);

        let fixture = TestFixture::new().await;
        assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
        // One message more than a single block can hold.
        let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;
        let id;
        let volume = fixture.volume().volume();
        {
            id = write_file(&fixture, "foo", &[88u8]).await;
            let node = volume
                .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
                .await
                .unwrap();
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            recorder.record_open(node.clone()).unwrap();
            for i in 0..message_count {
                recorder.record(node.clone(), 4096 * i as u64).unwrap();
            }
        }
        state.wait_for_recording_to_finish().await.unwrap();

        assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);

        let mut local_cache: BTreeMap<u64, Option<OpenedNode<FxFile>>> = BTreeMap::new();
        let (sender, receiver) = async_channel::unbounded::<Request<FxFile>>();

        let volume = fixture.volume().volume().clone();
        // The task owns `sender`; when it finishes the channel closes and the loop below ends.
        let task = fasync::Task::spawn(async move {
            let handle = Box::new(get_test_profile_handle(&volume).await);
            let file = FileVolume::new(volume);
            file.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
        });

        let mut recv_count = 0;
        while let Ok(msg) = receiver.recv().await {
            assert_eq!(msg.file.object_id(), id);
            assert_eq!(msg.offset, 4096 * recv_count);
            recv_count += 1;
        }
        task.await;
        assert_eq!(recv_count, message_count as u64);

        fixture.close().await;
    }
1079
    /// Records more messages than fit in one `IO_SIZE` buffer, checks the profile is one full
    /// buffer plus one block long, and checks that reading it back yields every message in
    /// order.
    #[fuchsia::test]
    async fn test_recording_more_than_io_size() {
        let fixture = new_blob_fixture().await;

        {
            let mut state = new_profile_state(true);
            let message_count = (IO_SIZE as usize / size_of::<BlobMessage>()) + 1;
            let hash;
            let volume = fixture.volume().volume();
            {
                hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
                let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
                recorder.record_open(blob.clone()).unwrap();
                for i in 0..message_count {
                    recorder.record(blob.clone(), 4096 * i as u64).unwrap();
                }
            }
            state.wait_for_recording_to_finish().await.unwrap();
            assert_eq!(get_test_profile_contents(volume).await.len(), IO_SIZE + BLOCK_SIZE);

            let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
            let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();

            let volume = volume.clone();
            // The task owns `sender`; when it finishes the channel closes and the loop below
            // ends.
            let task = fasync::Task::spawn(async move {
                let handle = Box::new(get_test_profile_handle(&volume).await);
                let blob = BlobVolume::new(volume);
                blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
            });

            let mut recv_count = 0;
            while let Ok(msg) = receiver.recv().await {
                assert_eq!(msg.file.root(), hash);
                assert_eq!(msg.offset, 4096 * recv_count);
                recv_count += 1;
            }
            task.await;
            assert_eq!(recv_count, message_count as u64);
        }

        fixture.close().await;
    }
1124
    /// Records a profile touching many blobs, reopens the filesystem, and checks that replaying
    /// the profile commits pages in every recorded blob.
    #[fuchsia::test]
    async fn test_replay_profile_blob() {
        let mut state = new_profile_state(true);

        let mut hashes = Vec::new();

        let fixture = new_blob_fixture().await;
        {
            assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
            // Enough messages to spill into a second block.
            let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;

            let volume = fixture.volume().volume();
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            for i in 0..message_count {
                // Each blob gets distinct contents (the loop index).
                let hash =
                    fixture.write_blob(i.to_le_bytes().as_slice(), CompressionMode::Never).await;
                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
                recorder.record_open(blob.clone()).unwrap();
                hashes.push(hash);
                recorder.record(blob.clone(), 0).unwrap();
            }
        };
        let device = fixture.close().await;
        device.ensure_unique();
        state.wait_for_recording_to_finish().await.unwrap();

        device.reopen(false);
        let fixture = open_blob_fixture(device).await;
        {
            // Before replay, none of the blobs has any committed pages.
            for hash in &hashes {
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
            }

            let volume = fixture.volume().volume();
            state.replay_profile(
                Box::new(get_test_profile_handle(volume).await),
                volume.clone(),
                volume.scope().try_active_guard().unwrap(),
            );

            // Poll until replay has committed pages in every blob.
            for hash in &hashes {
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                while blob.vmo().info().unwrap().committed_bytes == 0 {
                    fasync::Timer::new(Duration::from_millis(25)).await;
                }
            }
        }
        fixture.close().await;
    }
1180
1181 #[fuchsia::test]
1182 async fn test_replay_profile_file() {
1183 let mut state = new_profile_state(false);
1185
1186 let mut ids = Vec::new();
1187
1188 let fixture = TestFixture::new().await;
1189 {
1190 assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
1191 let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;
1192
1193 let volume = fixture.volume().volume();
1194 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1195 for i in 0..message_count {
1197 let id = write_file(&fixture, &i.to_string(), &[88u8]).await;
1198 let node = fixture
1199 .volume()
1200 .volume()
1201 .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
1202 .await
1203 .unwrap();
1204 recorder.record_open(node.clone()).unwrap();
1205 ids.push(id);
1206 recorder.record(node.clone(), 0).unwrap();
1207 }
1208 };
1209 let device = fixture.close().await;
1210 device.ensure_unique();
1211 state.wait_for_recording_to_finish().await.unwrap();
1212
1213 device.reopen(false);
1214 let fixture = TestFixture::open(
1215 device,
1216 TestFixtureOptions { encrypted: true, format: false, ..Default::default() },
1217 )
1218 .await;
1219 {
1220 for id in &ids {
1222 let file = fixture
1223 .volume()
1224 .volume()
1225 .get_or_load_node(
1226 *id,
1227 ObjectDescriptor::File,
1228 Some(fixture.volume().root_dir()),
1229 )
1230 .await
1231 .unwrap()
1232 .into_any()
1233 .downcast::<FxFile>()
1234 .unwrap();
1235 assert_eq!(file.vmo().info().unwrap().committed_bytes, 0);
1236 }
1237
1238 let volume = fixture.volume().volume();
1239 state.replay_profile(
1240 Box::new(get_test_profile_handle(volume).await),
1241 volume.clone(),
1242 volume.scope().try_active_guard().unwrap(),
1243 );
1244
1245 for id in &ids {
1247 let file = fixture
1248 .volume()
1249 .volume()
1250 .get_or_load_node(
1251 *id,
1252 ObjectDescriptor::File,
1253 Some(fixture.volume().root_dir()),
1254 )
1255 .await
1256 .unwrap()
1257 .into_any()
1258 .downcast::<FxFile>()
1259 .unwrap();
1260 while file.vmo().info().unwrap().committed_bytes == 0 {
1261 fasync::Timer::new(Duration::from_millis(25)).await;
1262 }
1263 }
1264 state.wait_for_recording_to_finish().await.unwrap();
1265 }
1266 fixture.close().await;
1267 }
1268
1269 #[fuchsia::test]
1270 async fn test_recording_during_replay() {
1271 let mut state = new_profile_state(true);
1272
1273 let hash;
1274 let first_recording;
1275 let fixture = new_blob_fixture().await;
1276 let volume = fixture.volume().volume();
1277
1278 {
1280 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1281 hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1282 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1283 recorder.record_open(blob.clone()).unwrap();
1284 recorder.record(blob.clone(), 0).unwrap();
1285 }
1286
1287 state.wait_for_recording_to_finish().await.unwrap();
1288 first_recording = get_test_profile_contents(volume).await;
1289 assert_ne!(first_recording.len(), 0);
1290 let device = fixture.close().await;
1291 device.ensure_unique();
1292
1293 device.reopen(false);
1294 let fixture = open_blob_fixture(device).await;
1295
1296 {
1297 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1300 assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
1301
1302 let volume = fixture.volume().volume();
1304 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1305 recorder.record(blob.clone(), 4096).unwrap();
1306
1307 let volume = fixture.volume().volume();
1309 state.replay_profile(
1310 Box::new(get_test_profile_handle(volume).await),
1311 volume.clone(),
1312 volume.scope().try_active_guard().unwrap(),
1313 );
1314
1315 {
1317 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1318 while blob.vmo().info().unwrap().committed_bytes == 0 {
1319 fasync::Timer::new(Duration::from_millis(25)).await;
1320 }
1321 }
1322
1323 recorder.record_open(blob.clone()).unwrap();
1326 }
1327
1328 state.wait_for_recording_to_finish().await.unwrap();
1329
1330 let volume = fixture.volume().volume();
1331 let second_recording = get_test_profile_contents(volume).await;
1332 assert_ne!(second_recording.len(), 0);
1333 assert_ne!(&second_recording, &first_recording);
1334
1335 fixture.close().await;
1336 }
1337
1338 #[fuchsia::test]
1341 async fn test_replay_profile_stop_reading_early() {
1342 let mut state = new_profile_state(true);
1343 let fixture = new_blob_fixture().await;
1344
1345 {
1346 let volume = fixture.volume().volume();
1347
1348 let message;
1350 {
1351 let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1352 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1353 message = BlobMessage { id: blob.root(), offset: 0 };
1354 }
1355 state.wait_for_recording_to_finish().await.unwrap();
1356
1357 let replay_handle = Box::new(FakeReaderWriter::new());
1359 let mut buff = vec![0u8; IO_SIZE * 2];
1360 message.encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
1361 message.encode_to_impl(
1362 (&mut buff[IO_SIZE..IO_SIZE + size_of::<BlobMessage>()]).try_into().unwrap(),
1363 );
1364
1365 replay_handle.inner.lock().data = buff;
1366 let delay1 = Event::new();
1367 replay_handle.push_delay(delay1.listen());
1368 let delay2 = Event::new();
1369 replay_handle.push_delay(delay2.listen());
1370
1371 state.replay_profile(
1372 replay_handle,
1373 volume.clone(),
1374 volume.scope().try_active_guard().unwrap(),
1375 );
1376
1377 fasync::Task::spawn(async move {
1379 fasync::Timer::new(Duration::from_millis(100)).await;
1381 delay1.notify(usize::MAX);
1382 })
1383 .detach();
1384 }
1385
1386 fixture.close().await;
1389 }
1390
1391 #[fuchsia::test]
1392 async fn test_replay_blob_missing() {
1393 let fixture = new_blob_fixture().await;
1394 let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1397 let mut buff = vec![0u8; IO_SIZE];
1398 {
1399 {
1402 let message = BlobMessage { id: [42u8; 32].into(), offset: 0 };
1403 message
1404 .encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
1405 }
1406
1407 {
1409 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1410 let message = BlobMessage { id: blob.root(), offset: 0 };
1411 message.encode_to_impl(
1412 (&mut buff[size_of::<BlobMessage>()..(size_of::<BlobMessage>() * 2)])
1413 .try_into()
1414 .unwrap(),
1415 );
1416 }
1417 }
1418 let device = fixture.close().await;
1419 device.ensure_unique();
1420
1421 device.reopen(false);
1422 let fixture = open_blob_fixture(device).await;
1423 {
1424 let mut state = new_profile_state(true);
1425 let volume = fixture.volume().volume();
1426
1427 let replay_handle = Box::new(FakeReaderWriter::new());
1428 replay_handle.inner.lock().data = buff;
1429
1430 state.replay_profile(
1431 replay_handle,
1432 volume.clone(),
1433 volume.scope().try_active_guard().unwrap(),
1434 );
1435
1436 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1438 while blob.vmo().info().unwrap().committed_bytes == 0 {
1439 fasync::Timer::new(Duration::from_millis(25)).await;
1440 }
1441 }
1442 fixture.close().await;
1443 }
1444
1445 #[fuchsia::test]
1446 async fn test_replay_file_missing_or_tombstoned() {
1447 let fixture = TestFixture::new().await;
1448 let mut buff = vec![0u8; IO_SIZE];
1449 let remaining_file_id;
1452 let tombstoned_file_id;
1453 {
1455 let id = write_file(&fixture, "foo", &[1, 2, 3, 4]).await;
1456 let message = FileMessage { id, offset: 0 };
1457 message.encode_to_impl((&mut buff[0..size_of::<FileMessage>()]).try_into().unwrap());
1458 }
1459 fixture
1461 .root()
1462 .unlink("foo", &fio::UnlinkOptions::default())
1463 .await
1464 .unwrap()
1465 .expect("Unlinking");
1466
1467 {
1469 tombstoned_file_id = write_file(&fixture, "bar", &[1, 2, 3, 4]).await;
1470 let message = FileMessage { id: tombstoned_file_id, offset: 0 };
1471 message.encode_to_impl(
1472 (&mut buff[size_of::<FileMessage>()..(size_of::<FileMessage>() * 2)])
1473 .try_into()
1474 .unwrap(),
1475 );
1476 }
1477
1478 {
1480 remaining_file_id = write_file(&fixture, "baz", &[1, 2, 3, 4]).await;
1481 let message = FileMessage { id: remaining_file_id, offset: 0 };
1482 message.encode_to_impl(
1483 (&mut buff[(size_of::<FileMessage>() * 2)..(size_of::<FileMessage>() * 3)])
1484 .try_into()
1485 .unwrap(),
1486 );
1487 }
1488 let device = fixture.close().await;
1489 device.ensure_unique();
1490
1491 device.reopen(false);
1492 let fixture =
1493 TestFixture::open(device, TestFixtureOptions { format: false, ..Default::default() })
1494 .await;
1495 {
1496 let tombstoned_file = fixture
1499 .volume()
1500 .volume()
1501 .get_or_load_node(tombstoned_file_id, ObjectDescriptor::File, None)
1502 .await
1503 .expect("Opening file object")
1504 .into_any()
1505 .downcast::<FxFile>()
1506 .unwrap();
1507 fixture
1508 .root()
1509 .unlink("bar", &fio::UnlinkOptions::default())
1510 .await
1511 .unwrap()
1512 .expect("Unlinking");
1513
1514 let mut state = new_profile_state(false);
1515 let volume = fixture.volume().volume();
1516
1517 let replay_handle = Box::new(FakeReaderWriter::new());
1518 replay_handle.inner.lock().data = buff;
1519
1520 state.replay_profile(
1521 replay_handle,
1522 volume.clone(),
1523 volume.scope().try_active_guard().unwrap(),
1524 );
1525
1526 let remaining_file = fixture
1528 .volume()
1529 .volume()
1530 .get_or_load_node(remaining_file_id, ObjectDescriptor::File, None)
1531 .await
1532 .expect("Opening file object")
1533 .into_any()
1534 .downcast::<FxFile>()
1535 .unwrap();
1536 while remaining_file.vmo().info().unwrap().committed_bytes == 0 {
1537 fasync::Timer::new(Duration::from_millis(25)).await;
1538 }
1539
1540 assert_eq!(tombstoned_file.vmo().info().unwrap().committed_bytes, 0);
1543 }
1544 fixture.close().await;
1545 }
1546}