1use crate::fuchsia::file::FxFile;
6use crate::fuchsia::fxblob::BlobDirectory;
7use crate::fuchsia::fxblob::blob::FxBlob;
8use crate::fuchsia::node::{FxNode, OpenedNode};
9use crate::fuchsia::pager::PagerBacked;
10use crate::fuchsia::volume::FxVolume;
11use anyhow::{Context as _, Error, anyhow, ensure};
12use arrayref::{array_refs, mut_array_refs};
13use async_trait::async_trait;
14use fuchsia_async as fasync;
15use fuchsia_hash::Hash;
16use futures::future::{self, BoxFuture, RemoteHandle, join_all};
17use futures::lock::Mutex;
18use futures::{FutureExt, select};
19use fxfs::errors::FxfsError;
20use fxfs::log::*;
21use fxfs::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle};
22use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
23use fxfs::object_store::{
24 DataObjectHandle, HandleOptions, ObjectDescriptor, ObjectStore, Timestamp, VOLUME_DATA_KEY_ID,
25 directory,
26};
27use linked_hash_map::LinkedHashMap;
28use scopeguard::ScopeGuard;
29use std::cmp::{Eq, PartialEq};
30use std::collections::btree_map::{BTreeMap, Entry};
31use std::marker::PhantomData;
32use std::mem::size_of;
33use std::pin::pin;
34use std::sync::Arc;
35use std::sync::atomic::{AtomicU64, Ordering};
36use vfs::execution_scope::ActiveGuard;
37
/// Sentinel stored in a message's `offset` field to mark a "file was opened" record rather than
/// a page-in request at a real offset.
const FILE_OPEN_MARKER: u64 = u64::MAX;

/// Number of blocking threads spawned during replay to issue prefetch requests.
const REPLAY_THREADS: usize = 2;

/// Number of messages a recorder accumulates before handing the batch to the recording task.
const MESSAGE_CHUNK_SIZE: usize = 64;

/// Size in bytes (128 KiB) of the buffer used for profile reads and writes.
const IO_SIZE: usize = 128 * 1024;

/// Process-wide counter bumped once for every message accepted by `RecorderImpl::record`
/// (open markers included, since `record_open` goes through `record`).
pub static RECORDED: AtomicU64 = AtomicU64::new(0);
46
47pub trait RecordingHandle: Send + Sync {
49 fn append<'a>(
51 &'a self,
52 buf: storage_device::buffer::BufferRef<'a>,
53 ) -> futures::future::BoxFuture<'a, Result<u64, Error>>;
54
55 fn allocate_buffer(
56 &self,
57 size: usize,
58 ) -> futures::future::BoxFuture<'_, storage_device::buffer::Buffer<'_>>;
59
60 fn block_size(&self) -> usize;
61
62 fn commit(self: Box<Self>) -> futures::future::BoxFuture<'static, Result<(), Error>>;
64
65 fn abort_cleanup(self: Box<Self>);
68}
69
/// A `RecordingHandle` that writes the profile into a freshly created object in the volume's
/// store and, on commit, links that object under `name` in the volume's profile directory.
pub struct FileRecordingHandle {
    /// Name the committed profile is given inside the profile directory.
    name: String,
    /// Volume whose store holds the recording object.
    volume: Arc<FxVolume>,
    /// Handle to the object that recording data is appended to.
    handle: DataObjectHandle<FxVolume>,
}
76
impl FileRecordingHandle {
    /// Creates a new object in `volume`'s store to hold a recording that will later be published
    /// as `name`.
    ///
    /// The object is added to the graveyard in the same transaction that creates it; `commit_impl`
    /// removes it from the graveyard when the recording is published.
    ///
    /// # Errors
    /// Fails if the transaction cannot be created or committed, or the object cannot be created.
    pub async fn new(name: &str, volume: Arc<FxVolume>) -> Result<Self, Error> {
        let store = volume.store();
        let mut transaction = store.new_transaction(lock_keys![], Options::default()).await?;
        let handle =
            ObjectStore::create_object(&volume, &mut transaction, HandleOptions::default(), None)
                .await?;
        store.add_to_graveyard(&mut transaction, handle.object_id());
        transaction.commit().await?;

        Ok(Self { name: name.to_string(), volume, handle })
    }

