// fxfs_platform/fuchsia/profile.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

5use crate::fuchsia::file::FxFile;
6use crate::fuchsia::fxblob::BlobDirectory;
7use crate::fuchsia::fxblob::blob::FxBlob;
8use crate::fuchsia::node::{FxNode, OpenedNode};
9use crate::fuchsia::pager::PagerBacked;
10use crate::fuchsia::volume::FxVolume;
11use anyhow::{Context as _, Error, anyhow, ensure};
12use arrayref::{array_refs, mut_array_refs};
13use async_trait::async_trait;
14use fuchsia_async as fasync;
15use fuchsia_hash::Hash;
16use futures::future::{self, BoxFuture, RemoteHandle, join_all};
17use futures::lock::Mutex;
18use futures::{FutureExt, select};
19use fxfs::errors::FxfsError;
20use fxfs::log::*;
21use fxfs::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle};
22use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
23use fxfs::object_store::{
24    HandleOptions, ObjectDescriptor, ObjectStore, Timestamp, VOLUME_DATA_KEY_ID, directory,
25};
26use linked_hash_map::LinkedHashMap;
27use scopeguard::ScopeGuard;
28use std::cmp::{Eq, PartialEq};
29use std::collections::btree_map::{BTreeMap, Entry};
30use std::marker::PhantomData;
31use std::mem::size_of;
32use std::pin::pin;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicU64, Ordering};
35use vfs::execution_scope::ActiveGuard;
36
// Sentinel offset recorded for file-open events (see `Recorder::record_open`);
// `Message::is_open_marker` distinguishes these from page-in entries.
const FILE_OPEN_MARKER: u64 = u64::MAX;
// Number of blocking threads used to prefetch pages during profile replay.
const REPLAY_THREADS: usize = 2;
// The number of messages to buffer before sending to record. They are chunked up to reduce the
// number of allocations in the serving threads.
const MESSAGE_CHUNK_SIZE: usize = 64;
const IO_SIZE: usize = 1 << 17; // 128KiB. Needs to be a power of 2 and >= block size.

// Running count of messages recorded; incremented in `RecorderImpl::record`.
pub static RECORDED: AtomicU64 = AtomicU64::new(0);
45
/// A volume flavor (blob or regular file) that profiles can be recorded from
/// and replayed against. The default methods implement the shared profile
/// serialization/replay logic; implementors supply open/filter hooks.
trait RecordedVolume: Send + Sync + Sized + Unpin {
    /// Identifier naming a file in this volume (object id or blob hash).
    type IdType: std::fmt::Display + Ord + Send + Sized;
    /// Pager-backed node type that page-in requests are issued against.
    type NodeType: PagerBacked;
    /// Wire format of a single profile entry for this volume.
    type MessageType: Message<IdType = Self::IdType>;

    fn new(volume: Arc<FxVolume>) -> Self;

    fn volume(&self) -> &Arc<FxVolume>;

    /// Opens the node with `id`, holding it open for paging during replay.
    fn open(
        &self,
        id: Self::IdType,
    ) -> impl std::future::Future<Output = Result<OpenedNode<Self::NodeType>, Error>> + Send;

    /// Filters out open markers for files that may not be usable in the profile.
    fn file_is_replayable(
        &self,
        id: &Self::IdType,
    ) -> impl std::future::Future<Output = bool> + Send;

    /// Reads a recorded profile from `handle`, decodes its messages, opens each
    /// referenced file once (caching both successes and failures in
    /// `local_cache`) and queues a page-in `Request` per entry on `sender`.
    fn read_and_queue(
        &self,
        handle: Box<dyn ReadObjectHandle>,
        sender: &async_channel::Sender<Request<Self::NodeType>>,
        local_cache: &mut BTreeMap<Self::IdType, Option<OpenedNode<Self::NodeType>>>,
    ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
        async move {
            let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
            let block_size = handle.block_size() as usize;
            let file_size = handle.get_size() as usize;
            let mut offset = 0;
            while offset < file_size {
                let actual = handle
                    .read(offset as u64, io_buf.as_mut())
                    .await
                    .map_err(|e| e.context(format!("Failed to read at offset: {}", offset)))?;
                offset += actual;
                let mut local_offset = 0;
                let mut next_block = block_size;
                let mut next_offset = size_of::<Self::MessageType>();
                while next_offset <= actual {
                    let msg = Self::MessageType::decode_from(
                        &io_buf.as_slice()[local_offset..next_offset],
                    );

                    local_offset = next_offset;
                    next_offset = local_offset + size_of::<Self::MessageType>();
                    // Messages don't overlap block boundaries.
                    if next_offset > next_block {
                        local_offset = next_block;
                        next_offset = local_offset + size_of::<Self::MessageType>();
                        next_block += block_size;
                    }

                    // Ignore trailing zeroes. This is technically a valid entry but extremely
                    // unlikely and will only break an optimization.
                    if msg.is_zeroes() {
                        break;
                    }

                    // Open each file at most once; cache failures as `None` so
                    // repeated entries for an unopenable file are skipped cheaply.
                    let file = match local_cache.entry(msg.id()) {
                        Entry::Occupied(entry) => match entry.get() {
                            Some(opened_file) => (*opened_file).clone(),
                            // Found a cached error.
                            None => continue,
                        },
                        Entry::Vacant(entry) => match self.open(msg.id()).await {
                            Err(e) => {
                                debug!("Failed to open object {} from profile: {:?}", msg.id(), e);
                                // Cache the error.
                                entry.insert(None);
                                continue;
                            }
                            Ok(opened_file) => {
                                let file_clone = (*opened_file).clone();
                                entry.insert(Some(opened_file));
                                file_clone
                            }
                        },
                    };

                    sender.send(Request { file, offset: msg.offset() }).await?;
                }
            }
            Ok(())
        }
    }

