// fxfs_platform/fuchsia/profile.rs
1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::file::FxFile;
6use crate::fuchsia::fxblob::BlobDirectory;
7use crate::fuchsia::fxblob::blob::FxBlob;
8use crate::fuchsia::node::{FxNode, OpenedNode};
9use crate::fuchsia::pager::PagerBacked;
10use crate::fuchsia::volume::FxVolume;
11use anyhow::{Context as _, Error, anyhow, ensure};
12use arrayref::{array_refs, mut_array_refs};
13use async_trait::async_trait;
14use fuchsia_async as fasync;
15use fuchsia_hash::Hash;
16use futures::future::{self, BoxFuture, RemoteHandle, join_all};
17use futures::lock::Mutex;
18use futures::{FutureExt, select};
19use fxfs::errors::FxfsError;
20use fxfs::log::*;
21use fxfs::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle};
22use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
23use fxfs::object_store::{
24    HandleOptions, ObjectDescriptor, ObjectStore, Timestamp, VOLUME_DATA_KEY_ID, directory,
25};
26use linked_hash_map::LinkedHashMap;
27use scopeguard::ScopeGuard;
28use std::cmp::{Eq, PartialEq};
29use std::collections::btree_map::{BTreeMap, Entry};
30use std::marker::PhantomData;
31use std::mem::size_of;
32use std::pin::pin;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicU64, Ordering};
35use vfs::execution_scope::ActiveGuard;
36
/// Sentinel offset used by `Recorder::record_open` to mark that a file was opened during the
/// recording, as opposed to a real page-in offset.
const FILE_OPEN_MARKER: u64 = u64::MAX;
/// Number of blocking background threads that prefetch pages during profile replay.
const REPLAY_THREADS: usize = 2;
// The number of messages to buffer before sending to record. They are chunked up to reduce the
// number of allocations in the serving threads.
const MESSAGE_CHUNK_SIZE: usize = 64;
/// I/O size used for reading and writing profile files.
const IO_SIZE: usize = 1 << 17; // 128KiB. Needs to be a power of 2 and >= block size.

/// Running count of messages recorded via `Recorder::record` (incremented once per message,
/// including open markers). Public so it can be read elsewhere, e.g. for stats — TODO confirm.
pub static RECORDED: AtomicU64 = AtomicU64::new(0);
45
/// A volume flavor (blob or regular-file) that supports profile recording and replay.
///
/// The default methods implement the shared machinery: `read_and_queue` decodes a previously
/// recorded profile and queues page-in requests, and `record` persists a newly recorded profile.
/// Implementors supply the id/node/message types and how to open a node by id.
trait RecordedVolume: Send + Sync + Sized + Unpin {
    /// Identifier naming a file in a profile (object id for files, merkle hash for blobs).
    type IdType: std::fmt::Display + Ord + Send + Sized;
    /// The pager-backed node type whose pages are prefetched on replay.
    type NodeType: PagerBacked;
    /// The fixed-size record type stored in the profile file.
    type MessageType: Message<IdType = Self::IdType>;

    /// Wraps `volume` in this flavor's state.
    fn new(volume: Arc<FxVolume>) -> Self;

    /// Returns the underlying volume.
    fn volume(&self) -> &Arc<FxVolume>;

    /// Opens the node identified by `id` on this volume.
    fn open(
        &self,
        id: Self::IdType,
    ) -> impl std::future::Future<Output = Result<OpenedNode<Self::NodeType>, Error>> + Send;

    /// Filters out open markers for files that may not be usable in the profile.
    fn file_is_replayable(
        &self,
        id: &Self::IdType,
    ) -> impl std::future::Future<Output = bool> + Send;

    /// Reads the profile in `handle` from start to finish, decoding fixed-size messages and
    /// sending one page-in `Request` per message to `sender`.
    ///
    /// Nodes are opened at most once each: `local_cache` memoizes successful opens (kept alive so
    /// replay can touch their pages) as well as failures (`None`, so a bad id is not retried).
    fn read_and_queue(
        &self,
        handle: Box<dyn ReadObjectHandle>,
        sender: &async_channel::Sender<Request<Self::NodeType>>,
        local_cache: &mut BTreeMap<Self::IdType, Option<OpenedNode<Self::NodeType>>>,
    ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
        async move {
            let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
            let block_size = handle.block_size() as usize;
            let file_size = handle.get_size() as usize;
            let mut offset = 0;
            while offset < file_size {
                let actual = handle
                    .read(offset as u64, io_buf.as_mut())
                    .await
                    .map_err(|e| e.context(format!("Failed to read at offset: {}", offset)))?;
                offset += actual;
                let mut local_offset = 0;
                let mut next_block = block_size;
                let mut next_offset = size_of::<Self::MessageType>();
                while next_offset <= actual {
                    let msg = Self::MessageType::decode_from(
                        &io_buf.as_slice()[local_offset..next_offset],
                    );

                    local_offset = next_offset;
                    next_offset = local_offset + size_of::<Self::MessageType>();
                    // Messages don't overlap block boundaries.
                    if next_offset > next_block {
                        local_offset = next_block;
                        next_offset = local_offset + size_of::<Self::MessageType>();
                        next_block += block_size;
                    }

                    // Ignore trailing zeroes. This is technically a valid entry but extremely
                    // unlikely and will only break an optimization.
                    if msg.is_zeroes() {
                        break;
                    }

                    let file = match local_cache.entry(msg.id()) {
                        Entry::Occupied(entry) => match entry.get() {
                            Some(opened_file) => (*opened_file).clone(),
                            // Found a cached error.
                            None => continue,
                        },
                        Entry::Vacant(entry) => match self.open(msg.id()).await {
                            Err(e) => {
                                debug!("Failed to open object {} from profile: {:?}", msg.id(), e);
                                // Cache the error.
                                entry.insert(None);
                                continue;
                            }
                            Ok(opened_file) => {
                                let file_clone = (*opened_file).clone();
                                entry.insert(Some(opened_file));
                                file_clone
                            }
                        },
                    };

                    sender.send(Request { file, offset: msg.offset() }).await?;
                }
            }
            Ok(())
        }
    }