    /// Publishes the recording object as `self.name` in the profile directory, replacing any
    /// existing entry of that name and queueing the replaced object for tombstoning.
    ///
    /// # Errors
    /// Returns `FxfsError::Inconsistent` if an existing entry with this name is not a file, and
    /// propagates any lookup, transaction, or directory-replace failure.
    async fn commit_impl(&self) -> Result<(), Error> {
        let store = self.volume.store();
        let fs = store.filesystem();
        let profile_dir = self.volume.get_profile_directory().await?;

        // Start by locking only the profile directory. The id of an existing entry is only known
        // after the lookup, so the loop retries with that id added to the lock set until the
        // looked-up id matches the one the current transaction already holds a lock for.
        let mut lock_keys =
            lock_keys![LockKey::object(store.store_object_id(), profile_dir.object_id())];
        let mut old_id = INVALID_OBJECT_ID;
        let mut transaction = loop {
            let transaction = store.new_transaction(lock_keys, Options::default()).await?;
            if let Some((id, descriptor, _)) = profile_dir.lookup(&self.name).await? {
                ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
                if id == old_id {
                    // The entry found is the one whose lock this transaction already holds.
                    break transaction;
                }
                lock_keys = lock_keys![
                    LockKey::object(store.store_object_id(), profile_dir.object_id()),
                    LockKey::object(store.store_object_id(), id)
                ];
                old_id = id;
            } else {
                // No existing entry: nothing will be replaced.
                old_id = INVALID_OBJECT_ID;
                break transaction;
            }
        };

        // The recording object is about to be linked into the directory, so take it back out of
        // the graveyard in the same transaction.
        store.remove_from_graveyard(&mut transaction, self.handle.object_id());
        directory::replace_child_with_object(
            &mut transaction,
            Some((self.handle.object_id(), ObjectDescriptor::File)),
            (&profile_dir, &self.name),
            0,
            false,
            Timestamp::now(),
        )
        .await?;
        transaction.commit().await?;

        // Only after the commit succeeds is the replaced object (if any) queued for deletion.
        if old_id != INVALID_OBJECT_ID {
            fs.graveyard().queue_tombstone_object(store.store_object_id(), old_id);
        }

        Ok(())
    }
}
135
136impl RecordingHandle for FileRecordingHandle {
137 fn append<'a>(
138 &'a self,
139 buf: storage_device::buffer::BufferRef<'a>,
140 ) -> futures::future::BoxFuture<'a, Result<u64, Error>> {
141 async move { self.handle.write_or_append(None, buf).await.map_err(Into::into) }.boxed()
142 }
143
144 fn allocate_buffer(
145 &self,
146 size: usize,
147 ) -> futures::future::BoxFuture<'_, storage_device::buffer::Buffer<'_>> {
148 self.handle.allocate_buffer(size).boxed()
149 }
150
151 fn block_size(&self) -> usize {
152 self.handle.block_size() as usize
153 }
154
155 fn commit(self: Box<Self>) -> futures::future::BoxFuture<'static, Result<(), Error>> {
156 async move {
157 let store = self.volume.store();
158 self.commit_impl().await.inspect_err(|_| {
159 store
160 .filesystem()
161 .graveyard()
162 .queue_tombstone_object(store.store_object_id(), self.handle.object_id());
163 })
164 }
165 .boxed()
166 }
167
168 fn abort_cleanup(self: Box<Self>) {
169 let this = *self;
170 this.volume
171 .store()
172 .filesystem()
173 .graveyard()
174 .queue_tombstone_object(this.volume.store().store_object_id(), this.handle.object_id());
175 }
176}
177
178trait RecordedVolume: Send + Sync + Sized + Unpin {
179 type IdType: std::fmt::Display + Ord + Send + Sized;
180 type NodeType: PagerBacked;
181 type MessageType: Message<IdType = Self::IdType>;
182
183 fn new(volume: Arc<FxVolume>) -> Self;
184
185 fn open(
186 &self,
187 id: Self::IdType,
188 ) -> impl std::future::Future<Output = Result<OpenedNode<Self::NodeType>, Error>> + Send;
189
190 fn file_is_replayable(
192 &self,
193 id: &Self::IdType,
194 ) -> impl std::future::Future<Output = bool> + Send;
195
196 fn read_and_queue(
197 &self,
198 handle: Box<dyn ReadObjectHandle>,
199 sender: &async_channel::Sender<Request<Self::NodeType>>,
200 local_cache: &mut BTreeMap<Self::IdType, Option<OpenedNode<Self::NodeType>>>,
201 ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
202 async move {
203 let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
204 let block_size = handle.block_size() as usize;
205 let file_size = handle.get_size() as usize;
206 let mut offset = 0;
207 while offset < file_size {
208 let actual = handle
209 .read(offset as u64, io_buf.as_mut())
210 .await
211 .map_err(|e| e.context(format!("Failed to read at offset: {}", offset)))?;
212 offset += actual;
213 let mut local_offset = 0;
214 let mut next_block = block_size;
215 let mut next_offset = size_of::<Self::MessageType>();
216 while next_offset <= actual {
217 let msg = Self::MessageType::decode_from(
218 &io_buf.as_slice()[local_offset..next_offset],
219 );
220
221 local_offset = next_offset;
222 next_offset = local_offset + size_of::<Self::MessageType>();
223 if next_offset > next_block {
225 local_offset = next_block;
226 next_offset = local_offset + size_of::<Self::MessageType>();
227 next_block += block_size;
228 }
229
230 if msg.is_zeroes() {
233 break;
234 }
235
236 let file = match local_cache.entry(msg.id()) {
237 Entry::Occupied(entry) => match entry.get() {
238 Some(opened_file) => (*opened_file).clone(),
239 None => continue,
241 },
242 Entry::Vacant(entry) => match self.open(msg.id()).await {
243 Err(e) => {
244 debug!("Failed to open object {} from profile: {:?}", msg.id(), e);
245 entry.insert(None);
247 continue;
248 }
249 Ok(opened_file) => {
250 let file_clone = opened_file.clone();
251 entry.insert(Some(opened_file));
252 file_clone
253 }
254 },
255 };
256
257 sender.send(Request { file, offset: msg.offset() }).await?;
258 }
259 }
260 Ok(())
261 }
262 }
263
264 fn record(
265 &self,
266 recording_handle: Box<dyn RecordingHandle>,
267 receiver: async_channel::Receiver<Vec<Self::MessageType>>,
268 ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
269 let recording_handle = scopeguard::guard(recording_handle, |recording_handle| {
271 recording_handle.abort_cleanup();
272 });
273
274 async move {
275 let mut recorded_offsets = LinkedHashMap::<Self::MessageType, ()>::new();
276 let mut recorded_opens = BTreeMap::<Self::IdType, bool>::new();
277 while let Ok(buffer) = receiver.recv().await {
278 for message in buffer {
279 if message.is_open_marker() {
280 if let Entry::Vacant(entry) = recorded_opens.entry(message.id()) {
281 let usable = self.file_is_replayable(entry.key()).await;
282 entry.insert(usable);
283 }
284 } else {
285 recorded_offsets.insert(message, ());
286 }
287 }
288 }
289
290 let block_size = recording_handle.block_size();
291 let mut offset = 0;
292 let mut io_buf = recording_handle.allocate_buffer(IO_SIZE).await;
293 let mut next_block = block_size;
294 while let Some((message, _)) = recorded_offsets.pop_front() {
295 if !recorded_opens.get(&message.id()).copied().unwrap_or(false) {
297 continue;
298 }
299
300 let mut next_offset = offset + size_of::<Self::MessageType>();
301 if next_offset > next_block {
302 io_buf.as_mut_slice()[offset..next_block].fill(0);
305 if next_block >= IO_SIZE {
306 recording_handle
307 .append(io_buf.as_ref())
308 .await
309 .context("Failed to write profile block")?;
310 offset = 0;
311 next_offset = size_of::<Self::MessageType>();
312 next_block = block_size;
313 } else {
314 offset = next_block;
315 next_offset = offset + size_of::<Self::MessageType>();
316 next_block += block_size;
317 }
318 }
319 message.encode_to(&mut io_buf.as_mut_slice()[offset..next_offset]);
320 offset = next_offset;
321 }
322 if offset > 0 {
323 io_buf.as_mut_slice()[offset..next_block].fill(0);
324 recording_handle
325 .append(io_buf.subslice(0..next_block))
326 .await
327 .context("Failed to write profile block")?;
328 }
329
330 std::mem::drop(io_buf);
331 let recording_handle = ScopeGuard::into_inner(recording_handle);
333 recording_handle.commit().await?;
334
335 Ok(())
336 }
337 }
338}
339
/// `RecordedVolume` implementation for blob volumes, where objects are identified by hash.
struct BlobVolume {
    /// Volume whose blobs are recorded and replayed.
    volume: Arc<FxVolume>,
    /// Root blob directory, loaded on the first call to `open` and reused afterwards.
    root_dir: Mutex<Option<Arc<BlobDirectory>>>,
}
346
347impl RecordedVolume for BlobVolume {
348 type IdType = Hash;
349 type NodeType = FxBlob;
350 type MessageType = BlobMessage;
351
352 fn new(volume: Arc<FxVolume>) -> Self {
353 Self { volume, root_dir: Mutex::new(None) }
354 }
355
356 async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
357 let mut root_dir = self.root_dir.lock().await;
358 if root_dir.is_none() {
359 *root_dir = Some(
360 self.volume
361 .get_or_load_node(
362 self.volume.store().root_directory_object_id(),
363 ObjectDescriptor::Directory,
364 None,
365 )
366 .await?
367 .into_any()
368 .downcast::<BlobDirectory>()
369 .map_err(|_| FxfsError::Inconsistent)?,
370 );
371 };
372 root_dir
373 .as_ref()
374 .unwrap()
375 .open_blob(&id.into())
376 .await?
377 .ok_or_else(|| FxfsError::NotFound.into())
378 }
379
380 async fn file_is_replayable(&self, _id: &Self::IdType) -> bool {
381 true
383 }
384}
385
/// `RecordedVolume` implementation for regular file volumes, where objects are identified by
/// object id.
struct FileVolume {
    /// Volume whose files are recorded and replayed.
    volume: Arc<FxVolume>,
}
389
390impl RecordedVolume for FileVolume {
391 type IdType = u64;
392 type NodeType = FxFile;
393 type MessageType = FileMessage;
394
395 fn new(volume: Arc<FxVolume>) -> Self {
396 Self { volume }
397 }
398
399 async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
400 self.volume
401 .get_or_load_node(id, ObjectDescriptor::File, None)
402 .await?
403 .into_any()
404 .downcast::<FxFile>()
405 .map_err(|_| anyhow!("Non-file opened"))?
406 .into_opened_node()
407 .ok_or_else(|| anyhow!("File being purged"))
408 }
409
410 async fn file_is_replayable(&self, id: &Self::IdType) -> bool {
411 match self.volume.store().get_keys(*id).await {
412 Ok(keys)
415 if keys.is_empty()
416 || (keys.len() == 1 && keys.first().unwrap().0 == VOLUME_DATA_KEY_ID) =>
417 {
418 true
419 }
420 _ => false,
421 }
422 }
423}
424
/// A single fixed-size profile record: an object id plus an offset.
///
/// `Eq` and `Hash` are required because the recorder deduplicates messages in a hash map.
trait Message: Eq + PartialEq + Sized + Send + Sync + std::hash::Hash + 'static {
    /// Identifier of the object the message refers to.
    type IdType: std::fmt::Display + Ord + Send + Sized;

    /// Returns the id of the object this message refers to.
    fn id(&self) -> Self::IdType;
    /// Returns the offset carried by the message (or the open-marker sentinel).
    fn offset(&self) -> u64;
    /// Serializes the message into `dest`, which callers size to `size_of::<Self>()` bytes.
    fn encode_to(&self, dest: &mut [u8]);
    /// Deserializes a message from `src`, which callers size to `size_of::<Self>()` bytes.
    fn decode_from(src: &[u8]) -> Self;
    /// Returns `true` if every field is zero, i.e. the message looks like zero padding.
    fn is_zeroes(&self) -> bool;
    /// Builds a message for `node` at `offset`; fails if `node` is the wrong kind of node.
    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error>;
    /// Returns `true` if the message records an open rather than a page-in.
    fn is_open_marker(&self) -> bool;
}
436
/// Profile record for a blob: the blob's hash and an offset.
#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
struct BlobMessage {
    /// Hash identifying the blob.
    id: Hash,
    /// Offset within the blob, or `FILE_OPEN_MARKER` for an open record.
    offset: u64,
}
444
445impl BlobMessage {
446 fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
447 let (first, second) = mut_array_refs![dest, size_of::<Hash>(), size_of::<u64>()];
448 *first = self.id.into();
449 *second = self.offset.to_le_bytes();
450 }
451
452 fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
453 let (first, second) = array_refs!(src, size_of::<Hash>(), size_of::<u64>());
454 Self { id: Hash::from_array(*first), offset: u64::from_le_bytes(*second) }
455 }
456}
457
458impl Message for BlobMessage {
459 type IdType = Hash;
460
461 fn id(&self) -> Self::IdType {
462 self.id
463 }
464
465 fn offset(&self) -> u64 {
466 self.offset
467 }
468
469 fn encode_to(&self, dest: &mut [u8]) {
470 self.encode_to_impl(dest.try_into().unwrap());
471 }
472
473 fn decode_from(src: &[u8]) -> Self {
474 Self::decode_from_impl(src.try_into().unwrap())
475 }
476
477 fn is_zeroes(&self) -> bool {
478 self.id == Hash::from_array([0u8; size_of::<Hash>()]) && self.offset == 0
479 }
480
481 fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
482 match node.into_any().downcast::<FxBlob>() {
483 Ok(blob) => Ok(Self { id: blob.root(), offset }),
484 Err(_) => Err(anyhow!("Cannot record non-blob entry.")),
485 }
486 }
487
488 fn is_open_marker(&self) -> bool {
489 self.offset == FILE_OPEN_MARKER
490 }
491}
492
/// Profile record for a regular file: the file's object id and an offset.
#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
struct FileMessage {
    /// Object id of the file.
    id: u64,
    /// Offset within the file, or `FILE_OPEN_MARKER` for an open record.
    offset: u64,
}
500
501impl FileMessage {
502 fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
503 let (first, second) = mut_array_refs![dest, size_of::<u64>(), size_of::<u64>()];
504 *first = self.id.to_le_bytes();
505 *second = self.offset.to_le_bytes();
506 }
507
508 fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
509 let (first, second) = array_refs!(src, size_of::<u64>(), size_of::<u64>());
510 Self { id: u64::from_le_bytes(*first), offset: u64::from_le_bytes(*second) }
511 }
512}
513
514impl Message for FileMessage {
515 type IdType = u64;
516
517 fn id(&self) -> Self::IdType {
518 self.id
519 }
520
521 fn offset(&self) -> u64 {
522 self.offset
523 }
524
525 fn encode_to(&self, dest: &mut [u8]) {
526 self.encode_to_impl(dest.try_into().unwrap())
527 }
528
529 fn decode_from(src: &[u8]) -> Self {
530 Self::decode_from_impl(src.try_into().unwrap())
531 }
532
533 fn is_zeroes(&self) -> bool {
534 self.id == 0 && self.offset == 0
535 }
536
537 fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
538 match node.into_any().downcast::<FxFile>() {
539 Ok(file) => Ok(Self { id: file.object_id(), offset }),
540 Err(_) => Err(anyhow!("Cannot record non-file entry")),
541 }
542 }
543
544 fn is_open_marker(&self) -> bool {
545 self.offset == FILE_OPEN_MARKER
546 }
547}
548
/// Front end of a recording: accepts page-in and open events for nodes.
pub trait Recorder: Send + Sync {
    /// Records a page-in request for `node` at `offset`.
    fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error>;

