1use crate::fuchsia::file::FxFile;
6use crate::fuchsia::fxblob::BlobDirectory;
7use crate::fuchsia::fxblob::blob::FxBlob;
8use crate::fuchsia::node::{FxNode, OpenedNode};
9use crate::fuchsia::pager::PagerBacked;
10use crate::fuchsia::volume::FxVolume;
11use anyhow::{Context as _, Error, anyhow, ensure};
12use arrayref::{array_refs, mut_array_refs};
13use async_trait::async_trait;
14use fuchsia_async as fasync;
15use fuchsia_hash::Hash;
16use futures::future::{self, BoxFuture, RemoteHandle, join_all};
17use futures::lock::Mutex;
18use futures::{FutureExt, select};
19use fxfs::errors::FxfsError;
20use fxfs::log::*;
21use fxfs::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle};
22use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
23use fxfs::object_store::{
24 HandleOptions, ObjectDescriptor, ObjectStore, Timestamp, VOLUME_DATA_KEY_ID, directory,
25};
26use linked_hash_map::LinkedHashMap;
27use scopeguard::ScopeGuard;
28use std::cmp::{Eq, PartialEq};
29use std::collections::btree_map::{BTreeMap, Entry};
30use std::marker::PhantomData;
31use std::mem::size_of;
32use std::pin::pin;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicU64, Ordering};
35use vfs::execution_scope::ActiveGuard;
36
37const FILE_OPEN_MARKER: u64 = u64::MAX;
38const REPLAY_THREADS: usize = 2;
39const MESSAGE_CHUNK_SIZE: usize = 64;
42const IO_SIZE: usize = 1 << 17; pub static RECORDED: AtomicU64 = AtomicU64::new(0);
45
46trait RecordedVolume: Send + Sync + Sized + Unpin {
47 type IdType: std::fmt::Display + Ord + Send + Sized;
48 type NodeType: PagerBacked;
49 type MessageType: Message<IdType = Self::IdType>;
50
51 fn new(volume: Arc<FxVolume>) -> Self;
52
53 fn volume(&self) -> &Arc<FxVolume>;
54
55 fn open(
56 &self,
57 id: Self::IdType,
58 ) -> impl std::future::Future<Output = Result<OpenedNode<Self::NodeType>, Error>> + Send;
59
60 fn file_is_replayable(
62 &self,
63 id: &Self::IdType,
64 ) -> impl std::future::Future<Output = bool> + Send;
65
66 fn read_and_queue(
67 &self,
68 handle: Box<dyn ReadObjectHandle>,
69 sender: &async_channel::Sender<Request<Self::NodeType>>,
70 local_cache: &mut BTreeMap<Self::IdType, Option<OpenedNode<Self::NodeType>>>,
71 ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
72 async move {
73 let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
74 let block_size = handle.block_size() as usize;
75 let file_size = handle.get_size() as usize;
76 let mut offset = 0;
77 while offset < file_size {
78 let actual = handle
79 .read(offset as u64, io_buf.as_mut())
80 .await
81 .map_err(|e| e.context(format!("Failed to read at offset: {}", offset)))?;
82 offset += actual;
83 let mut local_offset = 0;
84 let mut next_block = block_size;
85 let mut next_offset = size_of::<Self::MessageType>();
86 while next_offset <= actual {
87 let msg = Self::MessageType::decode_from(
88 &io_buf.as_slice()[local_offset..next_offset],
89 );
90
91 local_offset = next_offset;
92 next_offset = local_offset + size_of::<Self::MessageType>();
93 if next_offset > next_block {
95 local_offset = next_block;
96 next_offset = local_offset + size_of::<Self::MessageType>();
97 next_block += block_size;
98 }
99
100 if msg.is_zeroes() {
103 break;
104 }
105
106 let file = match local_cache.entry(msg.id()) {
107 Entry::Occupied(entry) => match entry.get() {
108 Some(opened_file) => (*opened_file).clone(),
109 None => continue,
111 },
112 Entry::Vacant(entry) => match self.open(msg.id()).await {
113 Err(e) => {
114 debug!("Failed to open object {} from profile: {:?}", msg.id(), e);
115 entry.insert(None);
117 continue;
118 }
119 Ok(opened_file) => {
120 let file_clone = (*opened_file).clone();
121 entry.insert(Some(opened_file));
122 file_clone
123 }
124 },
125 };
126
127 sender.send(Request { file, offset: msg.offset() }).await?;
128 }
129 }
130 Ok(())
131 }
132 }
133
134 fn record(
135 &self,
136 name: &str,
137 receiver: async_channel::Receiver<Vec<Self::MessageType>>,
138 ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
139 async move {
140 let mut recorded_offsets = LinkedHashMap::<Self::MessageType, ()>::new();
141 let mut recorded_opens = BTreeMap::<Self::IdType, bool>::new();
142 while let Ok(buffer) = receiver.recv().await {
143 for message in buffer {
144 if message.is_open_marker() {
145 if let Entry::Vacant(entry) = recorded_opens.entry(message.id()) {
146 let usable = self.file_is_replayable(entry.key()).await;
147 entry.insert(usable);
148 }
149 } else {
150 recorded_offsets.insert(message, ());
151 }
152 }
153 }
154
155 let store = self.volume().store();
158 let fs = store.filesystem();
159 let mut transaction =
160 fs.clone().new_transaction(lock_keys![], Options::default()).await?;
161 let handle = ObjectStore::create_object(
162 self.volume(),
163 &mut transaction,
164 HandleOptions::default(),
165 None,
166 )
167 .await?;
168 store.add_to_graveyard(&mut transaction, handle.object_id());
169 transaction.commit().await?;
170
171 let clean_up = scopeguard::guard((), |_| {
172 fs.graveyard().queue_tombstone_object(store.store_object_id(), handle.object_id())
173 });
174
175 let block_size = handle.block_size() as usize;
176 let mut offset = 0;
177 let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
178 let mut next_block = block_size;
179 while let Some((message, _)) = recorded_offsets.pop_front() {
180 if !recorded_opens.get(&message.id()).copied().unwrap_or(false) {
182 continue;
183 }
184
185 let mut next_offset = offset + size_of::<Self::MessageType>();
186 if next_offset > next_block {
187 io_buf.as_mut_slice()[offset..next_block].fill(0);
190 if next_block >= IO_SIZE {
191 handle
193 .write_or_append(None, io_buf.as_ref())
194 .await
195 .context("Failed to write profile block")?;
196 offset = 0;
197 next_offset = size_of::<Self::MessageType>();
198 next_block = block_size;
199 } else {
200 offset = next_block;
201 next_offset = offset + size_of::<Self::MessageType>();
202 next_block += block_size;
203 }
204 }
205 message.encode_to(&mut io_buf.as_mut_slice()[offset..next_offset]);
206 offset = next_offset;
207 }
208 if offset > 0 {
209 io_buf.as_mut_slice()[offset..next_block].fill(0);
210 handle
211 .write_or_append(None, io_buf.subslice(0..next_block))
212 .await
213 .context("Failed to write profile block")?;
214 }
215
216 let profile_dir = self.volume().get_profile_directory().await?;
217
218 let mut lock_keys =
219 lock_keys![LockKey::object(store.store_object_id(), profile_dir.object_id())];
220 let mut old_id = INVALID_OBJECT_ID;
221 let mut transaction = loop {
222 let transaction = fs.clone().new_transaction(lock_keys, Options::default()).await?;
223 if let Some((id, descriptor, _)) = profile_dir.lookup(name).await? {
224 ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
225 if id == old_id {
226 break transaction;
227 }
228 lock_keys = lock_keys![
229 LockKey::object(store.store_object_id(), profile_dir.object_id()),
230 LockKey::object(store.store_object_id(), id)
231 ];
232 old_id = id;
233 } else {
234 old_id = INVALID_OBJECT_ID;
235 break transaction;
236 }
237 };
238
239 store.remove_from_graveyard(&mut transaction, handle.object_id());
240 directory::replace_child_with_object(
241 &mut transaction,
242 Some((handle.object_id(), ObjectDescriptor::File)),
243 (&profile_dir, name),
244 0,
245 Timestamp::now(),
246 )
247 .await?;
248 transaction.commit().await?;
249
250 ScopeGuard::into_inner(clean_up);
251 if old_id != INVALID_OBJECT_ID {
252 fs.graveyard().queue_tombstone_object(store.store_object_id(), old_id);
253 }
254
255 Ok(())
256 }
257 }
258}
259
260struct BlobVolume {
261 volume: Arc<FxVolume>,
262 root_dir: Mutex<Option<Arc<BlobDirectory>>>,
265}
266
267impl RecordedVolume for BlobVolume {
268 type IdType = Hash;
269 type NodeType = FxBlob;
270 type MessageType = BlobMessage;
271
272 fn new(volume: Arc<FxVolume>) -> Self {
273 Self { volume, root_dir: Mutex::new(None) }
274 }
275
276 fn volume(&self) -> &Arc<FxVolume> {
277 &self.volume
278 }
279
280 async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
281 let mut root_dir = self.root_dir.lock().await;
282 if root_dir.is_none() {
283 *root_dir = Some(
284 self.volume
285 .get_or_load_node(
286 self.volume.store().root_directory_object_id(),
287 ObjectDescriptor::Directory,
288 None,
289 )
290 .await?
291 .into_any()
292 .downcast::<BlobDirectory>()
293 .map_err(|_| FxfsError::Inconsistent)?,
294 );
295 };
296 root_dir
297 .as_ref()
298 .unwrap()
299 .open_blob(&id.into())
300 .await?
301 .ok_or_else(|| FxfsError::NotFound.into())
302 }
303
304 async fn file_is_replayable(&self, _id: &Self::IdType) -> bool {
305 true
307 }
308}
309
310struct FileVolume {
311 volume: Arc<FxVolume>,
312}
313
314impl RecordedVolume for FileVolume {
315 type IdType = u64;
316 type NodeType = FxFile;
317 type MessageType = FileMessage;
318
319 fn new(volume: Arc<FxVolume>) -> Self {
320 Self { volume }
321 }
322
323 fn volume(&self) -> &Arc<FxVolume> {
324 &self.volume
325 }
326
327 async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
328 self.volume
329 .get_or_load_node(id, ObjectDescriptor::File, None)
330 .await?
331 .into_any()
332 .downcast::<FxFile>()
333 .map_err(|_| anyhow!("Non-file opened"))?
334 .into_opened_node()
335 .ok_or_else(|| anyhow!("File being purged"))
336 }
337
338 async fn file_is_replayable(&self, id: &Self::IdType) -> bool {
339 match self.volume.store().get_keys(*id).await {
340 Ok(keys)
343 if keys.is_empty()
344 || (keys.len() == 1 && keys.first().unwrap().0 == VOLUME_DATA_KEY_ID) =>
345 {
346 true
347 }
348 _ => false,
349 }
350 }
351}
352
353trait Message: Eq + PartialEq + Sized + Send + Sync + std::hash::Hash + 'static {
354 type IdType: std::fmt::Display + Ord + Send + Sized;
355
356 fn id(&self) -> Self::IdType;
357 fn offset(&self) -> u64;
358 fn encode_to(&self, dest: &mut [u8]);
359 fn decode_from(src: &[u8]) -> Self;
360 fn is_zeroes(&self) -> bool;
361 fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error>;
362 fn is_open_marker(&self) -> bool;
363}
364
365#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
366struct BlobMessage {
367 id: Hash,
368 offset: u64,
371}
372
373impl BlobMessage {
374 fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
375 let (first, second) = mut_array_refs![dest, size_of::<Hash>(), size_of::<u64>()];
376 *first = self.id.into();
377 *second = self.offset.to_le_bytes();
378 }
379
380 fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
381 let (first, second) = array_refs!(src, size_of::<Hash>(), size_of::<u64>());
382 Self { id: Hash::from_array(*first), offset: u64::from_le_bytes(*second) }
383 }
384}
385
386impl Message for BlobMessage {
387 type IdType = Hash;
388
389 fn id(&self) -> Self::IdType {
390 self.id
391 }
392
393 fn offset(&self) -> u64 {
394 self.offset
395 }
396
397 fn encode_to(&self, dest: &mut [u8]) {
398 self.encode_to_impl(dest.try_into().unwrap());
399 }
400
401 fn decode_from(src: &[u8]) -> Self {
402 Self::decode_from_impl(src.try_into().unwrap())
403 }
404
405 fn is_zeroes(&self) -> bool {
406 self.id == Hash::from_array([0u8; size_of::<Hash>()]) && self.offset == 0
407 }
408
409 fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
410 match node.into_any().downcast::<FxBlob>() {
411 Ok(blob) => Ok(Self { id: blob.root(), offset }),
412 Err(_) => Err(anyhow!("Cannot record non-blob entry.")),
413 }
414 }
415
416 fn is_open_marker(&self) -> bool {
417 self.offset == FILE_OPEN_MARKER
418 }
419}
420
421#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
422struct FileMessage {
423 id: u64,
424 offset: u64,
427}
428
429impl FileMessage {
430 fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
431 let (first, second) = mut_array_refs![dest, size_of::<u64>(), size_of::<u64>()];
432 *first = self.id.to_le_bytes();
433 *second = self.offset.to_le_bytes();
434 }
435
436 fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
437 let (first, second) = array_refs!(src, size_of::<u64>(), size_of::<u64>());
438 Self { id: u64::from_le_bytes(*first), offset: u64::from_le_bytes(*second) }
439 }
440}
441
442impl Message for FileMessage {
443 type IdType = u64;
444
445 fn id(&self) -> Self::IdType {
446 self.id
447 }
448
449 fn offset(&self) -> u64 {
450 self.offset
451 }
452
453 fn encode_to(&self, dest: &mut [u8]) {
454 self.encode_to_impl(dest.try_into().unwrap())
455 }
456
457 fn decode_from(src: &[u8]) -> Self {
458 Self::decode_from_impl(src.try_into().unwrap())
459 }
460
461 fn is_zeroes(&self) -> bool {
462 self.id == 0 && self.offset == 0
463 }
464
465 fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
466 match node.into_any().downcast::<FxFile>() {
467 Ok(file) => Ok(Self { id: file.object_id(), offset }),
468 Err(_) => Err(anyhow!("Cannot record non-file entry")),
469 }
470 }
471
472 fn is_open_marker(&self) -> bool {
473 self.offset == FILE_OPEN_MARKER
474 }
475}
476
477pub trait Recorder: Send + Sync {
480 fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error>;
482
483 fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error>;
485}
486
487struct RecorderImpl<T: Message> {
488 sender: async_channel::Sender<Vec<T>>,
489 buffer: Vec<T>,
490}
491
492impl<T: Message> RecorderImpl<T> {
493 fn new(sender: async_channel::Sender<Vec<T>>) -> Self {
494 Self { sender, buffer: Vec::with_capacity(MESSAGE_CHUNK_SIZE) }
495 }
496}
497
498impl<T: Message> Recorder for RecorderImpl<T> {
499 fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error> {
500 self.buffer.push(T::from_node_request(node, offset)?);
501 if self.buffer.len() >= MESSAGE_CHUNK_SIZE {
502 self.sender.try_send(std::mem::replace(
505 &mut self.buffer,
506 Vec::with_capacity(MESSAGE_CHUNK_SIZE),
507 ))?;
508 }
509 RECORDED.fetch_add(1, Ordering::Relaxed);
510 Ok(())
511 }
512
513 fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error> {
514 self.record(node, FILE_OPEN_MARKER)
515 }
516}
517
518impl<T: Message> Drop for RecorderImpl<T> {
519 fn drop(&mut self) {
520 if self.buffer.len() > 0 {
522 let buffer = std::mem::take(&mut self.buffer);
523 let _ = self.sender.try_send(buffer);
524 }
525 }
526}
527
528struct Request<P: PagerBacked> {
529 file: Arc<P>,
530 offset: u64,
531}
532
533struct ReplayState<T> {
534 replay_threads: future::Shared<BoxFuture<'static, ()>>,
535 _cache_task: fasync::Task<()>,
536 _phantom: PhantomData<T>,
537}
538
539impl<T: RecordedVolume> ReplayState<T> {
540 fn new(handle: Box<dyn ReadObjectHandle>, volume: Arc<FxVolume>, guard: ActiveGuard) -> Self {
541 let (sender, receiver) = async_channel::unbounded::<Request<T::NodeType>>();
542
543 let mut replay_threads = Vec::with_capacity(REPLAY_THREADS);
546 for _ in 0..REPLAY_THREADS {
547 let receiver = receiver.clone();
548 let guard = guard.clone();
551 replay_threads.push(fasync::unblock(move || {
552 let _guard = guard;
553 Self::page_in_thread(receiver);
554 }));
555 }
556 let replay_threads = (Box::pin(async {
557 join_all(replay_threads).await;
558 }) as BoxFuture<'static, ()>)
559 .shared();
560
561 let scope = volume.scope().clone();
562 let cache_task = scope
563 .spawn({
564 async move {
568 let mut task = pin!(
569 async {
570 let mut local_cache: BTreeMap<
574 T::IdType,
575 Option<OpenedNode<T::NodeType>>,
576 > = BTreeMap::new();
577
578 let volume_id = volume.id();
579
580 if let Err(error) = T::new(volume)
581 .read_and_queue(handle, &sender, &mut local_cache)
582 .await
583 {
584 error!(error:?; "Failed to read back profile");
585 }
586 sender.close();
587
588 info!(
589 "Replay for volume {} opened {} of {} objects.",
590 volume_id,
591 local_cache.iter().filter(|(_, e)| e.is_some()).count(),
592 local_cache.len()
593 );
594
595 let () = std::future::pending().await;
597 }
598 .fuse()
599 );
600
601 select! {
602 _ = task => {}
603 _ = guard.on_cancel().fuse() => {}
604 }
605 }
606 })
607 .into();
608
609 Self { replay_threads, _cache_task: cache_task, _phantom: PhantomData }
610 }
611
612 fn page_in_thread(queue: async_channel::Receiver<Request<T::NodeType>>) {
613 while let Ok(request) = queue.recv_blocking() {
614 let _ = request.file.vmo().op_range(
615 zx::VmoOp::PREFETCH,
616 request.offset,
617 zx::system_get_page_size() as u64,
618 );
619 if queue.sender_count() == 0 {
621 return;
622 }
623 }
624 }
625}
626
627#[async_trait]
630pub trait ProfileState: Send + Sync {
631 fn record_new(&mut self, volume: &Arc<FxVolume>, name: &str) -> Box<dyn Recorder>;
635
636 fn replay_profile(
639 &mut self,
640 handle: Box<dyn ReadObjectHandle>,
641 volume: Arc<FxVolume>,
642 guard: ActiveGuard,
643 );
644
645 async fn wait_for_replay_to_finish(&mut self);
648
649 async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error>;
651}
652
653pub fn new_profile_state(is_blob: bool) -> Box<dyn ProfileState> {
654 if is_blob {
655 Box::new(ProfileStateImpl::<BlobVolume>::new())
656 } else {
657 Box::new(ProfileStateImpl::<FileVolume>::new())
658 }
659}
660
661struct ProfileStateImpl<T> {
662 recording: Option<RemoteHandle<Result<(), Error>>>,
663 replay: Option<ReplayState<T>>,
664}
665
666impl<T> ProfileStateImpl<T> {
667 fn new() -> Self {
668 Self { recording: None, replay: None }
669 }
670}
671
672#[async_trait]
673impl<T: RecordedVolume> ProfileState for ProfileStateImpl<T> {
674 fn record_new(&mut self, volume: &Arc<FxVolume>, name: &str) -> Box<dyn Recorder> {
675 let (sender, receiver) = async_channel::unbounded();
676 let volume = volume.clone();
677 let name = name.to_string();
678 self.recording = None;
680 let scope = volume.scope().clone();
681 let (task, remote_handle) = async move {
682 let recording = T::new(volume);
683 recording
684 .record(&name, receiver)
685 .await
686 .inspect_err(|error| warn!(error:?; "Profile recording '{name}' failed"))
687 }
688 .remote_handle();
689 self.recording = Some(remote_handle);
690 scope.spawn(task);
691 Box::new(RecorderImpl::new(sender))
692 }
693
694 fn replay_profile(
695 &mut self,
696 handle: Box<dyn ReadObjectHandle>,
697 volume: Arc<FxVolume>,
698 guard: ActiveGuard,
699 ) {
700 self.replay = Some(ReplayState::new(handle, volume, guard));
701 }
702
703 async fn wait_for_replay_to_finish(&mut self) {
704 if let Some(replay) = &mut self.replay {
705 replay.replay_threads.clone().await;
706 }
707 }
708
709 async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error> {
710 if let Some(recording) = self.recording.take() { recording.await } else { Ok(()) }
711 }
712}
713
714#[cfg(test)]
715mod tests {
716 use super::{
717 BlobMessage, BlobVolume, FileMessage, FileVolume, IO_SIZE, Message, RecordedVolume,
718 Request, new_profile_state,
719 };
720 use crate::fuchsia::file::FxFile;
721 use crate::fuchsia::fxblob::blob::FxBlob;
722 use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
723 use crate::fuchsia::node::{FxNode, OpenedNode};
724 use crate::fuchsia::pager::PagerBacked;
725 use crate::fuchsia::testing::{TestFixture, TestFixtureOptions, open_file_checked};
726 use crate::fuchsia::volume::FxVolume;
727 use anyhow::Error;
728 use async_trait::async_trait;
729 use delivery_blob::CompressionMode;
730 use event_listener::{Event, EventListener};
731 use fuchsia_hash::Hash;
732 use fuchsia_sync::Mutex;
733 use fxfs::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
734 use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
735 use fxfs::object_store::{DataObjectHandle, HandleOptions, ObjectDescriptor, ObjectStore};
736 use std::collections::BTreeMap;
737 use std::mem::size_of;
738 use std::sync::Arc;
739 use std::time::Duration;
740 use storage_device::buffer::{BufferRef, MutableBufferRef};
741 use storage_device::buffer_allocator::{BufferAllocator, BufferFuture, BufferSource};
742 use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
743
744 struct FakeReaderWriterInner {
745 data: Vec<u8>,
746 delays: Vec<EventListener>,
747 }
748
749 struct FakeReaderWriter {
750 allocator: BufferAllocator,
751 inner: Arc<Mutex<FakeReaderWriterInner>>,
752 }
753
754 const BLOCK_SIZE: usize = 4096;
755
756 impl FakeReaderWriter {
757 fn new() -> Self {
758 Self {
759 allocator: BufferAllocator::new(BLOCK_SIZE, BufferSource::new(IO_SIZE * 2)),
760 inner: Arc::new(Mutex::new(FakeReaderWriterInner {
761 data: Vec::new(),
762 delays: Vec::new(),
763 })),
764 }
765 }
766
767 fn push_delay(&self, delay: EventListener) {
768 self.inner.lock().delays.insert(0, delay);
769 }
770 }
771
772 impl ObjectHandle for FakeReaderWriter {
773 fn object_id(&self) -> u64 {
774 0
775 }
776
777 fn block_size(&self) -> u64 {
778 self.allocator.block_size() as u64
779 }
780
781 fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
782 self.allocator.allocate_buffer(size)
783 }
784 }
785
786 impl WriteObjectHandle for FakeReaderWriter {
787 async fn write_or_append(
788 &self,
789 offset: Option<u64>,
790 buf: BufferRef<'_>,
791 ) -> Result<u64, Error> {
792 assert!(offset.is_none());
794 let delay = self.inner.lock().delays.pop();
795 if let Some(delay) = delay {
796 delay.await;
797 }
798 self.inner.lock().data.extend_from_slice(buf.as_slice());
800 Ok(buf.len() as u64)
801 }
802
803 async fn truncate(&self, _size: u64) -> Result<(), Error> {
804 unreachable!();
805 }
806
807 async fn flush(&self) -> Result<(), Error> {
808 unreachable!();
809 }
810 }
811
812 async fn write_file(fixture: &TestFixture, name: &str, data: &[u8]) -> u64 {
813 let root_dir = fixture.volume().root_dir();
814 let mut transaction = fixture
815 .volume()
816 .volume()
817 .store()
818 .filesystem()
819 .new_transaction(
820 lock_keys![LockKey::object(
821 fixture.volume().volume().store().store_object_id(),
822 root_dir.object_id()
823 )],
824 Options::default(),
825 )
826 .await
827 .expect("Creating transaction for new file");
828 let id = root_dir
829 .directory()
830 .create_child_file(&mut transaction, name)
831 .await
832 .expect("Creating new_file")
833 .object_id();
834 transaction.commit().await.unwrap();
835 let file = open_file_checked(
836 fixture.root(),
837 name,
838 fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_FILE,
839 &Default::default(),
840 )
841 .await;
842 file.write(data).await.unwrap().expect("Writing file");
843 id
844 }
845
846 #[async_trait]
847 impl ReadObjectHandle for FakeReaderWriter {
848 async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
849 let delay = self.inner.lock().delays.pop();
850 if let Some(delay) = delay {
851 delay.await;
852 }
853 let inner = self.inner.lock();
855 assert!(offset as usize <= inner.data.len());
856 let offset_end = std::cmp::min(offset as usize + buf.len(), inner.data.len());
857 let size = offset_end - offset as usize;
858 buf.as_mut_slice()[..size].clone_from_slice(&inner.data[offset as usize..offset_end]);
859 Ok(size)
860 }
861
862 fn get_size(&self) -> u64 {
863 self.inner.lock().data.len() as u64
864 }
865 }
866
867 #[fuchsia::test]
868 async fn test_encode_decode_blob() {
869 let mut buf = [0u8; size_of::<BlobMessage>()];
870 let m = BlobMessage { id: [88u8; 32].into(), offset: 77 };
871 m.encode_to(&mut buf.as_mut_slice());
872 let m2 = BlobMessage::decode_from(&buf);
873 assert_eq!(m, m2);
874 }
875
876 #[fuchsia::test]
877 async fn test_encode_decode_file() {
878 let mut buf = [0u8; size_of::<FileMessage>()];
879 let m = FileMessage { id: 88, offset: 77 };
880 m.encode_to(&mut buf.as_mut_slice());
881 let m2 = FileMessage::decode_from(&buf);
882 assert!(!m2.is_zeroes());
883 assert_eq!(m, m2);
884 }
885
886 const TEST_PROFILE_NAME: &str = "test_profile";
887
888 async fn get_test_profile_handle(volume: &Arc<FxVolume>) -> DataObjectHandle<FxVolume> {
889 let profile_dir = volume.get_profile_directory().await.unwrap();
890 ObjectStore::open_object(
891 volume,
892 profile_dir
893 .lookup(TEST_PROFILE_NAME)
894 .await
895 .expect("lookup failed")
896 .expect("not found")
897 .0,
898 HandleOptions::default(),
899 None,
900 )
901 .await
902 .unwrap()
903 }
904
905 async fn get_test_profile_contents(volume: &Arc<FxVolume>) -> Vec<u8> {
906 get_test_profile_handle(volume).await.contents(1024 * 1024).await.unwrap().to_vec()
907 }
908
909 #[fuchsia::test]
910 async fn test_recording_basic_blob() {
911 let fixture = new_blob_fixture().await;
912 {
913 let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
914 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
915
916 let mut state = new_profile_state(true);
917 let volume = fixture.volume().volume();
918
919 {
920 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
922 recorder.record(blob.clone(), 0).unwrap();
923 recorder.record_open(blob).unwrap();
924 }
925
926 state.wait_for_recording_to_finish().await.unwrap();
927
928 assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
929 }
930 fixture.close().await;
931 }
932
933 #[fuchsia::test]
934 async fn test_recording_basic_file() {
935 let fixture = TestFixture::new().await;
936 {
937 let id = write_file(&fixture, "foo", &[88u8]).await;
938 let node = fixture
939 .volume()
940 .volume()
941 .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
942 .await
943 .unwrap();
944
945 let mut state = new_profile_state(false);
946 let volume = fixture.volume().volume();
947
948 {
949 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
951 recorder.record(node.clone(), 0).unwrap();
952 recorder.record_open(node).unwrap();
953 }
954 state.wait_for_recording_to_finish().await.unwrap();
955
956 assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
957 }
958 fixture.close().await;
959 }
960
961 #[fuchsia::test]
962 async fn test_recording_filtered_without_open() {
963 let fixture = new_blob_fixture().await;
964 {
965 let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
966 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
967
968 let mut state = new_profile_state(true);
969 let volume = fixture.volume().volume();
970
971 {
972 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
974 recorder.record(blob.clone(), 0).unwrap();
975 }
976 state.wait_for_recording_to_finish().await.unwrap();
977
978 assert_eq!(get_test_profile_contents(volume).await.len(), 0);
979 }
980 fixture.close().await;
981 }
982
983 #[fuchsia::test]
984 async fn test_recording_blob_more_than_block() {
985 let mut state = new_profile_state(true);
986
987 let fixture = new_blob_fixture().await;
988 assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
989 let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;
990 let hash;
991 let volume = fixture.volume().volume();
992
993 {
994 hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
995 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
996 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
998 recorder.record_open(blob.clone()).unwrap();
999 for i in 0..message_count {
1000 recorder.record(blob.clone(), 4096 * i as u64).unwrap();
1001 }
1002 }
1003 state.wait_for_recording_to_finish().await.unwrap();
1004
1005 assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);
1006
1007 let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
1008 let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();
1009
1010 let volume = fixture.volume().volume().clone();
1011 let task = fasync::Task::spawn(async move {
1012 let handle = Box::new(get_test_profile_handle(&volume).await);
1013 let blob = BlobVolume::new(volume);
1014 blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
1015 });
1016
1017 let mut recv_count = 0;
1018 while let Ok(msg) = receiver.recv().await {
1019 assert_eq!(msg.file.root(), hash);
1020 assert_eq!(msg.offset, 4096 * recv_count);
1021 recv_count += 1;
1022 }
1023 task.await;
1024 assert_eq!(recv_count, message_count as u64);
1025
1026 fixture.close().await;
1027 }
1028
1029 #[fuchsia::test]
1030 async fn test_recording_file_more_than_block() {
1031 let mut state = new_profile_state(false);
1032
1033 let fixture = TestFixture::new().await;
1034 assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
1035 let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;
1036 let id;
1037 let volume = fixture.volume().volume();
1038 {
1039 id = write_file(&fixture, "foo", &[88u8]).await;
1040 let node = volume
1041 .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
1042 .await
1043 .unwrap();
1044 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1046 recorder.record_open(node.clone()).unwrap();
1047 for i in 0..message_count {
1048 recorder.record(node.clone(), 4096 * i as u64).unwrap();
1049 }
1050 }
1051 state.wait_for_recording_to_finish().await.unwrap();
1052
1053 assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);
1054
1055 let mut local_cache: BTreeMap<u64, Option<OpenedNode<FxFile>>> = BTreeMap::new();
1056 let (sender, receiver) = async_channel::unbounded::<Request<FxFile>>();
1057
1058 let volume = fixture.volume().volume().clone();
1059 let task = fasync::Task::spawn(async move {
1060 let handle = Box::new(get_test_profile_handle(&volume).await);
1061 let file = FileVolume::new(volume);
1062 file.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
1063 });
1064
1065 let mut recv_count = 0;
1066 while let Ok(msg) = receiver.recv().await {
1067 assert_eq!(msg.file.object_id(), id);
1068 assert_eq!(msg.offset, 4096 * recv_count);
1069 recv_count += 1;
1070 }
1071 task.await;
1072 assert_eq!(recv_count, message_count as u64);
1073
1074 fixture.close().await;
1075 }
1076
1077 #[fuchsia::test]
1078 async fn test_recording_more_than_io_size() {
1079 let fixture = new_blob_fixture().await;
1080
1081 {
1082 let mut state = new_profile_state(true);
1083 let message_count = (IO_SIZE as usize / size_of::<BlobMessage>()) + 1;
1084 let hash;
1085 let volume = fixture.volume().volume();
1086 {
1087 hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
1088 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1089 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1091 recorder.record_open(blob.clone()).unwrap();
1092 for i in 0..message_count {
1093 recorder.record(blob.clone(), 4096 * i as u64).unwrap();
1094 }
1095 }
1096 state.wait_for_recording_to_finish().await.unwrap();
1097 assert_eq!(get_test_profile_contents(volume).await.len(), IO_SIZE + BLOCK_SIZE);
1098
1099 let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
1100 let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();
1101
1102 let volume = volume.clone();
1103 let task = fasync::Task::spawn(async move {
1104 let handle = Box::new(get_test_profile_handle(&volume).await);
1105 let blob = BlobVolume::new(volume);
1106 blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
1107 });
1108
1109 let mut recv_count = 0;
1110 while let Ok(msg) = receiver.recv().await {
1111 assert_eq!(msg.file.root(), hash);
1112 assert_eq!(msg.offset, 4096 * recv_count);
1113 recv_count += 1;
1114 }
1115 task.await;
1116 assert_eq!(recv_count, message_count as u64);
1117 }
1118
1119 fixture.close().await;
1120 }
1121
1122 #[fuchsia::test]
1123 async fn test_replay_profile_blob() {
1124 let mut state = new_profile_state(true);
1126
1127 let mut hashes = Vec::new();
1128
1129 let fixture = new_blob_fixture().await;
1130 {
1131 assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
1132 let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;
1133
1134 let volume = fixture.volume().volume();
1135 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1136 for i in 0..message_count {
1138 let hash =
1139 fixture.write_blob(i.to_le_bytes().as_slice(), CompressionMode::Never).await;
1140 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1141 recorder.record_open(blob.clone()).unwrap();
1142 hashes.push(hash);
1143 recorder.record(blob.clone(), 0).unwrap();
1144 }
1145 };
1146 let device = fixture.close().await;
1147 device.ensure_unique();
1148 state.wait_for_recording_to_finish().await.unwrap();
1149
1150 device.reopen(false);
1151 let fixture = open_blob_fixture(device).await;
1152 {
1153 for hash in &hashes {
1156 let blob = fixture.get_blob(*hash).await.expect("Opening blob");
1157 assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
1158 }
1159
1160 let volume = fixture.volume().volume();
1161 state.replay_profile(
1162 Box::new(get_test_profile_handle(volume).await),
1163 volume.clone(),
1164 volume.scope().try_active_guard().unwrap(),
1165 );
1166
1167 for hash in &hashes {
1169 let blob = fixture.get_blob(*hash).await.expect("Opening blob");
1170 while blob.vmo().info().unwrap().committed_bytes == 0 {
1171 fasync::Timer::new(Duration::from_millis(25)).await;
1172 }
1173 }
1174 }
1175 fixture.close().await;
1176 }
1177
1178 #[fuchsia::test]
1179 async fn test_replay_profile_file() {
1180 let mut state = new_profile_state(false);
1182
1183 let mut ids = Vec::new();
1184
1185 let fixture = TestFixture::new().await;
1186 {
1187 assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
1188 let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;
1189
1190 let volume = fixture.volume().volume();
1191 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1192 for i in 0..message_count {
1194 let id = write_file(&fixture, &i.to_string(), &[88u8]).await;
1195 let node = fixture
1196 .volume()
1197 .volume()
1198 .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
1199 .await
1200 .unwrap();
1201 recorder.record_open(node.clone()).unwrap();
1202 ids.push(id);
1203 recorder.record(node.clone(), 0).unwrap();
1204 }
1205 };
1206 let device = fixture.close().await;
1207 device.ensure_unique();
1208 state.wait_for_recording_to_finish().await.unwrap();
1209
1210 device.reopen(false);
1211 let fixture = TestFixture::open(
1212 device,
1213 TestFixtureOptions { encrypted: true, format: false, ..Default::default() },
1214 )
1215 .await;
1216 {
1217 for id in &ids {
1219 let file = fixture
1220 .volume()
1221 .volume()
1222 .get_or_load_node(
1223 *id,
1224 ObjectDescriptor::File,
1225 Some(fixture.volume().root_dir()),
1226 )
1227 .await
1228 .unwrap()
1229 .into_any()
1230 .downcast::<FxFile>()
1231 .unwrap();
1232 assert_eq!(file.vmo().info().unwrap().committed_bytes, 0);
1233 }
1234
1235 let volume = fixture.volume().volume();
1236 state.replay_profile(
1237 Box::new(get_test_profile_handle(volume).await),
1238 volume.clone(),
1239 volume.scope().try_active_guard().unwrap(),
1240 );
1241
1242 for id in &ids {
1244 let file = fixture
1245 .volume()
1246 .volume()
1247 .get_or_load_node(
1248 *id,
1249 ObjectDescriptor::File,
1250 Some(fixture.volume().root_dir()),
1251 )
1252 .await
1253 .unwrap()
1254 .into_any()
1255 .downcast::<FxFile>()
1256 .unwrap();
1257 while file.vmo().info().unwrap().committed_bytes == 0 {
1258 fasync::Timer::new(Duration::from_millis(25)).await;
1259 }
1260 }
1261 state.wait_for_recording_to_finish().await.unwrap();
1262 }
1263 fixture.close().await;
1264 }
1265
1266 #[fuchsia::test]
1267 async fn test_recording_during_replay() {
1268 let mut state = new_profile_state(true);
1269
1270 let hash;
1271 let first_recording;
1272 let fixture = new_blob_fixture().await;
1273 let volume = fixture.volume().volume();
1274
1275 {
1277 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1278 hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1279 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1280 recorder.record_open(blob.clone()).unwrap();
1281 recorder.record(blob.clone(), 0).unwrap();
1282 }
1283
1284 state.wait_for_recording_to_finish().await.unwrap();
1285 first_recording = get_test_profile_contents(volume).await;
1286 assert_ne!(first_recording.len(), 0);
1287 let device = fixture.close().await;
1288 device.ensure_unique();
1289
1290 device.reopen(false);
1291 let fixture = open_blob_fixture(device).await;
1292
1293 {
1294 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1297 assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
1298
1299 let volume = fixture.volume().volume();
1301 let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1302 recorder.record(blob.clone(), 4096).unwrap();
1303
1304 let volume = fixture.volume().volume();
1306 state.replay_profile(
1307 Box::new(get_test_profile_handle(volume).await),
1308 volume.clone(),
1309 volume.scope().try_active_guard().unwrap(),
1310 );
1311
1312 {
1314 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1315 while blob.vmo().info().unwrap().committed_bytes == 0 {
1316 fasync::Timer::new(Duration::from_millis(25)).await;
1317 }
1318 }
1319
1320 recorder.record_open(blob.clone()).unwrap();
1323 }
1324
1325 state.wait_for_recording_to_finish().await.unwrap();
1326
1327 let volume = fixture.volume().volume();
1328 let second_recording = get_test_profile_contents(volume).await;
1329 assert_ne!(second_recording.len(), 0);
1330 assert_ne!(&second_recording, &first_recording);
1331
1332 fixture.close().await;
1333 }
1334
1335 #[fuchsia::test]
1338 async fn test_replay_profile_stop_reading_early() {
1339 let mut state = new_profile_state(true);
1340 let fixture = new_blob_fixture().await;
1341
1342 {
1343 let volume = fixture.volume().volume();
1344
1345 let message;
1347 {
1348 let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1349 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1350 message = BlobMessage { id: blob.root(), offset: 0 };
1351 }
1352 state.wait_for_recording_to_finish().await.unwrap();
1353
1354 let replay_handle = Box::new(FakeReaderWriter::new());
1356 let mut buff = vec![0u8; IO_SIZE * 2];
1357 message.encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
1358 message.encode_to_impl(
1359 (&mut buff[IO_SIZE..IO_SIZE + size_of::<BlobMessage>()]).try_into().unwrap(),
1360 );
1361
1362 replay_handle.inner.lock().data = buff;
1363 let delay1 = Event::new();
1364 replay_handle.push_delay(delay1.listen());
1365 let delay2 = Event::new();
1366 replay_handle.push_delay(delay2.listen());
1367
1368 state.replay_profile(
1369 replay_handle,
1370 volume.clone(),
1371 volume.scope().try_active_guard().unwrap(),
1372 );
1373
1374 fasync::Task::spawn(async move {
1376 fasync::Timer::new(Duration::from_millis(100)).await;
1378 delay1.notify(usize::MAX);
1379 })
1380 .detach();
1381 }
1382
1383 fixture.close().await;
1386 }
1387
1388 #[fuchsia::test]
1389 async fn test_replay_blob_missing() {
1390 let fixture = new_blob_fixture().await;
1391 let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1394 let mut buff = vec![0u8; IO_SIZE];
1395 {
1396 {
1399 let message = BlobMessage { id: [42u8; 32].into(), offset: 0 };
1400 message
1401 .encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
1402 }
1403
1404 {
1406 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1407 let message = BlobMessage { id: blob.root(), offset: 0 };
1408 message.encode_to_impl(
1409 (&mut buff[size_of::<BlobMessage>()..(size_of::<BlobMessage>() * 2)])
1410 .try_into()
1411 .unwrap(),
1412 );
1413 }
1414 }
1415 let device = fixture.close().await;
1416 device.ensure_unique();
1417
1418 device.reopen(false);
1419 let fixture = open_blob_fixture(device).await;
1420 {
1421 let mut state = new_profile_state(true);
1422 let volume = fixture.volume().volume();
1423
1424 let replay_handle = Box::new(FakeReaderWriter::new());
1425 replay_handle.inner.lock().data = buff;
1426
1427 state.replay_profile(
1428 replay_handle,
1429 volume.clone(),
1430 volume.scope().try_active_guard().unwrap(),
1431 );
1432
1433 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1435 while blob.vmo().info().unwrap().committed_bytes == 0 {
1436 fasync::Timer::new(Duration::from_millis(25)).await;
1437 }
1438 }
1439 fixture.close().await;
1440 }
1441
1442 #[fuchsia::test]
1443 async fn test_replay_file_missing_or_tombstoned() {
1444 let fixture = TestFixture::new().await;
1445 let mut buff = vec![0u8; IO_SIZE];
1446 let remaining_file_id;
1449 let tombstoned_file_id;
1450 {
1452 let id = write_file(&fixture, "foo", &[1, 2, 3, 4]).await;
1453 let message = FileMessage { id, offset: 0 };
1454 message.encode_to_impl((&mut buff[0..size_of::<FileMessage>()]).try_into().unwrap());
1455 }
1456 fixture
1458 .root()
1459 .unlink("foo", &fio::UnlinkOptions::default())
1460 .await
1461 .unwrap()
1462 .expect("Unlinking");
1463
1464 {
1466 tombstoned_file_id = write_file(&fixture, "bar", &[1, 2, 3, 4]).await;
1467 let message = FileMessage { id: tombstoned_file_id, offset: 0 };
1468 message.encode_to_impl(
1469 (&mut buff[size_of::<FileMessage>()..(size_of::<FileMessage>() * 2)])
1470 .try_into()
1471 .unwrap(),
1472 );
1473 }
1474
1475 {
1477 remaining_file_id = write_file(&fixture, "baz", &[1, 2, 3, 4]).await;
1478 let message = FileMessage { id: remaining_file_id, offset: 0 };
1479 message.encode_to_impl(
1480 (&mut buff[(size_of::<FileMessage>() * 2)..(size_of::<FileMessage>() * 3)])
1481 .try_into()
1482 .unwrap(),
1483 );
1484 }
1485 let device = fixture.close().await;
1486 device.ensure_unique();
1487
1488 device.reopen(false);
1489 let fixture =
1490 TestFixture::open(device, TestFixtureOptions { format: false, ..Default::default() })
1491 .await;
1492 {
1493 let tombstoned_file = fixture
1496 .volume()
1497 .volume()
1498 .get_or_load_node(tombstoned_file_id, ObjectDescriptor::File, None)
1499 .await
1500 .expect("Opening file object")
1501 .into_any()
1502 .downcast::<FxFile>()
1503 .unwrap();
1504 fixture
1505 .root()
1506 .unlink("bar", &fio::UnlinkOptions::default())
1507 .await
1508 .unwrap()
1509 .expect("Unlinking");
1510
1511 let mut state = new_profile_state(false);
1512 let volume = fixture.volume().volume();
1513
1514 let replay_handle = Box::new(FakeReaderWriter::new());
1515 replay_handle.inner.lock().data = buff;
1516
1517 state.replay_profile(
1518 replay_handle,
1519 volume.clone(),
1520 volume.scope().try_active_guard().unwrap(),
1521 );
1522
1523 let remaining_file = fixture
1525 .volume()
1526 .volume()
1527 .get_or_load_node(remaining_file_id, ObjectDescriptor::File, None)
1528 .await
1529 .expect("Opening file object")
1530 .into_any()
1531 .downcast::<FxFile>()
1532 .unwrap();
1533 while remaining_file.vmo().info().unwrap().committed_bytes == 0 {
1534 fasync::Timer::new(Duration::from_millis(25)).await;
1535 }
1536
1537 assert_eq!(tombstoned_file.vmo().info().unwrap().committed_bytes, 0);
1540 }
1541 fixture.close().await;
1542 }
1543}