    /// Consumes recorded message chunks from `receiver` until the channel closes, then writes the
    /// deduplicated profile to a fresh object and atomically installs it under `name` in the
    /// volume's profile directory, tombstoning any previous profile of that name.
    fn record(
        &self,
        name: &str,
        receiver: async_channel::Receiver<Vec<Self::MessageType>>,
    ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
        async move {
            // Keyed by message so repeats dedup; LinkedHashMap preserves first-seen order.
            let mut recorded_offsets = LinkedHashMap::<Self::MessageType, ()>::new();
            // For each id that had an open recorded, whether the file is usable on replay.
            let mut recorded_opens = BTreeMap::<Self::IdType, bool>::new();
            while let Ok(buffer) = receiver.recv().await {
                for message in buffer {
                    if message.is_open_marker() {
                        if let Entry::Vacant(entry) = recorded_opens.entry(message.id()) {
                            let usable = self.file_is_replayable(entry.key()).await;
                            entry.insert(usable);
                        }
                    } else {
                        recorded_offsets.insert(message, ());
                    }
                }
            }

            // Start a recording handle. Put it in the graveyard in case we can't properly complete
            // it.
            let store = self.volume().store();
            let fs = store.filesystem();
            let mut transaction =
                fs.clone().new_transaction(lock_keys![], Options::default()).await?;
            let handle = ObjectStore::create_object(
                self.volume(),
                &mut transaction,
                HandleOptions::default(),
                None,
            )
            .await?;
            store.add_to_graveyard(&mut transaction, handle.object_id());
            transaction.commit().await?;

            // On any failure below, the new object is tombstoned; defused once the rename commits.
            let clean_up = scopeguard::guard((), |_| {
                fs.graveyard().queue_tombstone_object(store.store_object_id(), handle.object_id())
            });

            let block_size = handle.block_size() as usize;
            let mut offset = 0;
            let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
            let mut next_block = block_size;
            while let Some((message, _)) = recorded_offsets.pop_front() {
                // If a file opening was never recorded, or it is not usable, drop the message.
                if !recorded_opens.get(&message.id()).copied().unwrap_or(false) {
                    continue;
                }

                let mut next_offset = offset + size_of::<Self::MessageType>();
                if next_offset > next_block {
                    // Zero the remainder of the block. Stopping on block boundaries allows us to
                    // resize the I/O without supporting reading/writing half messages to a buffer.
                    io_buf.as_mut_slice()[offset..next_block].fill(0);
                    if next_block >= IO_SIZE {
                        // The buffer is full.  Write it out.
                        handle
                            .write_or_append(None, io_buf.as_ref())
                            .await
                            .context("Failed to write profile block")?;
                        offset = 0;
                        next_offset = size_of::<Self::MessageType>();
                        next_block = block_size;
                    } else {
                        offset = next_block;
                        next_offset = offset + size_of::<Self::MessageType>();
                        next_block += block_size;
                    }
                }
                message.encode_to(&mut io_buf.as_mut_slice()[offset..next_offset]);
                offset = next_offset;
            }
            if offset > 0 {
                // Flush the final partial buffer, zero-padded to the end of its last block.
                io_buf.as_mut_slice()[offset..next_block].fill(0);
                handle
                    .write_or_append(None, io_buf.subslice(0..next_block))
                    .await
                    .context("Failed to write profile block")?;
            }

            let profile_dir = self.volume().get_profile_directory().await?;

            // Lock the profile directory plus the existing profile file (if any). If the file's id
            // changes between lookups, retry with the updated lock set until it is stable.
            let mut lock_keys =
                lock_keys![LockKey::object(store.store_object_id(), profile_dir.object_id())];
            let mut old_id = INVALID_OBJECT_ID;
            let mut transaction = loop {
                let transaction = fs.clone().new_transaction(lock_keys, Options::default()).await?;
                if let Some((id, descriptor, _)) = profile_dir.lookup(name).await? {
                    ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
                    if id == old_id {
                        break transaction;
                    }
                    lock_keys = lock_keys![
                        LockKey::object(store.store_object_id(), profile_dir.object_id()),
                        LockKey::object(store.store_object_id(), id)
                    ];
                    old_id = id;
                } else {
                    old_id = INVALID_OBJECT_ID;
                    break transaction;
                }
            };

            store.remove_from_graveyard(&mut transaction, handle.object_id());
            directory::replace_child_with_object(
                &mut transaction,
                Some((handle.object_id(), ObjectDescriptor::File)),
                (&profile_dir, name),
                0,
                Timestamp::now(),
            )
            .await?;
            transaction.commit().await?;

            // Success: defuse the cleanup guard, then tombstone the replaced profile (if any).
            ScopeGuard::into_inner(clean_up);
            if old_id != INVALID_OBJECT_ID {
                fs.graveyard().queue_tombstone_object(store.store_object_id(), old_id);
            }

            Ok(())
        }
    }
}
259
/// `RecordedVolume` flavor for blob volumes, where files are identified by hash.
struct BlobVolume {
    volume: Arc<FxVolume>,
    // Cache the open blob directory here. The Mutex is just to make this Send, but it is not
    // actually used concurrently.
    root_dir: Mutex<Option<Arc<BlobDirectory>>>,
}
266
impl RecordedVolume for BlobVolume {
    type IdType = Hash;
    type NodeType = FxBlob;
    type MessageType = BlobMessage;

    fn new(volume: Arc<FxVolume>) -> Self {
        Self { volume, root_dir: Mutex::new(None) }
    }

    fn volume(&self) -> &Arc<FxVolume> {
        &self.volume
    }

    /// Opens a blob by hash, lazily loading and caching the root blob directory on first use.
    async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
        let mut root_dir = self.root_dir.lock().await;
        if root_dir.is_none() {
            *root_dir = Some(
                self.volume
                    .get_or_load_node(
                        self.volume.store().root_directory_object_id(),
                        ObjectDescriptor::Directory,
                        None,
                    )
                    .await?
                    .into_any()
                    .downcast::<BlobDirectory>()
                    .map_err(|_| FxfsError::Inconsistent)?,
            );
        };
        root_dir
            .as_ref()
            .unwrap()
            .open_blob(&id.into())
            .await?
            .ok_or_else(|| FxfsError::NotFound.into())
    }

    async fn file_is_replayable(&self, _id: &Self::IdType) -> bool {
        // There is nothing to filter out in blob volumes.
        true
    }
}
309
/// `RecordedVolume` flavor for regular-file volumes, where files are identified by object id.
struct FileVolume {
    volume: Arc<FxVolume>,
}
313
impl RecordedVolume for FileVolume {
    type IdType = u64;
    type NodeType = FxFile;
    type MessageType = FileMessage;

    fn new(volume: Arc<FxVolume>) -> Self {
        Self { volume }
    }

    fn volume(&self) -> &Arc<FxVolume> {
        &self.volume
    }