    /// Records that `node` was opened. When the recording is written out, page-in records are
    /// only kept for objects that also have an open record.
    fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error>;
}
558
/// `Recorder` that batches messages of type `T` and forwards each batch over a channel.
struct RecorderImpl<T: Message> {
    /// Channel on which completed batches are sent.
    sender: async_channel::Sender<Vec<T>>,
    /// Messages accumulated since the last batch was sent.
    buffer: Vec<T>,
}
563
564impl<T: Message> RecorderImpl<T> {
565 fn new(sender: async_channel::Sender<Vec<T>>) -> Self {
566 Self { sender, buffer: Vec::with_capacity(MESSAGE_CHUNK_SIZE) }
567 }
568}
569
570impl<T: Message> Recorder for RecorderImpl<T> {
571 fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error> {
572 self.buffer.push(T::from_node_request(node, offset)?);
573 if self.buffer.len() >= MESSAGE_CHUNK_SIZE {
574 self.sender.try_send(std::mem::replace(
577 &mut self.buffer,
578 Vec::with_capacity(MESSAGE_CHUNK_SIZE),
579 ))?;
580 }
581 RECORDED.fetch_add(1, Ordering::Relaxed);
582 Ok(())
583 }
584
585 fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error> {
586 self.record(node, FILE_OPEN_MARKER)
587 }
588}
589
590impl<T: Message> Drop for RecorderImpl<T> {
591 fn drop(&mut self) {
592 if self.buffer.len() > 0 {
594 let buffer = std::mem::take(&mut self.buffer);
595 let _ = self.sender.try_send(buffer);
596 }
597 }
598}
599
/// A single prefetch request handed to the replay threads.
struct Request<P: PagerBacked> {
    /// Node whose VMO should be prefetched.
    file: Arc<P>,
    /// Byte offset in the VMO at which to prefetch.
    offset: u64,
}
604
/// State of an in-progress profile replay for volume flavour `T`.
struct ReplayState<T> {
    /// Shared future that completes once every replay thread has finished.
    replay_threads: future::Shared<BoxFuture<'static, ()>>,
    /// Task that reads the profile and queues requests; held so it lives as long as this state.
    _cache_task: fasync::Task<()>,
    /// Ties the state to `T` without storing a value of that type.
    _phantom: PhantomData<T>,
}
610
impl<T: RecordedVolume> ReplayState<T> {
    /// Starts replaying the profile behind `handle` on `volume`.
    ///
    /// Spawns `REPLAY_THREADS` blocking workers that service prefetch requests, plus one task on
    /// the volume's scope that reads the profile and queues those requests. Each worker holds a
    /// clone of `guard` for as long as it runs.
    fn new(handle: Box<dyn ReadObjectHandle>, volume: Arc<FxVolume>, guard: ActiveGuard) -> Self {
        let (sender, receiver) = async_channel::unbounded::<Request<T::NodeType>>();

        let mut replay_threads = Vec::with_capacity(REPLAY_THREADS);
        for _ in 0..REPLAY_THREADS {
            let receiver = receiver.clone();
            let guard = guard.clone();
            replay_threads.push(fasync::unblock(move || {
                // Keep the guard alive until this worker returns.
                let _guard = guard;
                Self::page_in_thread(receiver);
            }));
        }
        // Shared so that waiting for completion can be done from a clone of the future.
        let replay_threads = (Box::pin(async {
            join_all(replay_threads).await;
        }) as BoxFuture<'static, ()>)
            .shared();

        let scope = volume.scope().clone();
        let cache_task = scope
            .spawn({
                async move {
                    let mut task = pin!(
                        async {
                            // Result of opening each object named in the profile; `None` marks
                            // objects that failed to open.
                            let mut local_cache: BTreeMap<
                                T::IdType,
                                Option<OpenedNode<T::NodeType>>,
                            > = BTreeMap::new();

                            // Captured before `volume` is moved into `T::new` below.
                            let volume_id = volume.id();

                            if let Err(error) = T::new(volume)
                                .read_and_queue(handle, &sender, &mut local_cache)
                                .await
                            {
                                error!(error:?; "Failed to read back profile");
                            }
                            // Closing (rather than dropping) the sender marks the end of the
                            // queue while the sender itself stays alive in this task.
                            sender.close();

                            info!(
                                "Replay for volume {} opened {} of {} objects.",
                                volume_id,
                                local_cache.iter().filter(|(_, e)| e.is_some()).count(),
                                local_cache.len()
                            );

                            // Never completes on its own, so `local_cache` (and the opened nodes
                            // in it) stays alive until this task is dropped or cancelled.
                            let () = std::future::pending().await;
                        }
                        .fuse()
                    );

                    // Run until the guard signals cancellation; `task` itself never finishes.
                    select! {
                        _ = task => {}
                        _ = guard.on_cancel().fuse() => {}
                    }
                }
            })
            .into();

        Self { replay_threads, _cache_task: cache_task, _phantom: PhantomData }
    }

    /// Worker loop: blocks on `queue` and issues a one-page `PREFETCH` on the VMO of each
    /// requested file at the requested offset. Prefetch failures are logged and otherwise
    /// ignored.
    fn page_in_thread(queue: async_channel::Receiver<Request<T::NodeType>>) {
        while let Ok(request) = queue.recv_blocking() {
            let res = request.file.vmo().op_range(
                zx::VmoOp::PREFETCH,
                request.offset,
                zx::system_get_page_size() as u64,
            );
            if let Err(e) = res {
                warn!("Failed to prefetch page: {:?}", e);
            }
            // Stop as soon as no sender exists any more, even if requests are still queued.
            // NOTE(review): this presumably distinguishes an abandoned replay (sender dropped)
            // from a finished one (sender closed but alive) - confirm.
            if queue.sender_count() == 0 {
                return;
            }
        }
    }
}
701
/// Per-volume controller for profile recording and replay.
#[async_trait]
pub trait ProfileState: Send + Sync {
    /// Starts a new recording on `volume` that will be written to `recording_handle`, and
    /// returns the `Recorder` through which events are fed into it.
    fn record_new(
        &mut self,
        volume: &Arc<FxVolume>,
        recording_handle: Box<dyn RecordingHandle>,
    ) -> Box<dyn Recorder>;