    /// Drains `receiver` until the recording side closes, dedupes the messages,
    /// then persists the profile under `name` in the volume's profile
    /// directory, atomically replacing any previous profile with that name.
    fn record(
        &self,
        name: &str,
        receiver: async_channel::Receiver<Vec<Self::MessageType>>,
    ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
        async move {
            // LinkedHashMap preserves first-seen order while deduplicating.
            let mut recorded_offsets = LinkedHashMap::<Self::MessageType, ()>::new();
            let mut recorded_opens = BTreeMap::<Self::IdType, bool>::new();
            while let Ok(buffer) = receiver.recv().await {
                for message in buffer {
                    if message.is_open_marker() {
                        if let Entry::Vacant(entry) = recorded_opens.entry(message.id()) {
                            let usable = self.file_is_replayable(entry.key()).await;
                            entry.insert(usable);
                        }
                    } else {
                        recorded_offsets.insert(message, ());
                    }
                }
            }

            // Start a recording handle. Put it in the graveyard in case we can't properly complete
            // it.
            let store = self.volume().store();
            let fs = store.filesystem();
            let mut transaction =
                fs.clone().new_transaction(lock_keys![], Options::default()).await?;
            let handle = ObjectStore::create_object(
                self.volume(),
                &mut transaction,
                HandleOptions::default(),
                None,
            )
            .await?;
            store.add_to_graveyard(&mut transaction, handle.object_id());
            transaction.commit().await?;

            // On any early error below, the new object is tombstoned.
            let clean_up = scopeguard::guard((), |_| {
                fs.graveyard().queue_tombstone_object(store.store_object_id(), handle.object_id())
            });

            let block_size = handle.block_size() as usize;
            let mut offset = 0;
            let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
            let mut next_block = block_size;
            while let Some((message, _)) = recorded_offsets.pop_front() {
                // If a file opening was never recorded, or it is not usable, drop the message.
                if !recorded_opens.get(&message.id()).copied().unwrap_or(false) {
                    continue;
                }

                let mut next_offset = offset + size_of::<Self::MessageType>();
                if next_offset > next_block {
                    // Zero the remainder of the block. Stopping on block boundaries allows us to
                    // resize the I/O without supporting reading/writing half messages to a buffer.
                    io_buf.as_mut_slice()[offset..next_block].fill(0);
                    if next_block >= IO_SIZE {
                        // The buffer is full.  Write it out.
                        handle
                            .write_or_append(None, io_buf.as_ref())
                            .await
                            .context("Failed to write profile block")?;
                        offset = 0;
                        next_offset = size_of::<Self::MessageType>();
                        next_block = block_size;
                    } else {
                        offset = next_block;
                        next_offset = offset + size_of::<Self::MessageType>();
                        next_block += block_size;
                    }
                }
                message.encode_to(&mut io_buf.as_mut_slice()[offset..next_offset]);
                offset = next_offset;
            }
            // Flush any partially filled buffer, zero-padded to a block boundary.
            if offset > 0 {
                io_buf.as_mut_slice()[offset..next_block].fill(0);
                handle
                    .write_or_append(None, io_buf.subslice(0..next_block))
                    .await
                    .context("Failed to write profile block")?;
            }

            let profile_dir = self.volume().get_profile_directory().await?;

            // Lock the profile directory and, if a profile with this name
            // exists, the old object too; loop because the lookup has to be
            // re-validated whenever the lock set changes.
            let mut lock_keys =
                lock_keys![LockKey::object(store.store_object_id(), profile_dir.object_id())];
            let mut old_id = INVALID_OBJECT_ID;
            let mut transaction = loop {
                let transaction = fs.clone().new_transaction(lock_keys, Options::default()).await?;
                if let Some((id, descriptor, _)) = profile_dir.lookup(name).await? {
                    ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
                    if id == old_id {
                        break transaction;
                    }
                    lock_keys = lock_keys![
                        LockKey::object(store.store_object_id(), profile_dir.object_id()),
                        LockKey::object(store.store_object_id(), id)
                    ];
                    old_id = id;
                } else {
                    old_id = INVALID_OBJECT_ID;
                    break transaction;
                }
            };

            store.remove_from_graveyard(&mut transaction, handle.object_id());
            directory::replace_child_with_object(
                &mut transaction,
                Some((handle.object_id(), ObjectDescriptor::File)),
                (&profile_dir, name),
                0,
                Timestamp::now(),
            )
            .await?;
            transaction.commit().await?;

            // Success: defuse the cleanup guard and tombstone the replaced profile.
            ScopeGuard::into_inner(clean_up);
            if old_id != INVALID_OBJECT_ID {
                fs.graveyard().queue_tombstone_object(store.store_object_id(), old_id);
            }

            Ok(())
        }
    }
}
259
/// `RecordedVolume` implementation for blob (fxblob) volumes.
struct BlobVolume {
    volume: Arc<FxVolume>,
    // Cache the open blob directory here. The Mutex is just to make this Send, but it is not
    // actually used concurrently.
    root_dir: Mutex<Option<Arc<BlobDirectory>>>,
}
266
impl RecordedVolume for BlobVolume {
    type IdType = Hash;
    type NodeType = FxBlob;
    type MessageType = BlobMessage;

    fn new(volume: Arc<FxVolume>) -> Self {
        Self { volume, root_dir: Mutex::new(None) }
    }

    fn volume(&self) -> &Arc<FxVolume> {
        &self.volume
    }

    /// Opens the blob with merkle root `id`, lazily loading and caching the
    /// volume's root blob directory on first use.
    async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
        let mut root_dir = self.root_dir.lock().await;
        if root_dir.is_none() {
            *root_dir = Some(
                self.volume
                    .get_or_load_node(
                        self.volume.store().root_directory_object_id(),
                        ObjectDescriptor::Directory,
                        None,
                    )
                    .await?
                    .into_any()
                    .downcast::<BlobDirectory>()
                    .map_err(|_| FxfsError::Inconsistent)?,
            );
        };
        root_dir
            .as_ref()
            .unwrap()
            .open_blob(&id.into())
            .await?
            .ok_or_else(|| FxfsError::NotFound.into())
    }

    async fn file_is_replayable(&self, _id: &Self::IdType) -> bool {
        // There is nothing to filter out in blob volumes.
        true
    }
}
309
/// `RecordedVolume` implementation for regular file volumes.
struct FileVolume {
    volume: Arc<FxVolume>,
}
313
impl RecordedVolume for FileVolume {
    type IdType = u64;
    type NodeType = FxFile;
    type MessageType = FileMessage;

    fn new(volume: Arc<FxVolume>) -> Self {
        Self { volume }
    }

    fn volume(&self) -> &Arc<FxVolume> {
        &self.volume
    }