    /// Opens a regular file by object id; fails if the node is not a file or is being purged.
    async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
        self.volume
            .get_or_load_node(id, ObjectDescriptor::File, None)
            .await?
            .into_any()
            .downcast::<FxFile>()
            .map_err(|_| anyhow!("Non-file opened"))?
            .into_opened_node()
            .ok_or_else(|| anyhow!("File being purged"))
    }

    /// A file is only replayable when its sole key is the volume data key.
    async fn file_is_replayable(&self, id: &Self::IdType) -> bool {
        match self.volume.store().get_keys(*id).await {
            // If any keys are not the volume key id, then the file may not be readable later.
            // If there's more than one, then at least one is not the volume key.
            Ok(keys)
                if keys.is_empty()
                    || (keys.len() == 1 && keys.first().unwrap().0 == VOLUME_DATA_KEY_ID) =>
            {
                true
            }
            _ => false,
        }
    }
}
352
/// A fixed-size profile record: a file identifier plus an offset (or the open marker).
trait Message: Eq + PartialEq + Sized + Send + Sync + std::hash::Hash + 'static {
    /// Identifier type; matches the owning `RecordedVolume::IdType`.
    type IdType: std::fmt::Display + Ord + Send + Sized;

    /// The file this message refers to.
    fn id(&self) -> Self::IdType;
    /// The recorded offset (or `FILE_OPEN_MARKER`).
    fn offset(&self) -> u64;
    /// Serializes into `dest`, which must be exactly `size_of::<Self>()` bytes.
    fn encode_to(&self, dest: &mut [u8]);
    /// Deserializes from `src`, which must be exactly `size_of::<Self>()` bytes.
    fn decode_from(src: &[u8]) -> Self;
    /// True for the all-zero record used to detect the zero-padded tail of a profile block.
    fn is_zeroes(&self) -> bool;
    /// Builds a message from a node and offset; fails if `node` is the wrong concrete type.
    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error>;
    /// True if this message records a file open rather than a page-in.
    fn is_open_marker(&self) -> bool;
}
364
/// Profile record for blob volumes: a merkle hash plus a page-in offset.
#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
struct BlobMessage {
    id: Hash,
    // Don't bother with offset+length. The kernel is going to split it up and align it one way
    // and then we're going to change it all with read-ahead/read-around.
    offset: u64,
}
372
impl BlobMessage {
    /// Serializes as the raw hash bytes followed by the little-endian offset.
    fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
        let (first, second) = mut_array_refs![dest, size_of::<Hash>(), size_of::<u64>()];
        *first = self.id.into();
        *second = self.offset.to_le_bytes();
    }

    /// Inverse of `encode_to_impl`.
    fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
        let (first, second) = array_refs!(src, size_of::<Hash>(), size_of::<u64>());
        Self { id: Hash::from_array(*first), offset: u64::from_le_bytes(*second) }
    }
}
385
impl Message for BlobMessage {
    type IdType = Hash;

    fn id(&self) -> Self::IdType {
        self.id
    }

    fn offset(&self) -> u64 {
        self.offset
    }

    fn encode_to(&self, dest: &mut [u8]) {
        // Panics if `dest` is not exactly size_of::<Self>() bytes.
        self.encode_to_impl(dest.try_into().unwrap());
    }

    fn decode_from(src: &[u8]) -> Self {
        // Panics if `src` is not exactly size_of::<Self>() bytes.
        Self::decode_from_impl(src.try_into().unwrap())
    }

    fn is_zeroes(&self) -> bool {
        self.id == Hash::from_array([0u8; size_of::<Hash>()]) && self.offset == 0
    }

    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
        match node.into_any().downcast::<FxBlob>() {
            Ok(blob) => Ok(Self { id: blob.root(), offset }),
            Err(_) => Err(anyhow!("Cannot record non-blob entry.")),
        }
    }

    fn is_open_marker(&self) -> bool {
        self.offset == FILE_OPEN_MARKER
    }
}
420
/// Profile record for regular-file volumes: an object id plus a page-in offset.
#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
struct FileMessage {
    id: u64,
    // Offset only, no length: the kernel splits and aligns requests one way regardless, and
    // read-ahead/read-around reshapes them again at replay time.
    offset: u64,
}

impl FileMessage {
    /// Serializes as little-endian `id` immediately followed by little-endian `offset`.
    fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
        let id_len = size_of::<u64>();
        dest[..id_len].copy_from_slice(&self.id.to_le_bytes());
        dest[id_len..].copy_from_slice(&self.offset.to_le_bytes());
    }

    /// Inverse of `encode_to_impl`.
    fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
        let id_len = size_of::<u64>();
        let id = u64::from_le_bytes(src[..id_len].try_into().unwrap());
        let offset = u64::from_le_bytes(src[id_len..].try_into().unwrap());
        Self { id, offset }
    }
}
441
impl Message for FileMessage {
    type IdType = u64;

    fn id(&self) -> Self::IdType {
        self.id
    }

    fn offset(&self) -> u64 {
        self.offset
    }

    fn encode_to(&self, dest: &mut [u8]) {
        // Panics if `dest` is not exactly size_of::<Self>() bytes.
        self.encode_to_impl(dest.try_into().unwrap())
    }

    fn decode_from(src: &[u8]) -> Self {
        // Panics if `src` is not exactly size_of::<Self>() bytes.
        Self::decode_from_impl(src.try_into().unwrap())
    }

    fn is_zeroes(&self) -> bool {
        self.id == 0 && self.offset == 0
    }

    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
        match node.into_any().downcast::<FxFile>() {
            Ok(file) => Ok(Self { id: file.object_id(), offset }),
            Err(_) => Err(anyhow!("Cannot record non-file entry")),
        }
    }

    fn is_open_marker(&self) -> bool {
        self.offset == FILE_OPEN_MARKER
    }
}
476
/// Takes messages to be written into the current profile. This should be dropped before the
/// recording is stopped to ensure that all messages have been flushed to the writer thread.
pub trait Recorder: Send + Sync {
    /// Record a page in request, for the given identifier and offset.
    ///
    /// Errors if `node` is not the node type expected by the volume, or (implementation
    /// dependent) if the recording has already shut down.
    fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error>;

    /// Record file opens to gather what files were actually used during the recording.
    fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error>;
}
486
/// `Recorder` implementation that batches messages into chunks before sending them to the
/// recording task.
struct RecorderImpl<T: Message> {
    // Channel to the `RecordedVolume::record` task that persists the profile.
    sender: async_channel::Sender<Vec<T>>,
    // In-progress chunk; flushed when it reaches MESSAGE_CHUNK_SIZE or on drop.
    buffer: Vec<T>,
}