    /// Starts replaying the profile readable through `handle` on `volume`.
    fn replay_profile(
        &mut self,
        handle: Box<dyn ReadObjectHandle>,
        volume: Arc<FxVolume>,
        guard: ActiveGuard,
    );

    /// Waits until the replay workers, if any, have finished.
    async fn wait_for_replay_to_finish(&mut self);

    /// Waits for the current recording, if any, to finish and returns its result.
    async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error>;
}
731
732pub fn new_profile_state(is_blob: bool) -> Box<dyn ProfileState> {
733 if is_blob {
734 Box::new(ProfileStateImpl::<BlobVolume>::new())
735 } else {
736 Box::new(ProfileStateImpl::<FileVolume>::new())
737 }
738}
739
/// `ProfileState` implementation, generic over the volume flavour `T`.
struct ProfileStateImpl<T> {
    /// Handle to the result of the current recording task, if one was started.
    recording: Option<RemoteHandle<Result<(), Error>>>,
    /// State of the current replay, if one was started.
    replay: Option<ReplayState<T>>,
}
744
745impl<T> ProfileStateImpl<T> {
746 fn new() -> Self {
747 Self { recording: None, replay: None }
748 }
749}
750
751#[async_trait]
752impl<T: RecordedVolume> ProfileState for ProfileStateImpl<T> {
753 fn record_new(
754 &mut self,
755 volume: &Arc<FxVolume>,
756 recording_handle: Box<dyn RecordingHandle>,
757 ) -> Box<dyn Recorder> {
758 let (sender, receiver) = async_channel::unbounded();
759 let volume = volume.clone();
760 self.recording = None;
762 let scope = volume.scope().clone();
763 let (task, remote_handle) = async move {
764 let recording = T::new(volume);
765 recording
766 .record(recording_handle, receiver)
767 .await
768 .inspect_err(|error| warn!(error:?; "Profile recording failed"))
769 }
770 .remote_handle();
771 self.recording = Some(remote_handle);
772 scope.spawn(task);
773 Box::new(RecorderImpl::new(sender))
774 }
775
776 fn replay_profile(
777 &mut self,
778 handle: Box<dyn ReadObjectHandle>,
779 volume: Arc<FxVolume>,
780 guard: ActiveGuard,
781 ) {
782 self.replay = Some(ReplayState::new(handle, volume, guard));
783 }
784
785 async fn wait_for_replay_to_finish(&mut self) {
786 if let Some(replay) = &mut self.replay {
787 replay.replay_threads.clone().await;
788 }
789 }
790
791 async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error> {
792 if let Some(recording) = self.recording.take() { recording.await } else { Ok(()) }
793 }
794}
795
796#[cfg(test)]
797mod tests {
798 use super::{
799 BlobMessage, BlobVolume, FileMessage, FileRecordingHandle, FileVolume, IO_SIZE, Message,
800 RecordedVolume, Request, new_profile_state,
801 };
802 use crate::fuchsia::file::FxFile;
803 use crate::fuchsia::fxblob::blob::FxBlob;
804 use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
805 use crate::fuchsia::node::{FxNode, OpenedNode};
806 use crate::fuchsia::pager::PagerBacked;
807 use crate::fuchsia::testing::{TestFixture, TestFixtureOptions, open_file_checked};
808 use crate::fuchsia::volume::FxVolume;
809 use anyhow::Error;
810 use async_trait::async_trait;
811 use delivery_blob::CompressionMode;
812 use event_listener::{Event, EventListener};
813 use fidl_fuchsia_io as fio;
814 use fuchsia_async as fasync;
815 use fuchsia_hash::Hash;
816 use fuchsia_sync::Mutex;
817 use fxfs::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
818 use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
819 use fxfs::object_store::{DataObjectHandle, HandleOptions, ObjectDescriptor, ObjectStore};
820 use std::collections::BTreeMap;
821 use std::mem::size_of;
822 use std::sync::Arc;
823 use std::time::Duration;
824 use storage_device::buffer::{BufferRef, MutableBufferRef};
825 use storage_device::buffer_allocator::{BufferAllocator, BufferFuture, BufferSource};
826
    /// Mutable state behind a `FakeReaderWriter`.
    struct FakeReaderWriterInner {
        /// Bytes appended so far; reads are served from this buffer.
        data: Vec<u8>,
        /// Pending delays; each read or write pops one from the back and awaits it first.
        delays: Vec<EventListener>,
    }
831
    /// In-memory object handle used by tests in place of a real profile file.
    struct FakeReaderWriter {
        /// Allocator backing `allocate_buffer`.
        allocator: BufferAllocator,
        /// Shared data and delay queue.
        inner: Arc<Mutex<FakeReaderWriterInner>>,
    }
836
    /// Block size, in bytes, used by the fake handle and expected of the test filesystems.
    const BLOCK_SIZE: usize = 4096;
838
839 impl FakeReaderWriter {
840 fn new() -> Self {
841 Self {
842 allocator: BufferAllocator::new(BLOCK_SIZE, BufferSource::new(IO_SIZE * 2)),
843 inner: Arc::new(Mutex::new(FakeReaderWriterInner {
844 data: Vec::new(),
845 delays: Vec::new(),
846 })),
847 }
848 }
849
850 fn push_delay(&self, delay: EventListener) {
851 self.inner.lock().delays.insert(0, delay);
852 }
853 }
854
    impl ObjectHandle for FakeReaderWriter {
        /// The fake has no real object; it always reports id 0.
        fn object_id(&self) -> u64 {
            0
        }

        /// Block size of the backing allocator.
        fn block_size(&self) -> u64 {
            self.allocator.block_size() as u64
        }

        /// Allocates a buffer of `size` bytes from the backing allocator.
        fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
            self.allocator.allocate_buffer(size)
        }
    }
868
    impl WriteObjectHandle for FakeReaderWriter {
        /// Appends `buf` to the in-memory data after awaiting the next queued delay, if any.
        /// Only appends are supported: `offset` must be `None`.
        async fn write_or_append(
            &self,
            offset: Option<u64>,
            buf: BufferRef<'_>,
        ) -> Result<u64, Error> {
            assert!(offset.is_none());
            // Pop the delay in its own statement so the lock guard is released before awaiting.
            let delay = self.inner.lock().delays.pop();
            if let Some(delay) = delay {
                delay.await;
            }
            self.inner.lock().data.extend_from_slice(buf.as_slice());
            // Reports the number of bytes appended by this call.
            Ok(buf.len() as u64)
        }