    /// Opens the file with object id `id`, holding it open for paging.
    async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
        self.volume
            .get_or_load_node(id, ObjectDescriptor::File, None)
            .await?
            .into_any()
            .downcast::<FxFile>()
            .map_err(|_| anyhow!("Non-file opened"))?
            .into_opened_node()
            .ok_or_else(|| anyhow!("File being purged"))
    }

    /// A file is replayable only when it uses at most the volume data key.
    async fn file_is_replayable(&self, id: &Self::IdType) -> bool {
        match self.volume.store().get_keys(*id).await {
            // If any keys are not the volume key id, then the file may not be readable later.
            // If there's more than one, then at least one is not the volume key.
            Ok(keys)
                if keys.is_empty()
                    || (keys.len() == 1 && keys.first().unwrap().0 == VOLUME_DATA_KEY_ID) =>
            {
                true
            }
            _ => false,
        }
    }
}
352
/// A single profile entry — either a page-in record or a file-open marker —
/// that round-trips through the fixed-size on-disk profile format.
trait Message: Eq + PartialEq + Sized + Send + Sync + std::hash::Hash + 'static {
    /// Identifier type for the file this message refers to.
    type IdType: std::fmt::Display + Ord + Send + Sized;

    /// Id of the file this message refers to.
    fn id(&self) -> Self::IdType;
    /// Recorded byte offset (or `FILE_OPEN_MARKER` for an open event).
    fn offset(&self) -> u64;
    /// Serializes into `dest`; `dest` must be exactly `size_of::<Self>()` bytes.
    fn encode_to(&self, dest: &mut [u8]);
    /// Deserializes from `src`; `src` must be exactly `size_of::<Self>()` bytes.
    fn decode_from(src: &[u8]) -> Self;
    /// True for an all-zero entry, used to detect end-of-data padding.
    fn is_zeroes(&self) -> bool;
    /// Builds a message from a node and offset; fails on a node type mismatch.
    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error>;
    /// True if this message records a file open rather than a page-in.
    fn is_open_marker(&self) -> bool;
}
364
/// Profile entry for a blob page-in, keyed by the blob's merkle root hash.
#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
struct BlobMessage {
    id: Hash,
    // Don't bother with offset+length. The kernel is going to split up and align it one way and
    // then we're going to change it all with read-ahead/read-around.
    offset: u64,
}
372
373impl BlobMessage {
374    fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
375        let (first, second) = mut_array_refs![dest, size_of::<Hash>(), size_of::<u64>()];
376        *first = self.id.into();
377        *second = self.offset.to_le_bytes();
378    }
379
380    fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
381        let (first, second) = array_refs!(src, size_of::<Hash>(), size_of::<u64>());
382        Self { id: Hash::from_array(*first), offset: u64::from_le_bytes(*second) }
383    }
384}
385
impl Message for BlobMessage {
    type IdType = Hash;

    fn id(&self) -> Self::IdType {
        self.id
    }

    fn offset(&self) -> u64 {
        self.offset
    }

    /// Panics if `dest` is not exactly `size_of::<Self>()` bytes.
    fn encode_to(&self, dest: &mut [u8]) {
        self.encode_to_impl(dest.try_into().unwrap());
    }

    /// Panics if `src` is not exactly `size_of::<Self>()` bytes.
    fn decode_from(src: &[u8]) -> Self {
        Self::decode_from_impl(src.try_into().unwrap())
    }

    /// All-zero hash and offset; treated as end-of-data padding by the reader.
    fn is_zeroes(&self) -> bool {
        self.id == Hash::from_array([0u8; size_of::<Hash>()]) && self.offset == 0
    }

    /// Fails if `node` is not an `FxBlob`.
    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
        match node.into_any().downcast::<FxBlob>() {
            Ok(blob) => Ok(Self { id: blob.root(), offset }),
            Err(_) => Err(anyhow!("Cannot record non-blob entry.")),
        }
    }

    fn is_open_marker(&self) -> bool {
        self.offset == FILE_OPEN_MARKER
    }
}
420
/// Profile entry for a regular-file page-in, keyed by the file's object id.
#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
struct FileMessage {
    id: u64,
    // Don't bother with offset+length. The kernel is going to split up and align it one way and
    // then we're going to change it all with read-ahead/read-around.
    offset: u64,
}
428
429impl FileMessage {
430    fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
431        let (first, second) = mut_array_refs![dest, size_of::<u64>(), size_of::<u64>()];
432        *first = self.id.to_le_bytes();
433        *second = self.offset.to_le_bytes();
434    }
435
436    fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
437        let (first, second) = array_refs!(src, size_of::<u64>(), size_of::<u64>());
438        Self { id: u64::from_le_bytes(*first), offset: u64::from_le_bytes(*second) }
439    }
440}
441
442impl Message for FileMessage {
443    type IdType = u64;
444
445    fn id(&self) -> Self::IdType {
446        self.id
447    }
448
449    fn offset(&self) -> u64 {
450        self.offset
451    }
452
453    fn encode_to(&self, dest: &mut [u8]) {
454        self.encode_to_impl(dest.try_into().unwrap())
455    }
456
457    fn decode_from(src: &[u8]) -> Self {
458        Self::decode_from_impl(src.try_into().unwrap())
459    }
460
461    fn is_zeroes(&self) -> bool {
462        self.id == 0 && self.offset == 0
463    }
464
465    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
466        match node.into_any().downcast::<FxFile>() {
467            Ok(file) => Ok(Self { id: file.object_id(), offset }),
468            Err(_) => Err(anyhow!("Cannot record non-file entry")),
469        }
470    }
471
472    fn is_open_marker(&self) -> bool {
473        self.offset == FILE_OPEN_MARKER
474    }
475}
476
/// Takes messages to be written into the current profile. This should be dropped before the
/// recording is stopped to ensure that all messages have been flushed to the writer thread.
pub trait Recorder: Send + Sync {
    /// Record a page in request, for the given identifier and offset.
    /// Errors if the message cannot be built from `node` or the recording has
    /// already stopped.
    fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error>;