impl<T: Message> RecorderImpl<T> {
    fn new(sender: async_channel::Sender<Vec<T>>) -> Self {
        Self { sender, buffer: Vec::with_capacity(MESSAGE_CHUNK_SIZE) }
    }
}
497
impl<T: Message> Recorder for RecorderImpl<T> {
    /// Buffers one message, flushing a full chunk to the recording task, and bumps `RECORDED`.
    fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error> {
        self.buffer.push(T::from_node_request(node, offset)?);
        if self.buffer.len() >= MESSAGE_CHUNK_SIZE {
            // try_send to avoid async await, we use an unbounded channel anyways so any failure
            // here should only be if the channel is closed, which is permanent anyways.
            self.sender.try_send(std::mem::replace(
                &mut self.buffer,
                Vec::with_capacity(MESSAGE_CHUNK_SIZE),
            ))?;
        }
        RECORDED.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    /// Records an open by reusing `record` with the sentinel `FILE_OPEN_MARKER` offset.
    fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error> {
        self.record(node, FILE_OPEN_MARKER)
    }
}
517
518impl<T: Message> Drop for RecorderImpl<T> {
519    fn drop(&mut self) {
520        // Best effort sending what messages have already been queued.
521        if self.buffer.len() > 0 {
522            let buffer = std::mem::take(&mut self.buffer);
523            let _ = self.sender.try_send(buffer);
524        }
525    }
526}
527
/// A single replay work item: prefetch the page at `offset` in `file`.
struct Request<P: PagerBacked> {
    file: Arc<P>,
    offset: u64,
}
532
/// State for an in-flight profile replay.
struct ReplayState<T> {
    // Completes when all prefetch threads have exited; Shared so it can be awaited repeatedly.
    replay_threads: future::Shared<BoxFuture<'static, ()>>,
    // Task that decodes the profile and then keeps opened nodes cached until dropped.
    _cache_task: fasync::Task<()>,
    _phantom: PhantomData<T>,
}
538
impl<T: RecordedVolume> ReplayState<T> {
    /// Starts replaying the profile in `handle`: spawns `REPLAY_THREADS` blocking prefetch
    /// threads fed by a task that decodes the profile, then keeps opened nodes cached until this
    /// `ReplayState` is dropped.
    fn new(handle: Box<dyn ReadObjectHandle>, volume: Arc<FxVolume>, guard: ActiveGuard) -> Self {
        let (sender, receiver) = async_channel::unbounded::<Request<T::NodeType>>();

        // Create async_channel. An async thread reads and populates the channel, then N threads
        // consume it and touch pages.
        let mut replay_threads = Vec::with_capacity(REPLAY_THREADS);
        for _ in 0..REPLAY_THREADS {
            let receiver = receiver.clone();
            // The replay threads can have references to files so we make sure they have a guard
            // so that shutdown will wait till they have been joined.
            let guard = guard.clone();
            replay_threads.push(fasync::unblock(move || {
                let _guard = guard;
                Self::page_in_thread(receiver);
            }));
        }
        // Shared so `wait_for_replay_to_finish` can await completion any number of times.
        let replay_threads = (Box::pin(async {
            join_all(replay_threads).await;
        }) as BoxFuture<'static, ()>)
            .shared();

        let scope = volume.scope().clone();
        let cache_task = scope
            .spawn({
                // The replay threads hold active guards, so we must watch for cancellation.  When
                // cancelled, we'll drop the sender which will cause the replay threads to drop
                // their guards, which will allow shutdown to proceed.
                async move {
                    let mut task = pin!(
                        async {
                            // Hold the items in cache until replay is stopped. Optional as None
                            // indicates that the file could not be opened, and we want to cache that
                            // failure.
                            let mut local_cache: BTreeMap<
                                T::IdType,
                                Option<OpenedNode<T::NodeType>>,
                            > = BTreeMap::new();

                            let volume_id = volume.id();

                            if let Err(error) = T::new(volume)
                                .read_and_queue(handle, &sender, &mut local_cache)
                                .await
                            {
                                error!(error:?; "Failed to read back profile");
                            }
                            // Closing the channel lets the prefetch threads drain and exit.
                            sender.close();

                            info!(
                                "Replay for volume {} opened {} of {} objects.",
                                volume_id,
                                local_cache.iter().filter(|(_, e)| e.is_some()).count(),
                                local_cache.len()
                            );

                            // Keep the cache alive until dropped.
                            let () = std::future::pending().await;
                        }
                        .fuse()
                    );

                    select! {
                        _ = task => {}
                        _ = guard.on_cancel().fuse() => {}
                    }
                }
            })
            .into();

        Self { replay_threads, _cache_task: cache_task, _phantom: PhantomData }
    }

    /// Blocking worker loop: pulls requests off the queue and prefetches one page per request.
    fn page_in_thread(queue: async_channel::Receiver<Request<T::NodeType>>) {
        while let Ok(request) = queue.recv_blocking() {
            let res = request.file.vmo().op_range(
                zx::VmoOp::PREFETCH,
                request.offset,
                zx::system_get_page_size() as u64,
            );
            if let Err(e) = res {
                // Prefetch is an optimization; log and carry on.
                warn!("Failed to prefetch page: {:?}", e);
            }
            // If the volume is shutdown, the sender will be dropped.
            if queue.sender_count() == 0 {
                return;
            }
        }
    }
}
629
/// Holds the current profile recording and/or replay state, and provides methods for state
/// transitions.
#[async_trait]
pub trait ProfileState: Send + Sync {
    /// Creates a new recording and returns the `Recorder` object to record to. The recording
    /// finalizes when the associated `Recorder` is dropped.  Stops any recording currently in
    /// progress.
    fn record_new(&mut self, volume: &Arc<FxVolume>, name: &str) -> Box<dyn Recorder>;

    /// Reads given handle to parse a profile and replay it by requesting pages via
    /// ZX_VMO_OP_PREFETCH in blocking background threads. Stops any replay currently in progress.
    ///
    /// `guard` keeps the volume's execution scope alive while replay threads hold file
    /// references.
    fn replay_profile(
        &mut self,
        handle: Box<dyn ReadObjectHandle>,
        volume: Arc<FxVolume>,
        guard: ActiveGuard,
    );

    /// Waits for replay to finish, but does not drop the cache.  The cache will be dropped when
    /// the ProfileState impl is dropped.  This is fine to call multiple times.
    async fn wait_for_replay_to_finish(&mut self);

    /// Waits for the recording to finish, returning its result; `Ok(())` if none is in progress.
    async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error>;
}
655
656pub fn new_profile_state(is_blob: bool) -> Box<dyn ProfileState> {
657    if is_blob {
658        Box::new(ProfileStateImpl::<BlobVolume>::new())
659    } else {
660        Box::new(ProfileStateImpl::<FileVolume>::new())
661    }
662}
663
/// Generic `ProfileState` implementation, parameterized over the volume flavor.
struct ProfileStateImpl<T> {
    // Handle to the in-flight recording task, if any; dropping it cancels the recording.
    recording: Option<RemoteHandle<Result<(), Error>>>,
    // The in-flight replay, if any.
    replay: Option<ReplayState<T>>,
}

impl<T> ProfileStateImpl<T> {
    fn new() -> Self {
        Self { recording: None, replay: None }
    }
}
674
#[async_trait]
impl<T: RecordedVolume> ProfileState for ProfileStateImpl<T> {
    /// Spawns a `record` task on the volume's scope and hands back the sending `Recorder`.
    fn record_new(&mut self, volume: &Arc<FxVolume>, name: &str) -> Box<dyn Recorder> {
        let (sender, receiver) = async_channel::unbounded();
        let volume = volume.clone();
        let name = name.to_string();
        // Cancel the previous recording (if any).
        self.recording = None;
        let scope = volume.scope().clone();
        let (task, remote_handle) = async move {
            let recording = T::new(volume);
            recording
                .record(&name, receiver)
                .await
                .inspect_err(|error| warn!(error:?; "Profile recording '{name}' failed"))
        }
        .remote_handle();
        self.recording = Some(remote_handle);
        scope.spawn(task);
        Box::new(RecorderImpl::new(sender))
    }