        /// Not used by these tests.
        async fn truncate(&self, _size: u64) -> Result<(), Error> {
            unreachable!();
        }

        /// Not used by these tests.
        async fn flush(&self) -> Result<(), Error> {
            unreachable!();
        }
    }
894
    /// Creates a file called `name` in the fixture's root directory, writes `data` to it through
    /// the fixture's root connection, and returns the new file's object id.
    async fn write_file(fixture: &TestFixture, name: &str, data: &[u8]) -> u64 {
        let root_dir = fixture.volume().root_dir();
        // The transaction locks the root directory while the child is created.
        let mut transaction = fixture
            .volume()
            .volume()
            .store()
            .new_transaction(
                lock_keys![LockKey::object(
                    fixture.volume().volume().store().store_object_id(),
                    root_dir.object_id()
                )],
                Options::default(),
            )
            .await
            .expect("Creating transaction for new file");
        let id = root_dir
            .directory()
            .create_child_file(&mut transaction, name)
            .await
            .expect("Creating new_file")
            .object_id();
        transaction.commit().await.unwrap();
        let file = open_file_checked(
            fixture.root(),
            name,
            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_FILE,
            &Default::default(),
        )
        .await;
        file.write(data).await.unwrap().expect("Writing file");
        id
    }
927
928 #[async_trait]
929 impl ReadObjectHandle for FakeReaderWriter {
930 async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
931 let delay = self.inner.lock().delays.pop();
932 if let Some(delay) = delay {
933 delay.await;
934 }
935 let inner = self.inner.lock();
937 assert!(offset as usize <= inner.data.len());
938 let offset_end = std::cmp::min(offset as usize + buf.len(), inner.data.len());
939 let size = offset_end - offset as usize;
940 buf.as_mut_slice()[..size].clone_from_slice(&inner.data[offset as usize..offset_end]);
941 Ok(size)
942 }
943
944 fn get_size(&self) -> u64 {
945 self.inner.lock().data.len() as u64
946 }
947 }
948
949 #[fuchsia::test]
950 async fn test_encode_decode_blob() {
951 let mut buf = [0u8; size_of::<BlobMessage>()];
952 let m = BlobMessage { id: [88u8; 32].into(), offset: 77 };
953 m.encode_to(&mut buf.as_mut_slice());
954 let m2 = BlobMessage::decode_from(&buf);
955 assert_eq!(m, m2);
956 }
957
958 #[fuchsia::test]
959 async fn test_encode_decode_file() {
960 let mut buf = [0u8; size_of::<FileMessage>()];
961 let m = FileMessage { id: 88, offset: 77 };
962 m.encode_to(&mut buf.as_mut_slice());
963 let m2 = FileMessage::decode_from(&buf);
964 assert!(!m2.is_zeroes());
965 assert_eq!(m, m2);
966 }
967
    /// Name under which the tests record their profile in the profile directory.
    const TEST_PROFILE_NAME: &str = "test_profile";