    /// Record file opens to gather what files were actually used during the recording.
    fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error>;
}
486
/// `Recorder` implementation that batches messages into chunks of
/// `MESSAGE_CHUNK_SIZE` before handing them to the recording task.
struct RecorderImpl<T: Message> {
    sender: async_channel::Sender<Vec<T>>,
    // Messages buffered locally until a full chunk can be sent.
    buffer: Vec<T>,
}
491
492impl<T: Message> RecorderImpl<T> {
493    fn new(sender: async_channel::Sender<Vec<T>>) -> Self {
494        Self { sender, buffer: Vec::with_capacity(MESSAGE_CHUNK_SIZE) }
495    }
496}
497
impl<T: Message> Recorder for RecorderImpl<T> {
    /// Converts the node/offset into a message and buffers it, flushing a full
    /// chunk to the recording task when `MESSAGE_CHUNK_SIZE` is reached.
    fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error> {
        self.buffer.push(T::from_node_request(node, offset)?);
        if self.buffer.len() >= MESSAGE_CHUNK_SIZE {
            // try_send to avoid async await, we use an unbounded channel anyways so any failure
            // here should only be if the channel is closed, which is permanent anyways.
            self.sender.try_send(std::mem::replace(
                &mut self.buffer,
                Vec::with_capacity(MESSAGE_CHUNK_SIZE),
            ))?;
        }
        RECORDED.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    /// Records a file open as a message carrying the `FILE_OPEN_MARKER` offset.
    fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error> {
        self.record(node, FILE_OPEN_MARKER)
    }
}
517
518impl<T: Message> Drop for RecorderImpl<T> {
519    fn drop(&mut self) {
520        // Best effort sending what messages have already been queued.
521        if self.buffer.len() > 0 {
522            let buffer = std::mem::take(&mut self.buffer);
523            let _ = self.sender.try_send(buffer);
524        }
525    }
526}
527
/// A single page-in request handed from the profile reader to the replay threads.
struct Request<P: PagerBacked> {
    file: Arc<P>,
    // Byte offset to prefetch; one page is fetched starting here.
    offset: u64,
}
532
/// Tracks an in-flight profile replay for a volume.
struct ReplayState<T> {
    // Joins the page-in threads; `Shared` so completion can be awaited repeatedly.
    replay_threads: future::Shared<BoxFuture<'static, ()>>,
    // Keeps opened files cached (and watches for cancellation) until dropped.
    _cache_task: fasync::Task<()>,
    _phantom: PhantomData<T>,
}
538
impl<T: RecordedVolume> ReplayState<T> {
    /// Starts a replay: spawns a cache task that reads the profile from
    /// `handle` and queues page-in requests, plus `REPLAY_THREADS` blocking
    /// threads that service them.
    fn new(handle: Box<dyn ReadObjectHandle>, volume: Arc<FxVolume>, guard: ActiveGuard) -> Self {
        let (sender, receiver) = async_channel::unbounded::<Request<T::NodeType>>();

        // Create async_channel. An async thread reads and populates the channel, then N threads
        // consume it and touch pages.
        let mut replay_threads = Vec::with_capacity(REPLAY_THREADS);
        for _ in 0..REPLAY_THREADS {
            let receiver = receiver.clone();
            // The replay threads can have references to files so we make sure they have a guard
            // so that shutdown will wait till they have been joined.
            let guard = guard.clone();
            replay_threads.push(fasync::unblock(move || {
                let _guard = guard;
                Self::page_in_thread(receiver);
            }));
        }
        let replay_threads = (Box::pin(async {
            join_all(replay_threads).await;
        }) as BoxFuture<'static, ()>)
            .shared();

        let scope = volume.scope().clone();
        let cache_task = scope
            .spawn({
                // The replay threads hold active guards, so we must watch for cancellation.  When
                // cancelled, we'll drop the sender which will cause the replay threads to drop
                // their guards, which will allow shutdown to proceed.
                async move {
                    let mut task = pin!(
                        async {
                            // Hold the items in cache until replay is stopped. Optional as None
                            // indicates that the file could not be opened, and we want to cache that
                            // failure.
                            let mut local_cache: BTreeMap<
                                T::IdType,
                                Option<OpenedNode<T::NodeType>>,
                            > = BTreeMap::new();

                            let volume_id = volume.id();

                            if let Err(error) = T::new(volume)
                                .read_and_queue(handle, &sender, &mut local_cache)
                                .await
                            {
                                error!(error:?; "Failed to read back profile");
                            }
                            sender.close();

                            info!(
                                "Replay for volume {} opened {} of {} objects.",
                                volume_id,
                                local_cache.iter().filter(|(_, e)| e.is_some()).count(),
                                local_cache.len()
                            );

                            // Keep the cache alive until dropped.
                            let () = std::future::pending().await;
                        }
                        .fuse()
                    );

                    select! {
                        _ = task => {}
                        _ = guard.on_cancel().fuse() => {}
                    }
                }
            })
            .into();

        Self { replay_threads, _cache_task: cache_task, _phantom: PhantomData }
    }

    /// Blocking worker: pulls requests and prefetches one page per request via
    /// `ZX_VMO_OP_PREFETCH` (errors are intentionally ignored). Exits when the
    /// channel closes or all senders are dropped.
    fn page_in_thread(queue: async_channel::Receiver<Request<T::NodeType>>) {
        while let Ok(request) = queue.recv_blocking() {
            let _ = request.file.vmo().op_range(
                zx::VmoOp::PREFETCH,
                request.offset,
                zx::system_get_page_size() as u64,
            );
            // If the volume is shutdown, the sender will be dropped.
            if queue.sender_count() == 0 {
                return;
            }
        }
    }
}
626
/// Holds the current profile recording and/or replay state, and provides methods for state
/// transitions.
#[async_trait]
pub trait ProfileState: Send + Sync {
    /// Creates a new recording and returns the `Recorder` object to record to. The recording
    /// finalizes when the associated `Recorder` is dropped.  Stops any recording currently in
    /// progress.
    fn record_new(&mut self, volume: &Arc<FxVolume>, name: &str) -> Box<dyn Recorder>;

    /// Reads given handle to parse a profile and replay it by requesting pages via
    /// ZX_VMO_OP_PREFETCH in blocking background threads. Stops any replay currently in progress.
    fn replay_profile(
        &mut self,
        handle: Box<dyn ReadObjectHandle>,
        volume: Arc<FxVolume>,
        guard: ActiveGuard,
    );

    /// Waits for replay to finish, but does not drop the cache.  The cache will be dropped when
    /// the ProfileState impl is dropped.  This is fine to call multiple times.
    async fn wait_for_replay_to_finish(&mut self);

    /// Waits for the recording to finish, returning the recording task's result.
    async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error>;
}
652
653pub fn new_profile_state(is_blob: bool) -> Box<dyn ProfileState> {
654    if is_blob {
655        Box::new(ProfileStateImpl::<BlobVolume>::new())
656    } else {
657        Box::new(ProfileStateImpl::<FileVolume>::new())
658    }
659}
660
/// Per-volume-flavor implementation of `ProfileState`.
struct ProfileStateImpl<T> {
    // Handle to the in-flight recording task, if any; dropping it cancels the task.
    recording: Option<RemoteHandle<Result<(), Error>>>,
    // The in-flight replay, if any.
    replay: Option<ReplayState<T>>,
}
665
666impl<T> ProfileStateImpl<T> {
667    fn new() -> Self {
668        Self { recording: None, replay: None }
669    }
670}
671
#[async_trait]
impl<T: RecordedVolume> ProfileState for ProfileStateImpl<T> {
    fn record_new(&mut self, volume: &Arc<FxVolume>, name: &str) -> Box<dyn Recorder> {
        let (sender, receiver) = async_channel::unbounded();
        let volume = volume.clone();
        let name = name.to_string();
        // Cancel the previous recording (if any).
        self.recording = None;
        let scope = volume.scope().clone();
        // The RemoteHandle lets wait_for_recording_to_finish() retrieve the
        // result; dropping it cancels the spawned recording task.
        let (task, remote_handle) = async move {
            let recording = T::new(volume);
            recording
                .record(&name, receiver)
                .await
                .inspect_err(|error| warn!(error:?; "Profile recording '{name}' failed"))
        }
        .remote_handle();
        self.recording = Some(remote_handle);
        scope.spawn(task);
        Box::new(RecorderImpl::new(sender))
    }

    fn replay_profile(
        &mut self,
        handle: Box<dyn ReadObjectHandle>,
        volume: Arc<FxVolume>,
        guard: ActiveGuard,
    ) {
        // Replacing the state drops (and thereby stops) any replay in progress.
        self.replay = Some(ReplayState::new(handle, volume, guard));
    }

    async fn wait_for_replay_to_finish(&mut self) {
        if let Some(replay) = &mut self.replay {
            // Shared future, so awaiting repeatedly is safe.
            replay.replay_threads.clone().await;
        }
    }