    fn replay_profile(
        &mut self,
        handle: Box<dyn ReadObjectHandle>,
        volume: Arc<FxVolume>,
        guard: ActiveGuard,
    ) {
        // Replacing the state drops (stops) any previous replay.
        self.replay = Some(ReplayState::new(handle, volume, guard));
    }

    async fn wait_for_replay_to_finish(&mut self) {
        if let Some(replay) = &mut self.replay {
            // Shared future, so awaiting repeatedly is fine.
            replay.replay_threads.clone().await;
        }
    }

    async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error> {
        if let Some(recording) = self.recording.take() { recording.await } else { Ok(()) }
    }
}
716
717#[cfg(test)]
718mod tests {
719    use super::{
720        BlobMessage, BlobVolume, FileMessage, FileVolume, IO_SIZE, Message, RecordedVolume,
721        Request, new_profile_state,
722    };
723    use crate::fuchsia::file::FxFile;
724    use crate::fuchsia::fxblob::blob::FxBlob;
725    use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
726    use crate::fuchsia::node::{FxNode, OpenedNode};
727    use crate::fuchsia::pager::PagerBacked;
728    use crate::fuchsia::testing::{TestFixture, TestFixtureOptions, open_file_checked};
729    use crate::fuchsia::volume::FxVolume;
730    use anyhow::Error;
731    use async_trait::async_trait;
732    use delivery_blob::CompressionMode;
733    use event_listener::{Event, EventListener};
734    use fuchsia_hash::Hash;
735    use fuchsia_sync::Mutex;
736    use fxfs::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
737    use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
738    use fxfs::object_store::{DataObjectHandle, HandleOptions, ObjectDescriptor, ObjectStore};
739    use std::collections::BTreeMap;
740    use std::mem::size_of;
741    use std::sync::Arc;
742    use std::time::Duration;
743    use storage_device::buffer::{BufferRef, MutableBufferRef};
744    use storage_device::buffer_allocator::{BufferAllocator, BufferFuture, BufferSource};
745    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
746
    /// Shared mutable state behind `FakeReaderWriter`.
    struct FakeReaderWriterInner {
        // Backing storage: writes append here, reads copy out of it.
        data: Vec<u8>,
        // Pending I/O delays; consumed in push order (inserted at front, popped from back).
        delays: Vec<EventListener>,
    }
751
    /// In-memory object handle used in tests to capture profile writes and serve reads, with
    /// optional injected delays on each I/O.
    struct FakeReaderWriter {
        allocator: BufferAllocator,
        inner: Arc<Mutex<FakeReaderWriterInner>>,
    }
756
757    const BLOCK_SIZE: usize = 4096;
758
    impl FakeReaderWriter {
        fn new() -> Self {
            Self {
                allocator: BufferAllocator::new(BLOCK_SIZE, BufferSource::new(IO_SIZE * 2)),
                inner: Arc::new(Mutex::new(FakeReaderWriterInner {
                    data: Vec::new(),
                    delays: Vec::new(),
                })),
            }
        }

        /// Queues a delay; the next read/write will await it before touching the data.
        fn push_delay(&self, delay: EventListener) {
            self.inner.lock().delays.insert(0, delay);
        }
    }
774
    impl ObjectHandle for FakeReaderWriter {
        fn object_id(&self) -> u64 {
            // Arbitrary fixed id; the tests don't depend on it.
            0
        }

        fn block_size(&self) -> u64 {
            self.allocator.block_size() as u64
        }

        fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
            self.allocator.allocate_buffer(size)
        }
    }
788
    impl WriteObjectHandle for FakeReaderWriter {
        /// Appends `buf` to the in-memory data, first awaiting any injected delay.
        async fn write_or_append(
            &self,
            offset: Option<u64>,
            buf: BufferRef<'_>,
        ) -> Result<u64, Error> {
            // We only append for now.
            assert!(offset.is_none());
            let delay = self.inner.lock().delays.pop();
            if let Some(delay) = delay {
                delay.await;
            }
            // This relocking has a TOCTOU flavour, but it shouldn't matter for this application.
            self.inner.lock().data.extend_from_slice(buf.as_slice());
            Ok(buf.len() as u64)
        }

        // Not exercised by these tests; calling it is a bug.
        async fn truncate(&self, _size: u64) -> Result<(), Error> {
            unreachable!();
        }

        // Not exercised by these tests; calling it is a bug.
        async fn flush(&self) -> Result<(), Error> {
            unreachable!();
        }
    }
814
    /// Creates a file named `name` under the volume root, writes `data` to it
    /// through the FIDL file protocol, and returns the new file's object id.
    async fn write_file(fixture: &TestFixture, name: &str, data: &[u8]) -> u64 {
        let root_dir = fixture.volume().root_dir();
        // Creating a child requires a transaction holding the root dir lock.
        let mut transaction = fixture
            .volume()
            .volume()
            .store()
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    fixture.volume().volume().store().store_object_id(),
                    root_dir.object_id()
                )],
                Options::default(),
            )
            .await
            .expect("Creating transaction for new file");
        let id = root_dir
            .directory()
            .create_child_file(&mut transaction, name)
            .await
            .expect("Creating new_file")
            .object_id();
        transaction.commit().await.unwrap();
        // Write the contents via the file protocol rather than directly
        // through the object store.
        let file = open_file_checked(
            fixture.root(),
            name,
            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_FILE,
            &Default::default(),
        )
        .await;
        file.write(data).await.unwrap().expect("Writing file");
        id
    }