969
970 async fn get_test_profile_handle(volume: &Arc<FxVolume>) -> DataObjectHandle<FxVolume> {
971 let profile_dir = volume.get_profile_directory().await.unwrap();
972 ObjectStore::open_object(
973 volume,
974 profile_dir
975 .lookup(TEST_PROFILE_NAME)
976 .await
977 .expect("lookup failed")
978 .expect("not found")
979 .0,
980 HandleOptions::default(),
981 None,
982 )
983 .await
984 .unwrap()
985 }
986
987 async fn get_test_profile_contents(volume: &Arc<FxVolume>) -> Vec<u8> {
988 get_test_profile_handle(volume).await.contents(1024 * 1024).await.unwrap().to_vec()
989 }
990
    /// Recording one page-in plus an open for a blob produces a profile of exactly one block.
    #[fuchsia::test]
    async fn test_recording_basic_blob() {
        let fixture = new_blob_fixture().await;
        {
            let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");

            let mut state = new_profile_state(true);
            let volume = fixture.volume().volume();

            {
                let handle =
                    FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
                let mut recorder = state.record_new(volume, Box::new(handle));
                recorder.record(blob.clone(), 0).unwrap();
                recorder.record_open(blob).unwrap();
                // The recorder is dropped at the end of this scope, which sends the partial batch.
            }

            state.wait_for_recording_to_finish().await.unwrap();

            assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
        }
        fixture.close().await;
    }
1016
    /// Recording one page-in plus an open for a file produces a profile of exactly one block.
    #[fuchsia::test]
    async fn test_recording_basic_file() {
        let fixture = TestFixture::new().await;
        {
            let id = write_file(&fixture, "foo", &[88u8]).await;
            let node = fixture
                .volume()
                .volume()
                .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
                .await
                .unwrap();

            let mut state = new_profile_state(false);
            let volume = fixture.volume().volume();

            {
                let handle =
                    FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
                let mut recorder = state.record_new(volume, Box::new(handle));
                recorder.record(node.clone(), 0).unwrap();
                recorder.record_open(node).unwrap();
                // The recorder is dropped at the end of this scope, which sends the partial batch.
            }
            state.wait_for_recording_to_finish().await.unwrap();

            assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
        }
        fixture.close().await;
    }
1046
    /// A page-in recorded without a matching open is filtered out, leaving an empty profile.
    #[fuchsia::test]
    async fn test_recording_filtered_without_open() {
        let fixture = new_blob_fixture().await;
        {
            let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");

            let mut state = new_profile_state(true);
            let volume = fixture.volume().volume();

            {
                let handle =
                    FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
                let mut recorder = state.record_new(volume, Box::new(handle));
                // Deliberately no `record_open` call for this blob.
                recorder.record(blob.clone(), 0).unwrap();
            }
            state.wait_for_recording_to_finish().await.unwrap();

            assert_eq!(get_test_profile_contents(volume).await.len(), 0);
        }
        fixture.close().await;
    }
1070
    /// Recording one more blob message than fits in a block spills into a second block, and
    /// reading the profile back yields every message in order.
    #[fuchsia::test]
    async fn test_recording_blob_more_than_block() {
        let mut state = new_profile_state(true);

        let fixture = new_blob_fixture().await;
        assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
        // One message more than a single block can hold.
        let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;
        let hash;
        let volume = fixture.volume().volume();

        {
            hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
            let handle = FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
            let mut recorder = state.record_new(volume, Box::new(handle));
            recorder.record_open(blob.clone()).unwrap();
            for i in 0..message_count {
                recorder.record(blob.clone(), 4096 * i as u64).unwrap();
            }
        }
        state.wait_for_recording_to_finish().await.unwrap();

        assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);

        let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
        let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();

        let volume = fixture.volume().volume().clone();
        // `sender` is moved into the task, so the receive loop below ends when the task finishes.
        let task = fasync::Task::spawn(async move {
            let handle = Box::new(get_test_profile_handle(&volume).await);
            let blob = BlobVolume::new(volume);
            blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
        });

        let mut recv_count = 0;
        while let Ok(msg) = receiver.recv().await {
            assert_eq!(msg.file.root(), hash);
            assert_eq!(msg.offset, 4096 * recv_count);
            recv_count += 1;
        }
        task.await;
        assert_eq!(recv_count, message_count as u64);

        fixture.close().await;
    }
1117
    /// Recording one more file message than fits in a block spills into a second block, and
    /// reading the profile back yields every message in order.
    #[fuchsia::test]
    async fn test_recording_file_more_than_block() {
        let mut state = new_profile_state(false);

        let fixture = TestFixture::new().await;
        assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
        // One message more than a single block can hold.
        let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;
        let id;
        let volume = fixture.volume().volume();
        {
            id = write_file(&fixture, "foo", &[88u8]).await;
            let node = volume
                .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
                .await
                .unwrap();
            let handle = FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
            let mut recorder = state.record_new(volume, Box::new(handle));
            recorder.record_open(node.clone()).unwrap();
            for i in 0..message_count {
                recorder.record(node.clone(), 4096 * i as u64).unwrap();
            }
        }
        state.wait_for_recording_to_finish().await.unwrap();

        assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);

        let mut local_cache: BTreeMap<u64, Option<OpenedNode<FxFile>>> = BTreeMap::new();
        let (sender, receiver) = async_channel::unbounded::<Request<FxFile>>();

        let volume = fixture.volume().volume().clone();
        // `sender` is moved into the task, so the receive loop below ends when the task finishes.
        let task = fasync::Task::spawn(async move {
            let handle = Box::new(get_test_profile_handle(&volume).await);
            let file = FileVolume::new(volume);
            file.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
        });

        let mut recv_count = 0;
        while let Ok(msg) = receiver.recv().await {
            assert_eq!(msg.file.object_id(), id);
            assert_eq!(msg.offset, 4096 * recv_count);
            recv_count += 1;
        }
        task.await;
        assert_eq!(recv_count, message_count as u64);

        fixture.close().await;
    }