    async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error> {
        if let Some(recording) = self.recording.take() { recording.await } else { Ok(()) }
    }
}
713
714#[cfg(test)]
715mod tests {
716    use super::{
717        BlobMessage, BlobVolume, FileMessage, FileVolume, IO_SIZE, Message, RecordedVolume,
718        Request, new_profile_state,
719    };
720    use crate::fuchsia::file::FxFile;
721    use crate::fuchsia::fxblob::blob::FxBlob;
722    use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
723    use crate::fuchsia::node::{FxNode, OpenedNode};
724    use crate::fuchsia::pager::PagerBacked;
725    use crate::fuchsia::testing::{TestFixture, TestFixtureOptions, open_file_checked};
726    use crate::fuchsia::volume::FxVolume;
727    use anyhow::Error;
728    use async_trait::async_trait;
729    use delivery_blob::CompressionMode;
730    use event_listener::{Event, EventListener};
731    use fuchsia_hash::Hash;
732    use fuchsia_sync::Mutex;
733    use fxfs::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
734    use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
735    use fxfs::object_store::{DataObjectHandle, HandleOptions, ObjectDescriptor, ObjectStore};
736    use std::collections::BTreeMap;
737    use std::mem::size_of;
738    use std::sync::Arc;
739    use std::time::Duration;
740    use storage_device::buffer::{BufferRef, MutableBufferRef};
741    use storage_device::buffer_allocator::{BufferAllocator, BufferFuture, BufferSource};
742    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
743
    /// Shared mutable state behind `FakeReaderWriter`.
    struct FakeReaderWriterInner {
        // All bytes appended so far; reads are served from this.
        data: Vec<u8>,
        // Pending delays; each read/write pops one and awaits it before touching `data`.
        delays: Vec<EventListener>,
    }
748
    /// In-memory read/write object handle stub with injectable I/O delays,
    /// used to test profile serialization without a real filesystem.
    struct FakeReaderWriter {
        allocator: BufferAllocator,
        inner: Arc<Mutex<FakeReaderWriterInner>>,
    }
753
    // Block size reported by `FakeReaderWriter` via its `BufferAllocator`.
    const BLOCK_SIZE: usize = 4096;
755
    impl FakeReaderWriter {
        fn new() -> Self {
            Self {
                allocator: BufferAllocator::new(BLOCK_SIZE, BufferSource::new(IO_SIZE * 2)),
                inner: Arc::new(Mutex::new(FakeReaderWriterInner {
                    data: Vec::new(),
                    delays: Vec::new(),
                })),
            }
        }

        // Queues a delay; each I/O pops from the back, so earlier-pushed delays
        // fire first (FIFO).
        fn push_delay(&self, delay: EventListener) {
            self.inner.lock().delays.insert(0, delay);
        }
    }
771
    impl ObjectHandle for FakeReaderWriter {
        // Fixed fake id; the tests don't depend on it.
        fn object_id(&self) -> u64 {
            0
        }

        fn block_size(&self) -> u64 {
            self.allocator.block_size() as u64
        }

        fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
            self.allocator.allocate_buffer(size)
        }
    }
785
    impl WriteObjectHandle for FakeReaderWriter {
        /// Appends `buf` to the in-memory data, first awaiting a queued delay
        /// if one exists. Only append (offset == None) is supported.
        async fn write_or_append(
            &self,
            offset: Option<u64>,
            buf: BufferRef<'_>,
        ) -> Result<u64, Error> {
            // We only append for now.
            assert!(offset.is_none());
            let delay = self.inner.lock().delays.pop();
            if let Some(delay) = delay {
                delay.await;
            }
            // This relocking has a TOCTOU flavour, but it shouldn't matter for this application.
            self.inner.lock().data.extend_from_slice(buf.as_slice());
            Ok(buf.len() as u64)
        }

        // Not exercised by these tests.
        async fn truncate(&self, _size: u64) -> Result<(), Error> {
            unreachable!();
        }

        // Not exercised by these tests.
        async fn flush(&self) -> Result<(), Error> {
            unreachable!();
        }
    }
811
    /// Creates a file called `name` under the fixture volume's root directory, writes
    /// `data` to it through the fio file protocol, and returns the new object's id.
    async fn write_file(fixture: &TestFixture, name: &str, data: &[u8]) -> u64 {
        let root_dir = fixture.volume().root_dir();
        // The transaction is locked on the root directory object so the child create is
        // serialized against other directory mutations.
        let mut transaction = fixture
            .volume()
            .volume()
            .store()
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    fixture.volume().volume().store().store_object_id(),
                    root_dir.object_id()
                )],
                Options::default(),
            )
            .await
            .expect("Creating transaction for new file");
        let id = root_dir
            .directory()
            .create_child_file(&mut transaction, name)
            .await
            .expect("Creating new_file")
            .object_id();
        transaction.commit().await.unwrap();
        // Write through the filesystem's public file interface rather than the handle.
        let file = open_file_checked(
            fixture.root(),
            name,
            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_FILE,
            &Default::default(),
        )
        .await;
        file.write(data).await.unwrap().expect("Writing file");
        id
    }
845
846    #[async_trait]
847    impl ReadObjectHandle for FakeReaderWriter {
848        async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
849            let delay = self.inner.lock().delays.pop();
850            if let Some(delay) = delay {
851                delay.await;
852            }
853            // This relocking has a TOCTOU flavour, but it shouldn't matter for this application.
854            let inner = self.inner.lock();
855            assert!(offset as usize <= inner.data.len());
856            let offset_end = std::cmp::min(offset as usize + buf.len(), inner.data.len());
857            let size = offset_end - offset as usize;
858            buf.as_mut_slice()[..size].clone_from_slice(&inner.data[offset as usize..offset_end]);
859            Ok(size)
860        }
861
862        fn get_size(&self) -> u64 {
863            self.inner.lock().data.len() as u64
864        }
865    }
866
867    #[fuchsia::test]
868    async fn test_encode_decode_blob() {
869        let mut buf = [0u8; size_of::<BlobMessage>()];
870        let m = BlobMessage { id: [88u8; 32].into(), offset: 77 };
871        m.encode_to(&mut buf.as_mut_slice());
872        let m2 = BlobMessage::decode_from(&buf);
873        assert_eq!(m, m2);
874    }
875
876    #[fuchsia::test]
877    async fn test_encode_decode_file() {
878        let mut buf = [0u8; size_of::<FileMessage>()];
879        let m = FileMessage { id: 88, offset: 77 };
880        m.encode_to(&mut buf.as_mut_slice());
881        let m2 = FileMessage::decode_from(&buf);
882        assert!(!m2.is_zeroes());
883        assert_eq!(m, m2);
884    }
885
886    const TEST_PROFILE_NAME: &str = "test_profile";
887
    /// Opens a handle to the recorded test profile by looking up `TEST_PROFILE_NAME` in
    /// the volume's profile directory. Panics if the directory or the profile is missing.
    async fn get_test_profile_handle(volume: &Arc<FxVolume>) -> DataObjectHandle<FxVolume> {
        let profile_dir = volume.get_profile_directory().await.unwrap();
        ObjectStore::open_object(
            volume,
            profile_dir
                .lookup(TEST_PROFILE_NAME)
                .await
                .expect("lookup failed")
                .expect("not found")
                .0,
            HandleOptions::default(),
            None,
        )
        .await
        .unwrap()
    }