848
849    #[async_trait]
850    impl ReadObjectHandle for FakeReaderWriter {
851        async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
852            let delay = self.inner.lock().delays.pop();
853            if let Some(delay) = delay {
854                delay.await;
855            }
856            // This relocking has a TOCTOU flavour, but it shouldn't matter for this application.
857            let inner = self.inner.lock();
858            assert!(offset as usize <= inner.data.len());
859            let offset_end = std::cmp::min(offset as usize + buf.len(), inner.data.len());
860            let size = offset_end - offset as usize;
861            buf.as_mut_slice()[..size].clone_from_slice(&inner.data[offset as usize..offset_end]);
862            Ok(size)
863        }
864
865        fn get_size(&self) -> u64 {
866            self.inner.lock().data.len() as u64
867        }
868    }
869
870    #[fuchsia::test]
871    async fn test_encode_decode_blob() {
872        let mut buf = [0u8; size_of::<BlobMessage>()];
873        let m = BlobMessage { id: [88u8; 32].into(), offset: 77 };
874        m.encode_to(&mut buf.as_mut_slice());
875        let m2 = BlobMessage::decode_from(&buf);
876        assert_eq!(m, m2);
877    }
878
879    #[fuchsia::test]
880    async fn test_encode_decode_file() {
881        let mut buf = [0u8; size_of::<FileMessage>()];
882        let m = FileMessage { id: 88, offset: 77 };
883        m.encode_to(&mut buf.as_mut_slice());
884        let m2 = FileMessage::decode_from(&buf);
885        assert!(!m2.is_zeroes());
886        assert_eq!(m, m2);
887    }
888
889    const TEST_PROFILE_NAME: &str = "test_profile";
890
891    async fn get_test_profile_handle(volume: &Arc<FxVolume>) -> DataObjectHandle<FxVolume> {
892        let profile_dir = volume.get_profile_directory().await.unwrap();
893        ObjectStore::open_object(
894            volume,
895            profile_dir
896                .lookup(TEST_PROFILE_NAME)
897                .await
898                .expect("lookup failed")
899                .expect("not found")
900                .0,
901            HandleOptions::default(),
902            None,
903        )
904        .await
905        .unwrap()
906    }
907
908    async fn get_test_profile_contents(volume: &Arc<FxVolume>) -> Vec<u8> {
909        get_test_profile_handle(volume).await.contents(1024 * 1024).await.unwrap().to_vec()
910    }
911
912    #[fuchsia::test]
913    async fn test_recording_basic_blob() {
914        let fixture = new_blob_fixture().await;
915        {
916            let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
917            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
918
919            let mut state = new_profile_state(true);
920            let volume = fixture.volume().volume();
921
922            {
923                // Drop recorder when finished writing to flush data.
924                let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
925                recorder.record(blob.clone(), 0).unwrap();
926                recorder.record_open(blob).unwrap();
927            }
928
929            state.wait_for_recording_to_finish().await.unwrap();
930
931            assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
932        }
933        fixture.close().await;
934    }
935
936    #[fuchsia::test]
937    async fn test_recording_basic_file() {
938        let fixture = TestFixture::new().await;
939        {
940            let id = write_file(&fixture, "foo", &[88u8]).await;
941            let node = fixture
942                .volume()
943                .volume()
944                .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
945                .await
946                .unwrap();
947
948            let mut state = new_profile_state(false);
949            let volume = fixture.volume().volume();
950
951            {
952                // Drop recorder when finished writing to flush data.
953                let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
954                recorder.record(node.clone(), 0).unwrap();
955                recorder.record_open(node).unwrap();
956            }
957            state.wait_for_recording_to_finish().await.unwrap();
958
959            assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
960        }
961        fixture.close().await;
962    }
963
964    #[fuchsia::test]
965    async fn test_recording_filtered_without_open() {
966        let fixture = new_blob_fixture().await;
967        {
968            let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
969            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
970
971            let mut state = new_profile_state(true);
972            let volume = fixture.volume().volume();
973
974            {
975                // Drop recorder when finished writing to flush data.
976                let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
977                recorder.record(blob.clone(), 0).unwrap();
978            }
979            state.wait_for_recording_to_finish().await.unwrap();
980
981            assert_eq!(get_test_profile_contents(volume).await.len(), 0);
982        }
983        fixture.close().await;
984    }
985
    #[fuchsia::test]
    async fn test_recording_blob_more_than_block() {
        let mut state = new_profile_state(true);

        let fixture = new_blob_fixture().await;
        assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
        // One more message than fits in a single block, forcing the profile
        // to spill into a second block.
        let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;
        let hash;
        let volume = fixture.volume().volume();

        {
            hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
            // Drop recorder when finished writing to flush data.
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            recorder.record_open(blob.clone()).unwrap();
            for i in 0..message_count {
                recorder.record(blob.clone(), 4096 * i as u64).unwrap();
            }
        }
        state.wait_for_recording_to_finish().await.unwrap();

        // The messages overflow one block, so the profile occupies two.
        assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);

        // Read the profile back and check every message arrives in recorded
        // order.
        let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
        let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();

        let volume = fixture.volume().volume().clone();
        let task = fasync::Task::spawn(async move {
            let handle = Box::new(get_test_profile_handle(&volume).await);
            let blob = BlobVolume::new(volume);
            blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
        });

        let mut recv_count = 0;
        // The loop ends when `read_and_queue` drops its sender.
        while let Ok(msg) = receiver.recv().await {
            assert_eq!(msg.file.root(), hash);
            assert_eq!(msg.offset, 4096 * recv_count);
            recv_count += 1;
        }
        task.await;
        assert_eq!(recv_count, message_count as u64);

        fixture.close().await;
    }
1031
    #[fuchsia::test]
    async fn test_recording_file_more_than_block() {
        let mut state = new_profile_state(false);

        let fixture = TestFixture::new().await;
        assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
        // One more message than fits in a single block, forcing the profile
        // to spill into a second block.
        let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;
        let id;
        let volume = fixture.volume().volume();
        {
            id = write_file(&fixture, "foo", &[88u8]).await;
            let node = volume
                .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
                .await
                .unwrap();
            // Drop recorder when finished writing to flush data.
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            recorder.record_open(node.clone()).unwrap();
            for i in 0..message_count {
                recorder.record(node.clone(), 4096 * i as u64).unwrap();
            }
        }
        state.wait_for_recording_to_finish().await.unwrap();

        // The messages overflow one block, so the profile occupies two.
        assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);

        // Read the profile back and check every message arrives in recorded
        // order.
        let mut local_cache: BTreeMap<u64, Option<OpenedNode<FxFile>>> = BTreeMap::new();
        let (sender, receiver) = async_channel::unbounded::<Request<FxFile>>();

        let volume = fixture.volume().volume().clone();
        let task = fasync::Task::spawn(async move {
            let handle = Box::new(get_test_profile_handle(&volume).await);
            let file = FileVolume::new(volume);
            file.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
        });

        let mut recv_count = 0;
        // The loop ends when `read_and_queue` drops its sender.
        while let Ok(msg) = receiver.recv().await {
            assert_eq!(msg.file.object_id(), id);
            assert_eq!(msg.offset, 4096 * recv_count);
            recv_count += 1;
        }
        task.await;
        assert_eq!(recv_count, message_count as u64);

        fixture.close().await;
    }