1166
1167 #[fuchsia::test]
1168 async fn test_recording_more_than_io_size() {
1169 let fixture = new_blob_fixture().await;
1170
1171 {
1172 let mut state = new_profile_state(true);
1173 let message_count = (IO_SIZE as usize / size_of::<BlobMessage>()) + 1;
1174 let hash;
1175 let volume = fixture.volume().volume();
1176 {
1177 hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
1178 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1179 let handle =
1181 FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1182 let mut recorder = state.record_new(volume, Box::new(handle));
1183 recorder.record_open(blob.clone()).unwrap();
1184 for i in 0..message_count {
1185 recorder.record(blob.clone(), 4096 * i as u64).unwrap();
1186 }
1187 }
1188 state.wait_for_recording_to_finish().await.unwrap();
1189 assert_eq!(get_test_profile_contents(volume).await.len(), IO_SIZE + BLOCK_SIZE);
1190
1191 let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
1192 let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();
1193
1194 let volume = volume.clone();
1195 let task = fasync::Task::spawn(async move {
1196 let handle = Box::new(get_test_profile_handle(&volume).await);
1197 let blob = BlobVolume::new(volume);
1198 blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
1199 });
1200
1201 let mut recv_count = 0;
1202 while let Ok(msg) = receiver.recv().await {
1203 assert_eq!(msg.file.root(), hash);
1204 assert_eq!(msg.offset, 4096 * recv_count);
1205 recv_count += 1;
1206 }
1207 task.await;
1208 assert_eq!(recv_count, message_count as u64);
1209 }
1210
1211 fixture.close().await;
1212 }
1213
1214 #[fuchsia::test]
1215 async fn test_replay_profile_blob() {
1216 let mut state = new_profile_state(true);
1218
1219 let mut hashes = Vec::new();
1220
1221 let fixture = new_blob_fixture().await;
1222 {
1223 assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
1224 let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;
1225
1226 let volume = fixture.volume().volume();
1227 let handle = FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1228 let mut recorder = state.record_new(volume, Box::new(handle));
1229 for i in 0..message_count {
1231 let hash =
1232 fixture.write_blob(i.to_string().as_bytes(), CompressionMode::Never).await;
1233 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1234 recorder.record_open(blob.clone()).unwrap();
1235 hashes.push(hash);
1236 recorder.record(blob.clone(), 0).unwrap();
1237 }
1238 };
1239 let device = fixture.close().await;
1240 device.ensure_unique();
1241 state.wait_for_recording_to_finish().await.unwrap();
1242
1243 device.reopen(false);
1244 let fixture = open_blob_fixture(device).await;
1245 {
1246 for hash in &hashes {
1249 let blob = fixture.get_blob(*hash).await.expect("Opening blob");
1250 assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
1251 }
1252
1253 let volume = fixture.volume().volume();
1254 state.replay_profile(
1255 Box::new(get_test_profile_handle(volume).await),
1256 volume.clone(),
1257 volume.scope().try_active_guard().unwrap(),
1258 );
1259
1260 for hash in &hashes {
1262 let blob = fixture.get_blob(*hash).await.expect("Opening blob");
1263 while blob.vmo().info().unwrap().committed_bytes == 0 {
1264 fasync::Timer::new(Duration::from_millis(25)).await;
1265 }
1266 }
1267 }
1268 fixture.close().await;
1269 }
1270
1271 #[fuchsia::test]
1272 async fn test_replay_profile_file() {
1273 let mut state = new_profile_state(false);
1275
1276 let mut ids = Vec::new();
1277
1278 let fixture = TestFixture::new().await;
1279 {
1280 assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
1281 let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;
1282
1283 let volume = fixture.volume().volume();
1284 let handle = FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1285 let mut recorder = state.record_new(volume, Box::new(handle));
1286 for i in 0..message_count {
1288 let id = write_file(&fixture, &i.to_string(), &[88u8]).await;
1289 let node = fixture
1290 .volume()
1291 .volume()
1292 .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
1293 .await
1294 .unwrap();
1295 recorder.record_open(node.clone()).unwrap();
1296 ids.push(id);
1297 recorder.record(node.clone(), 0).unwrap();
1298 }
1299 };
1300 let device = fixture.close().await;
1301 device.ensure_unique();
1302 state.wait_for_recording_to_finish().await.unwrap();
1303
1304 device.reopen(false);
1305 let fixture = TestFixture::open(
1306 device,
1307 TestFixtureOptions { encrypted: true, format: false, ..Default::default() },
1308 )
1309 .await;
1310 {
1311 for id in &ids {
1313 let file = fixture
1314 .volume()
1315 .volume()
1316 .get_or_load_node(
1317 *id,
1318 ObjectDescriptor::File,
1319 Some(fixture.volume().root_dir()),
1320 )
1321 .await
1322 .unwrap()
1323 .into_any()
1324 .downcast::<FxFile>()
1325 .unwrap();
1326 assert_eq!(file.vmo().info().unwrap().committed_bytes, 0);
1327 }
1328
1329 let volume = fixture.volume().volume();
1330 state.replay_profile(
1331 Box::new(get_test_profile_handle(volume).await),
1332 volume.clone(),
1333 volume.scope().try_active_guard().unwrap(),
1334 );
1335
1336 for id in &ids {
1338 let file = fixture
1339 .volume()
1340 .volume()
1341 .get_or_load_node(
1342 *id,
1343 ObjectDescriptor::File,
1344 Some(fixture.volume().root_dir()),
1345 )
1346 .await
1347 .unwrap()
1348 .into_any()
1349 .downcast::<FxFile>()
1350 .unwrap();
1351 while file.vmo().info().unwrap().committed_bytes == 0 {
1352 fasync::Timer::new(Duration::from_millis(25)).await;
1353 }
1354 }
1355 state.wait_for_recording_to_finish().await.unwrap();
1356 }
1357 fixture.close().await;
1358 }
1359
1360 #[fuchsia::test]
1361 async fn test_recording_during_replay() {
1362 let mut state = new_profile_state(true);
1363
1364 let hash;
1365 let first_recording;
1366 let fixture = new_blob_fixture().await;
1367 let volume = fixture.volume().volume();
1368
1369 {
1371 let handle = FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1372 let mut recorder = state.record_new(volume, Box::new(handle));
1373 hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1374 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1375 recorder.record_open(blob.clone()).unwrap();
1376 recorder.record(blob.clone(), 0).unwrap();
1377 }
1378
1379 state.wait_for_recording_to_finish().await.unwrap();
1380 first_recording = get_test_profile_contents(volume).await;
1381 assert_ne!(first_recording.len(), 0);
1382 let device = fixture.close().await;
1383 device.ensure_unique();
1384
1385 device.reopen(false);
1386 let fixture = open_blob_fixture(device).await;
1387
1388 {
1389 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1392 assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
1393
1394 let volume = fixture.volume().volume();
1396 let handle = FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1397 let mut recorder = state.record_new(volume, Box::new(handle));
1398 recorder.record(blob.clone(), 4096).unwrap();
1399
1400 let volume = fixture.volume().volume();
1402 state.replay_profile(
1403 Box::new(get_test_profile_handle(volume).await),
1404 volume.clone(),
1405 volume.scope().try_active_guard().unwrap(),
1406 );
1407
1408 {
1410 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1411 while blob.vmo().info().unwrap().committed_bytes == 0 {
1412 fasync::Timer::new(Duration::from_millis(25)).await;
1413 }
1414 }
1415
1416 recorder.record_open(blob.clone()).unwrap();
1419 }
1420
1421 state.wait_for_recording_to_finish().await.unwrap();
1422
1423 let volume = fixture.volume().volume();
1424 let second_recording = get_test_profile_contents(volume).await;
1425 assert_ne!(second_recording.len(), 0);
1426 assert_ne!(&second_recording, &first_recording);
1427
1428 fixture.close().await;
1429 }
1430
1431 #[fuchsia::test]
1434 async fn test_replay_profile_stop_reading_early() {
1435 let mut state = new_profile_state(true);
1436 let fixture = new_blob_fixture().await;
1437
1438 {
1439 let volume = fixture.volume().volume();
1440
1441 let message;
1443 {
1444 let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1445 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1446 message = BlobMessage { id: blob.root(), offset: 0 };
1447 }
1448 state.wait_for_recording_to_finish().await.unwrap();
1449
1450 let replay_handle = Box::new(FakeReaderWriter::new());
1452 let mut buff = vec![0u8; IO_SIZE * 2];
1453 message.encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
1454 message.encode_to_impl(
1455 (&mut buff[IO_SIZE..IO_SIZE + size_of::<BlobMessage>()]).try_into().unwrap(),
1456 );
1457
1458 replay_handle.inner.lock().data = buff;
1459 let delay1 = Event::new();
1460 replay_handle.push_delay(delay1.listen());
1461 let delay2 = Event::new();
1462 replay_handle.push_delay(delay2.listen());
1463
1464 state.replay_profile(
1465 replay_handle,
1466 volume.clone(),
1467 volume.scope().try_active_guard().unwrap(),
1468 );
1469
1470 fasync::Task::spawn(async move {
1472 fasync::Timer::new(Duration::from_millis(100)).await;
1474 delay1.notify(usize::MAX);
1475 })
1476 .detach();
1477 }
1478
1479 fixture.close().await;
1482 }
1483
1484 #[fuchsia::test]
1485 async fn test_replay_blob_missing() {
1486 let fixture = new_blob_fixture().await;
1487 let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1490 let mut buff = vec![0u8; IO_SIZE];
1491 {
1492 {
1495 let message = BlobMessage { id: [42u8; 32].into(), offset: 0 };
1496 message
1497 .encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
1498 }
1499
1500 {
1502 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1503 let message = BlobMessage { id: blob.root(), offset: 0 };
1504 message.encode_to_impl(
1505 (&mut buff[size_of::<BlobMessage>()..(size_of::<BlobMessage>() * 2)])
1506 .try_into()
1507 .unwrap(),
1508 );
1509 }
1510 }
1511 let device = fixture.close().await;
1512 device.ensure_unique();
1513
1514 device.reopen(false);
1515 let fixture = open_blob_fixture(device).await;
1516 {
1517 let mut state = new_profile_state(true);
1518 let volume = fixture.volume().volume();
1519
1520 let replay_handle = Box::new(FakeReaderWriter::new());
1521 replay_handle.inner.lock().data = buff;
1522
1523 state.replay_profile(
1524 replay_handle,
1525 volume.clone(),
1526 volume.scope().try_active_guard().unwrap(),
1527 );
1528
1529 let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1531 while blob.vmo().info().unwrap().committed_bytes == 0 {
1532 fasync::Timer::new(Duration::from_millis(25)).await;
1533 }
1534 }
1535 fixture.close().await;
1536 }
1537
1538 #[fuchsia::test]
1539 async fn test_replay_file_missing_or_tombstoned() {
1540 let fixture = TestFixture::new().await;
1541 let mut buff = vec![0u8; IO_SIZE];
1542 let remaining_file_id;
1545 let tombstoned_file_id;
1546 {
1548 let id = write_file(&fixture, "foo", &[1, 2, 3, 4]).await;
1549 let message = FileMessage { id, offset: 0 };
1550 message.encode_to_impl((&mut buff[0..size_of::<FileMessage>()]).try_into().unwrap());
1551 }
1552 fixture
1554 .root()
1555 .unlink("foo", &fio::UnlinkOptions::default())
1556 .await
1557 .unwrap()
1558 .expect("Unlinking");
1559
1560 {
1562 tombstoned_file_id = write_file(&fixture, "bar", &[1, 2, 3, 4]).await;
1563 let message = FileMessage { id: tombstoned_file_id, offset: 0 };
1564 message.encode_to_impl(
1565 (&mut buff[size_of::<FileMessage>()..(size_of::<FileMessage>() * 2)])
1566 .try_into()
1567 .unwrap(),
1568 );
1569 }
1570
1571 {
1573 remaining_file_id = write_file(&fixture, "baz", &[1, 2, 3, 4]).await;
1574 let message = FileMessage { id: remaining_file_id, offset: 0 };
1575 message.encode_to_impl(
1576 (&mut buff[(size_of::<FileMessage>() * 2)..(size_of::<FileMessage>() * 3)])
1577 .try_into()
1578 .unwrap(),
1579 );
1580 }
1581 let device = fixture.close().await;
1582 device.ensure_unique();
1583
1584 device.reopen(false);
1585 let fixture =
1586 TestFixture::open(device, TestFixtureOptions { format: false, ..Default::default() })
1587 .await;
1588 {
1589 let tombstoned_file = fixture
1592 .volume()
1593 .volume()
1594 .get_or_load_node(tombstoned_file_id, ObjectDescriptor::File, None)
1595 .await
1596 .expect("Opening file object")
1597 .into_any()
1598 .downcast::<FxFile>()
1599 .unwrap();
1600 fixture
1601 .root()
1602 .unlink("bar", &fio::UnlinkOptions::default())
1603 .await
1604 .unwrap()
1605 .expect("Unlinking");
1606
1607 let mut state = new_profile_state(false);
1608 let volume = fixture.volume().volume();
1609
1610 let replay_handle = Box::new(FakeReaderWriter::new());
1611 replay_handle.inner.lock().data = buff;
1612
1613 state.replay_profile(
1614 replay_handle,
1615 volume.clone(),
1616 volume.scope().try_active_guard().unwrap(),
1617 );
1618
1619 let remaining_file = fixture
1621 .volume()
1622 .volume()
1623 .get_or_load_node(remaining_file_id, ObjectDescriptor::File, None)
1624 .await
1625 .expect("Opening file object")
1626 .into_any()
1627 .downcast::<FxFile>()
1628 .unwrap();
1629 while remaining_file.vmo().info().unwrap().committed_bytes == 0 {
1630 fasync::Timer::new(Duration::from_millis(25)).await;
1631 }
1632
1633 assert_eq!(tombstoned_file.vmo().info().unwrap().committed_bytes, 0);
1636 }
1637 fixture.close().await;
1638 }
1639}