904
905    async fn get_test_profile_contents(volume: &Arc<FxVolume>) -> Vec<u8> {
906        get_test_profile_handle(volume).await.contents(1024 * 1024).await.unwrap().to_vec()
907    }
908
    #[fuchsia::test]
    async fn test_recording_basic_blob() {
        // Records one read plus an open for a single blob; the flushed profile should
        // occupy exactly one filesystem block.
        let fixture = new_blob_fixture().await;
        {
            let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");

            let mut state = new_profile_state(true);
            let volume = fixture.volume().volume();

            {
                // Drop recorder when finished writing to flush data.
                let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
                recorder.record(blob.clone(), 0).unwrap();
                recorder.record_open(blob).unwrap();
            }

            state.wait_for_recording_to_finish().await.unwrap();

            assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
        }
        fixture.close().await;
    }
932
    #[fuchsia::test]
    async fn test_recording_basic_file() {
        // Same as test_recording_basic_blob, but for a regular file on a non-blob volume:
        // one read plus an open yields exactly one block of profile data.
        let fixture = TestFixture::new().await;
        {
            let id = write_file(&fixture, "foo", &[88u8]).await;
            let node = fixture
                .volume()
                .volume()
                .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
                .await
                .unwrap();

            let mut state = new_profile_state(false);
            let volume = fixture.volume().volume();

            {
                // Drop recorder when finished writing to flush data.
                let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
                recorder.record(node.clone(), 0).unwrap();
                recorder.record_open(node).unwrap();
            }
            state.wait_for_recording_to_finish().await.unwrap();

            assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
        }
        fixture.close().await;
    }
960
    #[fuchsia::test]
    async fn test_recording_filtered_without_open() {
        // A read recorded without a matching record_open is filtered out, so the
        // resulting profile is empty.
        let fixture = new_blob_fixture().await;
        {
            let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");

            let mut state = new_profile_state(true);
            let volume = fixture.volume().volume();

            {
                // Drop recorder when finished writing to flush data.
                let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
                recorder.record(blob.clone(), 0).unwrap();
            }
            state.wait_for_recording_to_finish().await.unwrap();

            assert_eq!(get_test_profile_contents(volume).await.len(), 0);
        }
        fixture.close().await;
    }
982
    #[fuchsia::test]
    async fn test_recording_blob_more_than_block() {
        // Records one more blob message than fits in a single block, so the profile must
        // span two blocks; then reads the profile back via read_and_queue and checks
        // every message round-trips in order.
        let mut state = new_profile_state(true);

        let fixture = new_blob_fixture().await;
        assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
        let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;
        let hash;
        let volume = fixture.volume().volume();

        {
            hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
            // Drop recorder when finished writing to flush data.
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            recorder.record_open(blob.clone()).unwrap();
            for i in 0..message_count {
                recorder.record(blob.clone(), 4096 * i as u64).unwrap();
            }
        }
        state.wait_for_recording_to_finish().await.unwrap();

        assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);

        let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
        let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();

        let volume = fixture.volume().volume().clone();
        let task = fasync::Task::spawn(async move {
            let handle = Box::new(get_test_profile_handle(&volume).await);
            let blob = BlobVolume::new(volume);
            blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
        });

        // Drain the queue; messages must arrive with the recorded hash and offsets in order.
        let mut recv_count = 0;
        while let Ok(msg) = receiver.recv().await {
            assert_eq!(msg.file.root(), hash);
            assert_eq!(msg.offset, 4096 * recv_count);
            recv_count += 1;
        }
        task.await;
        assert_eq!(recv_count, message_count as u64);

        fixture.close().await;
    }
1028
    #[fuchsia::test]
    async fn test_recording_file_more_than_block() {
        // File-volume analogue of test_recording_blob_more_than_block: one more message
        // than fits in a block forces a two-block profile, which is then read back and
        // verified message-by-message.
        let mut state = new_profile_state(false);

        let fixture = TestFixture::new().await;
        assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
        let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;
        let id;
        let volume = fixture.volume().volume();
        {
            id = write_file(&fixture, "foo", &[88u8]).await;
            let node = volume
                .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
                .await
                .unwrap();
            // Drop recorder when finished writing to flush data.
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            recorder.record_open(node.clone()).unwrap();
            for i in 0..message_count {
                recorder.record(node.clone(), 4096 * i as u64).unwrap();
            }
        }
        state.wait_for_recording_to_finish().await.unwrap();

        assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);

        let mut local_cache: BTreeMap<u64, Option<OpenedNode<FxFile>>> = BTreeMap::new();
        let (sender, receiver) = async_channel::unbounded::<Request<FxFile>>();

        let volume = fixture.volume().volume().clone();
        let task = fasync::Task::spawn(async move {
            let handle = Box::new(get_test_profile_handle(&volume).await);
            let file = FileVolume::new(volume);
            file.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
        });

        // Drain the queue; messages must carry the recorded object id and in-order offsets.
        let mut recv_count = 0;
        while let Ok(msg) = receiver.recv().await {
            assert_eq!(msg.file.object_id(), id);
            assert_eq!(msg.offset, 4096 * recv_count);
            recv_count += 1;
        }
        task.await;
        assert_eq!(recv_count, message_count as u64);

        fixture.close().await;
    }