1079
1080    #[fuchsia::test]
1081    async fn test_recording_more_than_io_size() {
1082        let fixture = new_blob_fixture().await;
1083
1084        {
1085            let mut state = new_profile_state(true);
1086            let message_count = (IO_SIZE as usize / size_of::<BlobMessage>()) + 1;
1087            let hash;
1088            let volume = fixture.volume().volume();
1089            {
1090                hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
1091                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1092                // Drop recorder when finished writing to flush data.
1093                let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1094                recorder.record_open(blob.clone()).unwrap();
1095                for i in 0..message_count {
1096                    recorder.record(blob.clone(), 4096 * i as u64).unwrap();
1097                }
1098            }
1099            state.wait_for_recording_to_finish().await.unwrap();
1100            assert_eq!(get_test_profile_contents(volume).await.len(), IO_SIZE + BLOCK_SIZE);
1101
1102            let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
1103            let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();
1104
1105            let volume = volume.clone();
1106            let task = fasync::Task::spawn(async move {
1107                let handle = Box::new(get_test_profile_handle(&volume).await);
1108                let blob = BlobVolume::new(volume);
1109                blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
1110            });
1111
1112            let mut recv_count = 0;
1113            while let Ok(msg) = receiver.recv().await {
1114                assert_eq!(msg.file.root(), hash);
1115                assert_eq!(msg.offset, 4096 * recv_count);
1116                recv_count += 1;
1117            }
1118            task.await;
1119            assert_eq!(recv_count, message_count as u64);
1120        }
1121
1122        fixture.close().await;
1123    }
1124
    #[fuchsia::test]
    async fn test_replay_profile_blob() {
        // Create all the files that we need first, then restart the filesystem to clear cache.
        let mut state = new_profile_state(true);

        let mut hashes = Vec::new();

        let fixture = new_blob_fixture().await;
        {
            assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
            // Enough messages that the profile spills into a second block.
            let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;

            let volume = fixture.volume().volume();
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            // Page in the zero offsets only to avoid readahead strangeness.
            for i in 0..message_count {
                let hash =
                    fixture.write_blob(i.to_le_bytes().as_slice(), CompressionMode::Never).await;
                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
                recorder.record_open(blob.clone()).unwrap();
                hashes.push(hash);
                recorder.record(blob.clone(), 0).unwrap();
            }
        };
        let device = fixture.close().await;
        device.ensure_unique();
        // The recorder was dropped at the end of the scope above; wait for
        // the flushed profile to be fully persisted.
        state.wait_for_recording_to_finish().await.unwrap();

        device.reopen(false);
        let fixture = open_blob_fixture(device).await;
        {
            // Need to get the root vmo to check committed bytes.
            // Ensure that nothing is paged in right now.
            for hash in &hashes {
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
            }

            let volume = fixture.volume().volume();
            state.replay_profile(
                Box::new(get_test_profile_handle(volume).await),
                volume.clone(),
                volume.scope().try_active_guard().unwrap(),
            );

            // Await all data being played back by checking that things have paged in.
            for hash in &hashes {
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                while blob.vmo().info().unwrap().committed_bytes == 0 {
                    fasync::Timer::new(Duration::from_millis(25)).await;
                }
            }
        }
        fixture.close().await;
    }
1180
    #[fuchsia::test]
    async fn test_replay_profile_file() {
        // Create all the files that we need first, then restart the filesystem to clear cache.
        let mut state = new_profile_state(false);

        let mut ids = Vec::new();

        let fixture = TestFixture::new().await;
        {
            assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
            // Enough messages that the profile spills into a second block.
            let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;

            let volume = fixture.volume().volume();
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            // Page in the zero offsets only to avoid readahead strangeness.
            for i in 0..message_count {
                let id = write_file(&fixture, &i.to_string(), &[88u8]).await;
                let node = fixture
                    .volume()
                    .volume()
                    .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
                    .await
                    .unwrap();
                recorder.record_open(node.clone()).unwrap();
                ids.push(id);
                recorder.record(node.clone(), 0).unwrap();
            }
        };
        let device = fixture.close().await;
        device.ensure_unique();
        // The recorder was dropped at the end of the scope above; wait for
        // the flushed profile to be fully persisted.
        state.wait_for_recording_to_finish().await.unwrap();

        device.reopen(false);
        let fixture = TestFixture::open(
            device,
            TestFixtureOptions { encrypted: true, format: false, ..Default::default() },
        )
        .await;
        {
            // Ensure that nothing is paged in right now.
            for id in &ids {
                let file = fixture
                    .volume()
                    .volume()
                    .get_or_load_node(
                        *id,
                        ObjectDescriptor::File,
                        Some(fixture.volume().root_dir()),
                    )
                    .await
                    .unwrap()
                    .into_any()
                    .downcast::<FxFile>()
                    .unwrap();
                assert_eq!(file.vmo().info().unwrap().committed_bytes, 0);
            }

            let volume = fixture.volume().volume();
            state.replay_profile(
                Box::new(get_test_profile_handle(volume).await),
                volume.clone(),
                volume.scope().try_active_guard().unwrap(),
            );

            // Await all data being played back by checking that things have paged in.
            for id in &ids {
                let file = fixture
                    .volume()
                    .volume()
                    .get_or_load_node(
                        *id,
                        ObjectDescriptor::File,
                        Some(fixture.volume().root_dir()),
                    )
                    .await
                    .unwrap()
                    .into_any()
                    .downcast::<FxFile>()
                    .unwrap();
                while file.vmo().info().unwrap().committed_bytes == 0 {
                    fasync::Timer::new(Duration::from_millis(25)).await;
                }
            }
            state.wait_for_recording_to_finish().await.unwrap();
        }
        fixture.close().await;
    }
1268
    // Verifies that a new recording can be made while an earlier profile is
    // being replayed, and that the two recordings end up with different
    // contents.
    #[fuchsia::test]
    async fn test_recording_during_replay() {
        let mut state = new_profile_state(true);

        let hash;
        let first_recording;
        let fixture = new_blob_fixture().await;
        let volume = fixture.volume().volume();

        // First make a simple recording.
        {
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
            recorder.record_open(blob.clone()).unwrap();
            recorder.record(blob.clone(), 0).unwrap();
        }

        state.wait_for_recording_to_finish().await.unwrap();
        first_recording = get_test_profile_contents(volume).await;
        assert_ne!(first_recording.len(), 0);
        let device = fixture.close().await;
        device.ensure_unique();

        // Restart the filesystem to clear the cache.
        device.reopen(false);
        let fixture = open_blob_fixture(device).await;

        {
            // Need to get the root vmo to check committed bytes.
            // Ensure that nothing is paged in right now.
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
            assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);

            // Start recording
            let volume = fixture.volume().volume();
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            recorder.record(blob.clone(), 4096).unwrap();

            // Replay the original recording.
            let volume = fixture.volume().volume();
            state.replay_profile(
                Box::new(get_test_profile_handle(volume).await),
                volume.clone(),
                volume.scope().try_active_guard().unwrap(),
            );

            // Await all data being played back by checking that things have paged in.
            {
                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
                while blob.vmo().info().unwrap().committed_bytes == 0 {
                    fasync::Timer::new(Duration::from_millis(25)).await;
                }
            }

            // Record the open after the replay. Needs both the before and after action to
            // capture anything ensuring that the two procedures overlapped.
            recorder.record_open(blob.clone()).unwrap();
        }

        state.wait_for_recording_to_finish().await.unwrap();

        // The second recording (offset 4096) must differ from the first
        // (offset 0).
        let volume = fixture.volume().volume();
        let second_recording = get_test_profile_contents(volume).await;
        assert_ne!(second_recording.len(), 0);
        assert_ne!(&second_recording, &first_recording);

        fixture.close().await;
    }
1337
    // Doesn't ensure that anything reads back properly, just that everything shuts down when
    // stopped early.
    #[fuchsia::test]
    async fn test_replay_profile_stop_reading_early() {
        let mut state = new_profile_state(true);
        let fixture = new_blob_fixture().await;

        {
            let volume = fixture.volume().volume();

            // Create the file that we need first.
            let message;
            {
                let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
                message = BlobMessage { id: blob.root(), offset: 0 };
            }
            state.wait_for_recording_to_finish().await.unwrap();

            // Make a profile long enough to require 2 reads.
            let replay_handle = Box::new(FakeReaderWriter::new());
            let mut buff = vec![0u8; IO_SIZE * 2];
            // One message at the start of each IO_SIZE chunk.
            message.encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
            message.encode_to_impl(
                (&mut buff[IO_SIZE..IO_SIZE + size_of::<BlobMessage>()]).try_into().unwrap(),
            );

            replay_handle.inner.lock().data = buff;
            // Queue a delay for each of the two expected reads.
            let delay1 = Event::new();
            replay_handle.push_delay(delay1.listen());
            let delay2 = Event::new();
            replay_handle.push_delay(delay2.listen());

            state.replay_profile(
                replay_handle,
                volume.clone(),
                volume.scope().try_active_guard().unwrap(),
            );

            // Delay the first read long enough so that the stop can be triggered during it.
            fasync::Task::spawn(async move {
                // Let the profiler wait on this a little.
                fasync::Timer::new(Duration::from_millis(100)).await;
                delay1.notify(usize::MAX);
            })
            .detach();
        }

        // The reader should block indefinitely (we never notify delay2), but that shouldn't block
        // termination.
        fixture.close().await;
    }
1390
    // A missing blob in the profile should be skipped during replay without
    // preventing later entries from being replayed.
    #[fuchsia::test]
    async fn test_replay_blob_missing() {
        let fixture = new_blob_fixture().await;
        // Create the blob that comes after the missing blob. Ensure it still gets
        // recorded.
        let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
        let mut buff = vec![0u8; IO_SIZE];
        {
            // First encode the blob that is missing. Just make it up. This will be skipped during
            // replay.
            {
                let message = BlobMessage { id: [42u8; 32].into(), offset: 0 };
                message
                    .encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
            }

            // Create the blob that won't be missing and encode that.
            {
                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
                let message = BlobMessage { id: blob.root(), offset: 0 };
                message.encode_to_impl(
                    (&mut buff[size_of::<BlobMessage>()..(size_of::<BlobMessage>() * 2)])
                        .try_into()
                        .unwrap(),
                );
            }
        }
        let device = fixture.close().await;
        device.ensure_unique();

        // Restart the filesystem to clear the cache.
        device.reopen(false);
        let fixture = open_blob_fixture(device).await;
        {
            let mut state = new_profile_state(true);
            let volume = fixture.volume().volume();

            // Serve the hand-built profile through the fake handle.
            let replay_handle = Box::new(FakeReaderWriter::new());
            replay_handle.inner.lock().data = buff;

            state.replay_profile(
                replay_handle,
                volume.clone(),
                volume.scope().try_active_guard().unwrap(),
            );

            // Wait for the replay to populate the page.
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
            while blob.vmo().info().unwrap().committed_bytes == 0 {
                fasync::Timer::new(Duration::from_millis(25)).await;
            }
        }
        fixture.close().await;
    }
1444
    // Replay must skip entries for files that were deleted (missing) or are
    // marked for tombstoning, while still replaying later valid entries.
    #[fuchsia::test]
    async fn test_replay_file_missing_or_tombstoned() {
        let fixture = TestFixture::new().await;
        let mut buff = vec![0u8; IO_SIZE];
        // Create the blob that comes after the missing blob. Ensure it still gets
        // recorded.
        let remaining_file_id;
        let tombstoned_file_id;
        // First encode the file that is missing.
        {
            let id = write_file(&fixture, "foo", &[1, 2, 3, 4]).await;
            let message = FileMessage { id, offset: 0 };
            message.encode_to_impl((&mut buff[0..size_of::<FileMessage>()]).try_into().unwrap());
        }
        // Remove the file now.
        fixture
            .root()
            .unlink("foo", &fio::UnlinkOptions::default())
            .await
            .unwrap()
            .expect("Unlinking");

        // Encode the file that will be tombstoned during replay.
        {
            tombstoned_file_id = write_file(&fixture, "bar", &[1, 2, 3, 4]).await;
            let message = FileMessage { id: tombstoned_file_id, offset: 0 };
            message.encode_to_impl(
                (&mut buff[size_of::<FileMessage>()..(size_of::<FileMessage>() * 2)])
                    .try_into()
                    .unwrap(),
            );
        }

        // Encode the file that will remain and be replayed last.
        {
            remaining_file_id = write_file(&fixture, "baz", &[1, 2, 3, 4]).await;
            let message = FileMessage { id: remaining_file_id, offset: 0 };
            message.encode_to_impl(
                (&mut buff[(size_of::<FileMessage>() * 2)..(size_of::<FileMessage>() * 3)])
                    .try_into()
                    .unwrap(),
            );
        }
        let device = fixture.close().await;
        device.ensure_unique();

        // Restart the filesystem to clear the cache.
        device.reopen(false);
        let fixture =
            TestFixture::open(device, TestFixtureOptions { format: false, ..Default::default() })
                .await;
        {
            // Get a ref to the Arc on the file, then unlink it. Since the open count is zero it
            // should get marked for tombstone right away.
            let tombstoned_file = fixture
                .volume()
                .volume()
                .get_or_load_node(tombstoned_file_id, ObjectDescriptor::File, None)
                .await
                .expect("Opening file object")
                .into_any()
                .downcast::<FxFile>()
                .unwrap();
            fixture
                .root()
                .unlink("bar", &fio::UnlinkOptions::default())
                .await
                .unwrap()
                .expect("Unlinking");

            let mut state = new_profile_state(false);
            let volume = fixture.volume().volume();

            // Serve the hand-built profile through the fake handle.
            let replay_handle = Box::new(FakeReaderWriter::new());
            replay_handle.inner.lock().data = buff;

            state.replay_profile(
                replay_handle,
                volume.clone(),
                volume.scope().try_active_guard().unwrap(),
            );

            // Wait for the replay to populate the page.
            let remaining_file = fixture
                .volume()
                .volume()
                .get_or_load_node(remaining_file_id, ObjectDescriptor::File, None)
                .await
                .expect("Opening file object")
                .into_any()
                .downcast::<FxFile>()
                .unwrap();
            while remaining_file.vmo().info().unwrap().committed_bytes == 0 {
                fasync::Timer::new(Duration::from_millis(25)).await;
            }

            // The tombstoned file should not have anything committed because it shouldn't be able
            // to open.
            assert_eq!(tombstoned_file.vmo().info().unwrap().committed_bytes, 0);
        }
        fixture.close().await;
    }
1546}