1076
1077    #[fuchsia::test]
1078    async fn test_recording_more_than_io_size() {
1079        let fixture = new_blob_fixture().await;
1080
1081        {
1082            let mut state = new_profile_state(true);
1083            let message_count = (IO_SIZE as usize / size_of::<BlobMessage>()) + 1;
1084            let hash;
1085            let volume = fixture.volume().volume();
1086            {
1087                hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
1088                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1089                // Drop recorder when finished writing to flush data.
1090                let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1091                recorder.record_open(blob.clone()).unwrap();
1092                for i in 0..message_count {
1093                    recorder.record(blob.clone(), 4096 * i as u64).unwrap();
1094                }
1095            }
1096            state.wait_for_recording_to_finish().await.unwrap();
1097            assert_eq!(get_test_profile_contents(volume).await.len(), IO_SIZE + BLOCK_SIZE);
1098
1099            let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
1100            let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();
1101
1102            let volume = volume.clone();
1103            let task = fasync::Task::spawn(async move {
1104                let handle = Box::new(get_test_profile_handle(&volume).await);
1105                let blob = BlobVolume::new(volume);
1106                blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
1107            });
1108
1109            let mut recv_count = 0;
1110            while let Ok(msg) = receiver.recv().await {
1111                assert_eq!(msg.file.root(), hash);
1112                assert_eq!(msg.offset, 4096 * recv_count);
1113                recv_count += 1;
1114            }
1115            task.await;
1116            assert_eq!(recv_count, message_count as u64);
1117        }
1118
1119        fixture.close().await;
1120    }
1121
    #[fuchsia::test]
    async fn test_replay_profile_blob() {
        // Records opens + zero-offset reads for enough blobs to span two profile blocks,
        // restarts the filesystem to drop all cached pages, then replays the profile and
        // waits until every recorded blob has pages committed again.
        // Create all the files that we need first, then restart the filesystem to clear cache.
        let mut state = new_profile_state(true);

        let mut hashes = Vec::new();

        let fixture = new_blob_fixture().await;
        {
            assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
            let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;

            let volume = fixture.volume().volume();
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            // Page in the zero offsets only to avoid readahead strangeness.
            for i in 0..message_count {
                let hash =
                    fixture.write_blob(i.to_le_bytes().as_slice(), CompressionMode::Never).await;
                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
                recorder.record_open(blob.clone()).unwrap();
                hashes.push(hash);
                recorder.record(blob.clone(), 0).unwrap();
            }
        };
        let device = fixture.close().await;
        device.ensure_unique();
        state.wait_for_recording_to_finish().await.unwrap();

        device.reopen(false);
        let fixture = open_blob_fixture(device).await;
        {
            // Need to get the root vmo to check committed bytes.
            // Ensure that nothing is paged in right now.
            for hash in &hashes {
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
            }

            let volume = fixture.volume().volume();
            state.replay_profile(
                Box::new(get_test_profile_handle(volume).await),
                volume.clone(),
                volume.scope().try_active_guard().unwrap(),
            );

            // Await all data being played back by checking that things have paged in.
            for hash in &hashes {
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                while blob.vmo().info().unwrap().committed_bytes == 0 {
                    fasync::Timer::new(Duration::from_millis(25)).await;
                }
            }
        }
        fixture.close().await;
    }
1177
    #[fuchsia::test]
    async fn test_replay_profile_file() {
        // File-volume analogue of test_replay_profile_blob: record opens + zero-offset
        // reads for a two-block profile, restart the filesystem, replay, then poll until
        // every recorded file has committed pages again.
        // Create all the files that we need first, then restart the filesystem to clear cache.
        let mut state = new_profile_state(false);

        let mut ids = Vec::new();

        let fixture = TestFixture::new().await;
        {
            assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
            let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;

            let volume = fixture.volume().volume();
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            // Page in the zero offsets only to avoid readahead strangeness.
            for i in 0..message_count {
                let id = write_file(&fixture, &i.to_string(), &[88u8]).await;
                let node = fixture
                    .volume()
                    .volume()
                    .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
                    .await
                    .unwrap();
                recorder.record_open(node.clone()).unwrap();
                ids.push(id);
                recorder.record(node.clone(), 0).unwrap();
            }
        };
        let device = fixture.close().await;
        device.ensure_unique();
        state.wait_for_recording_to_finish().await.unwrap();

        device.reopen(false);
        let fixture = TestFixture::open(
            device,
            TestFixtureOptions { encrypted: true, format: false, ..Default::default() },
        )
        .await;
        {
            // Ensure that nothing is paged in right now.
            for id in &ids {
                let file = fixture
                    .volume()
                    .volume()
                    .get_or_load_node(
                        *id,
                        ObjectDescriptor::File,
                        Some(fixture.volume().root_dir()),
                    )
                    .await
                    .unwrap()
                    .into_any()
                    .downcast::<FxFile>()
                    .unwrap();
                assert_eq!(file.vmo().info().unwrap().committed_bytes, 0);
            }

            let volume = fixture.volume().volume();
            state.replay_profile(
                Box::new(get_test_profile_handle(volume).await),
                volume.clone(),
                volume.scope().try_active_guard().unwrap(),
            );

            // Await all data being played back by checking that things have paged in.
            for id in &ids {
                let file = fixture
                    .volume()
                    .volume()
                    .get_or_load_node(
                        *id,
                        ObjectDescriptor::File,
                        Some(fixture.volume().root_dir()),
                    )
                    .await
                    .unwrap()
                    .into_any()
                    .downcast::<FxFile>()
                    .unwrap();
                while file.vmo().info().unwrap().committed_bytes == 0 {
                    fasync::Timer::new(Duration::from_millis(25)).await;
                }
            }
            state.wait_for_recording_to_finish().await.unwrap();
        }
        fixture.close().await;
    }
1265
    #[fuchsia::test]
    async fn test_recording_during_replay() {
        // Verifies that a new recording can proceed while an old profile is being
        // replayed: the second recording must produce non-empty contents that differ
        // from the first.
        let mut state = new_profile_state(true);

        let hash;
        let first_recording;
        let fixture = new_blob_fixture().await;
        let volume = fixture.volume().volume();

        // First make a simple recording.
        {
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
            recorder.record_open(blob.clone()).unwrap();
            recorder.record(blob.clone(), 0).unwrap();
        }

        state.wait_for_recording_to_finish().await.unwrap();
        first_recording = get_test_profile_contents(volume).await;
        assert_ne!(first_recording.len(), 0);
        let device = fixture.close().await;
        device.ensure_unique();

        device.reopen(false);
        let fixture = open_blob_fixture(device).await;

        {
            // Need to get the root vmo to check committed bytes.
            // Ensure that nothing is paged in right now.
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
            assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);

            // Start recording
            let volume = fixture.volume().volume();
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            recorder.record(blob.clone(), 4096).unwrap();

            // Replay the original recording.
            let volume = fixture.volume().volume();
            state.replay_profile(
                Box::new(get_test_profile_handle(volume).await),
                volume.clone(),
                volume.scope().try_active_guard().unwrap(),
            );

            // Await all data being played back by checking that things have paged in.
            {
                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
                while blob.vmo().info().unwrap().committed_bytes == 0 {
                    fasync::Timer::new(Duration::from_millis(25)).await;
                }
            }

            // Record the open after the replay. Needs both the before and after action to
            // capture anything ensuring that the two procedures overlapped.
            recorder.record_open(blob.clone()).unwrap();
        }

        state.wait_for_recording_to_finish().await.unwrap();

        let volume = fixture.volume().volume();
        let second_recording = get_test_profile_contents(volume).await;
        assert_ne!(second_recording.len(), 0);
        assert_ne!(&second_recording, &first_recording);

        fixture.close().await;
    }
1334
    // Doesn't ensure that anything reads back properly, just that everything shuts down when
    // stopped early.
    #[fuchsia::test]
    async fn test_replay_profile_stop_reading_early() {
        let mut state = new_profile_state(true);
        let fixture = new_blob_fixture().await;

        {
            let volume = fixture.volume().volume();

            // Create the file that we need first.
            let message;
            {
                let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
                message = BlobMessage { id: blob.root(), offset: 0 };
            }
            state.wait_for_recording_to_finish().await.unwrap();

            // Make a profile long enough to require 2 reads.
            let replay_handle = Box::new(FakeReaderWriter::new());
            let mut buff = vec![0u8; IO_SIZE * 2];
            message.encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
            message.encode_to_impl(
                (&mut buff[IO_SIZE..IO_SIZE + size_of::<BlobMessage>()]).try_into().unwrap(),
            );

            replay_handle.inner.lock().data = buff;
            // Queue two delays: one per expected read. Only the first is ever notified.
            let delay1 = Event::new();
            replay_handle.push_delay(delay1.listen());
            let delay2 = Event::new();
            replay_handle.push_delay(delay2.listen());

            state.replay_profile(
                replay_handle,
                volume.clone(),
                volume.scope().try_active_guard().unwrap(),
            );

            // Delay the first read long enough so that the stop can be triggered during it.
            fasync::Task::spawn(async move {
                // Let the profiler wait on this a little.
                fasync::Timer::new(Duration::from_millis(100)).await;
                delay1.notify(usize::MAX);
            })
            .detach();
        }

        // The reader should block indefinitely (we never notify delay2), but that shouldn't block
        // termination.
        fixture.close().await;
    }
1387
    #[fuchsia::test]
    async fn test_replay_blob_missing() {
        // Builds a profile whose first entry is a fabricated hash that doesn't exist on
        // the volume; replay must skip it and still page in the real blob that follows.
        let fixture = new_blob_fixture().await;
        // Create the blob that comes after the missing blob. Ensure it still gets
        // recorded.
        let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
        let mut buff = vec![0u8; IO_SIZE];
        {
            // First encode the blob that is missing. Just make it up. This will be skipped during
            // replay.
            {
                let message = BlobMessage { id: [42u8; 32].into(), offset: 0 };
                message
                    .encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
            }

            // Create the blob that won't be missing and encode that.
            {
                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
                let message = BlobMessage { id: blob.root(), offset: 0 };
                message.encode_to_impl(
                    (&mut buff[size_of::<BlobMessage>()..(size_of::<BlobMessage>() * 2)])
                        .try_into()
                        .unwrap(),
                );
            }
        }
        let device = fixture.close().await;
        device.ensure_unique();

        device.reopen(false);
        let fixture = open_blob_fixture(device).await;
        {
            let mut state = new_profile_state(true);
            let volume = fixture.volume().volume();

            // Feed the hand-built profile in through the fake handle.
            let replay_handle = Box::new(FakeReaderWriter::new());
            replay_handle.inner.lock().data = buff;

            state.replay_profile(
                replay_handle,
                volume.clone(),
                volume.scope().try_active_guard().unwrap(),
            );

            // Wait for the replay to populate the page.
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
            while blob.vmo().info().unwrap().committed_bytes == 0 {
                fasync::Timer::new(Duration::from_millis(25)).await;
            }
        }
        fixture.close().await;
    }
1441
    #[fuchsia::test]
    async fn test_replay_file_missing_or_tombstoned() {
        // Builds a profile containing three files: one deleted before replay, one
        // tombstoned during replay, and one that survives. Replay must page in the
        // survivor while leaving the tombstoned file untouched.
        let fixture = TestFixture::new().await;
        let mut buff = vec![0u8; IO_SIZE];
        // Create the blob that comes after the missing blob. Ensure it still gets
        // recorded.
        let remaining_file_id;
        let tombstoned_file_id;
        // First encode the file that is missing.
        {
            let id = write_file(&fixture, "foo", &[1, 2, 3, 4]).await;
            let message = FileMessage { id, offset: 0 };
            message.encode_to_impl((&mut buff[0..size_of::<FileMessage>()]).try_into().unwrap());
        }
        // Remove the file now.
        fixture
            .root()
            .unlink("foo", &fio::UnlinkOptions::default())
            .await
            .unwrap()
            .expect("Unlinking");

        // Encode the file that will be tombstoned during replay.
        {
            tombstoned_file_id = write_file(&fixture, "bar", &[1, 2, 3, 4]).await;
            let message = FileMessage { id: tombstoned_file_id, offset: 0 };
            message.encode_to_impl(
                (&mut buff[size_of::<FileMessage>()..(size_of::<FileMessage>() * 2)])
                    .try_into()
                    .unwrap(),
            );
        }

        // Encode the file that will remain and be replayed last.
        {
            remaining_file_id = write_file(&fixture, "baz", &[1, 2, 3, 4]).await;
            let message = FileMessage { id: remaining_file_id, offset: 0 };
            message.encode_to_impl(
                (&mut buff[(size_of::<FileMessage>() * 2)..(size_of::<FileMessage>() * 3)])
                    .try_into()
                    .unwrap(),
            );
        }
        let device = fixture.close().await;
        device.ensure_unique();

        device.reopen(false);
        let fixture =
            TestFixture::open(device, TestFixtureOptions { format: false, ..Default::default() })
                .await;
        {
            // Get a ref to the Arc on the file, then unlink it. Since the open count is zero it
            // should get marked for tombstone right away.
            let tombstoned_file = fixture
                .volume()
                .volume()
                .get_or_load_node(tombstoned_file_id, ObjectDescriptor::File, None)
                .await
                .expect("Opening file object")
                .into_any()
                .downcast::<FxFile>()
                .unwrap();
            fixture
                .root()
                .unlink("bar", &fio::UnlinkOptions::default())
                .await
                .unwrap()
                .expect("Unlinking");

            let mut state = new_profile_state(false);
            let volume = fixture.volume().volume();

            // Feed the hand-built profile in through the fake handle.
            let replay_handle = Box::new(FakeReaderWriter::new());
            replay_handle.inner.lock().data = buff;

            state.replay_profile(
                replay_handle,
                volume.clone(),
                volume.scope().try_active_guard().unwrap(),
            );

            // Wait for the replay to populate the page.
            let remaining_file = fixture
                .volume()
                .volume()
                .get_or_load_node(remaining_file_id, ObjectDescriptor::File, None)
                .await
                .expect("Opening file object")
                .into_any()
                .downcast::<FxFile>()
                .unwrap();
            while remaining_file.vmo().info().unwrap().committed_bytes == 0 {
                fasync::Timer::new(Duration::from_millis(25)).await;
            }

            // The tombstoned file should not have anything committed because it shouldn't be able
            // to open.
            assert_eq!(tombstoned_file.vmo().info().unwrap().committed_bytes, 0);
        }
        fixture.close().await;
    }
1543}