Skip to main content

fxfs_platform_testing/fuchsia/
profile.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::file::FxFile;
6use crate::fuchsia::fxblob::BlobDirectory;
7use crate::fuchsia::fxblob::blob::FxBlob;
8use crate::fuchsia::node::{FxNode, OpenedNode};
9use crate::fuchsia::pager::PagerBacked;
10use crate::fuchsia::volume::FxVolume;
11use anyhow::{Context as _, Error, anyhow, ensure};
12use arrayref::{array_refs, mut_array_refs};
13use async_trait::async_trait;
14use fuchsia_async as fasync;
15use fuchsia_hash::Hash;
16use futures::future::{self, BoxFuture, RemoteHandle, join_all};
17use futures::lock::Mutex;
18use futures::{FutureExt, select};
19use fxfs::errors::FxfsError;
20use fxfs::log::*;
21use fxfs::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle};
22use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
23use fxfs::object_store::{
24    HandleOptions, ObjectDescriptor, ObjectStore, Timestamp, VOLUME_DATA_KEY_ID, directory,
25};
26use linked_hash_map::LinkedHashMap;
27use scopeguard::ScopeGuard;
28use std::cmp::{Eq, PartialEq};
29use std::collections::btree_map::{BTreeMap, Entry};
30use std::marker::PhantomData;
31use std::mem::size_of;
32use std::pin::pin;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicU64, Ordering};
35use vfs::execution_scope::ActiveGuard;
36
/// Sentinel offset marking a "file was opened" event instead of a page-in request.
const FILE_OPEN_MARKER: u64 = u64::MAX;
/// Number of blocking worker threads that issue prefetch requests during replay.
const REPLAY_THREADS: usize = 2;
// The number of messages to buffer before sending to record. They are chunked up to reduce the
// number of allocations in the serving threads.
const MESSAGE_CHUNK_SIZE: usize = 64;
// 128KiB. Size of each profile read/write. Needs to be a power of 2 and >= block size. The
// power-of-two requirement is enforced at compile time below; the block size is only known at
// runtime, so that half cannot be checked here.
const IO_SIZE: usize = 1 << 17;
const _: () = assert!(IO_SIZE.is_power_of_two(), "IO_SIZE must be a power of two");

/// Running count of messages accepted by a `Recorder`, including file-open markers.
pub static RECORDED: AtomicU64 = AtomicU64::new(0);
45
46trait RecordedVolume: Send + Sync + Sized + Unpin {
47    type IdType: std::fmt::Display + Ord + Send + Sized;
48    type NodeType: PagerBacked;
49    type MessageType: Message<IdType = Self::IdType>;
50
51    fn new(volume: Arc<FxVolume>) -> Self;
52
53    fn volume(&self) -> &Arc<FxVolume>;
54
55    fn open(
56        &self,
57        id: Self::IdType,
58    ) -> impl std::future::Future<Output = Result<OpenedNode<Self::NodeType>, Error>> + Send;
59
60    /// Filters out open markers for files that may not be usable in the profile.
61    fn file_is_replayable(
62        &self,
63        id: &Self::IdType,
64    ) -> impl std::future::Future<Output = bool> + Send;
65
66    fn read_and_queue(
67        &self,
68        handle: Box<dyn ReadObjectHandle>,
69        sender: &async_channel::Sender<Request<Self::NodeType>>,
70        local_cache: &mut BTreeMap<Self::IdType, Option<OpenedNode<Self::NodeType>>>,
71    ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
72        async move {
73            let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
74            let block_size = handle.block_size() as usize;
75            let file_size = handle.get_size() as usize;
76            let mut offset = 0;
77            while offset < file_size {
78                let actual = handle
79                    .read(offset as u64, io_buf.as_mut())
80                    .await
81                    .map_err(|e| e.context(format!("Failed to read at offset: {}", offset)))?;
82                offset += actual;
83                let mut local_offset = 0;
84                let mut next_block = block_size;
85                let mut next_offset = size_of::<Self::MessageType>();
86                while next_offset <= actual {
87                    let msg = Self::MessageType::decode_from(
88                        &io_buf.as_slice()[local_offset..next_offset],
89                    );
90
91                    local_offset = next_offset;
92                    next_offset = local_offset + size_of::<Self::MessageType>();
93                    // Messages don't overlap block boundaries.
94                    if next_offset > next_block {
95                        local_offset = next_block;
96                        next_offset = local_offset + size_of::<Self::MessageType>();
97                        next_block += block_size;
98                    }
99
100                    // Ignore trailing zeroes. This is technically a valid entry but extremely
101                    // unlikely and will only break an optimization.
102                    if msg.is_zeroes() {
103                        break;
104                    }
105
106                    let file = match local_cache.entry(msg.id()) {
107                        Entry::Occupied(entry) => match entry.get() {
108                            Some(opened_file) => (*opened_file).clone(),
109                            // Found a cached error.
110                            None => continue,
111                        },
112                        Entry::Vacant(entry) => match self.open(msg.id()).await {
113                            Err(e) => {
114                                debug!("Failed to open object {} from profile: {:?}", msg.id(), e);
115                                // Cache the error.
116                                entry.insert(None);
117                                continue;
118                            }
119                            Ok(opened_file) => {
120                                let file_clone = (*opened_file).clone();
121                                entry.insert(Some(opened_file));
122                                file_clone
123                            }
124                        },
125                    };
126
127                    sender.send(Request { file, offset: msg.offset() }).await?;
128                }
129            }
130            Ok(())
131        }
132    }
133
134    fn record(
135        &self,
136        name: &str,
137        receiver: async_channel::Receiver<Vec<Self::MessageType>>,
138    ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
139        async move {
140            let mut recorded_offsets = LinkedHashMap::<Self::MessageType, ()>::new();
141            let mut recorded_opens = BTreeMap::<Self::IdType, bool>::new();
142            while let Ok(buffer) = receiver.recv().await {
143                for message in buffer {
144                    if message.is_open_marker() {
145                        if let Entry::Vacant(entry) = recorded_opens.entry(message.id()) {
146                            let usable = self.file_is_replayable(entry.key()).await;
147                            entry.insert(usable);
148                        }
149                    } else {
150                        recorded_offsets.insert(message, ());
151                    }
152                }
153            }
154
155            // Start a recording handle. Put it in the graveyard in case we can't properly complete
156            // it.
157            let store = self.volume().store();
158            let fs = store.filesystem();
159            let mut transaction =
160                fs.clone().new_transaction(lock_keys![], Options::default()).await?;
161            let handle = ObjectStore::create_object(
162                self.volume(),
163                &mut transaction,
164                HandleOptions::default(),
165                None,
166            )
167            .await?;
168            store.add_to_graveyard(&mut transaction, handle.object_id());
169            transaction.commit().await?;
170
171            let clean_up = scopeguard::guard((), |_| {
172                fs.graveyard().queue_tombstone_object(store.store_object_id(), handle.object_id())
173            });
174
175            let block_size = handle.block_size() as usize;
176            let mut offset = 0;
177            let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
178            let mut next_block = block_size;
179            while let Some((message, _)) = recorded_offsets.pop_front() {
180                // If a file opening was never recorded, or it is not usable drop the message.
181                if !recorded_opens.get(&message.id()).copied().unwrap_or(false) {
182                    continue;
183                }
184
185                let mut next_offset = offset + size_of::<Self::MessageType>();
186                if next_offset > next_block {
187                    // Zero the remainder of the block. Stopping on block boundaries allows us to
188                    // resize the I/O without supporting reading/writing half messages to a buffer.
189                    io_buf.as_mut_slice()[offset..next_block].fill(0);
190                    if next_block >= IO_SIZE {
191                        // The buffer is full.  Write it out.
192                        handle
193                            .write_or_append(None, io_buf.as_ref())
194                            .await
195                            .context("Failed to write profile block")?;
196                        offset = 0;
197                        next_offset = size_of::<Self::MessageType>();
198                        next_block = block_size;
199                    } else {
200                        offset = next_block;
201                        next_offset = offset + size_of::<Self::MessageType>();
202                        next_block += block_size;
203                    }
204                }
205                message.encode_to(&mut io_buf.as_mut_slice()[offset..next_offset]);
206                offset = next_offset;
207            }
208            if offset > 0 {
209                io_buf.as_mut_slice()[offset..next_block].fill(0);
210                handle
211                    .write_or_append(None, io_buf.subslice(0..next_block))
212                    .await
213                    .context("Failed to write profile block")?;
214            }
215
216            let profile_dir = self.volume().get_profile_directory().await?;
217
218            let mut lock_keys =
219                lock_keys![LockKey::object(store.store_object_id(), profile_dir.object_id())];
220            let mut old_id = INVALID_OBJECT_ID;
221            let mut transaction = loop {
222                let transaction = fs.clone().new_transaction(lock_keys, Options::default()).await?;
223                if let Some((id, descriptor, _)) = profile_dir.lookup(name).await? {
224                    ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
225                    if id == old_id {
226                        break transaction;
227                    }
228                    lock_keys = lock_keys![
229                        LockKey::object(store.store_object_id(), profile_dir.object_id()),
230                        LockKey::object(store.store_object_id(), id)
231                    ];
232                    old_id = id;
233                } else {
234                    old_id = INVALID_OBJECT_ID;
235                    break transaction;
236                }
237            };
238
239            store.remove_from_graveyard(&mut transaction, handle.object_id());
240            directory::replace_child_with_object(
241                &mut transaction,
242                Some((handle.object_id(), ObjectDescriptor::File)),
243                (&profile_dir, name),
244                0,
245                Timestamp::now(),
246            )
247            .await?;
248            transaction.commit().await?;
249
250            ScopeGuard::into_inner(clean_up);
251            if old_id != INVALID_OBJECT_ID {
252                fs.graveyard().queue_tombstone_object(store.store_object_id(), old_id);
253            }
254
255            Ok(())
256        }
257    }
258}
259
260struct BlobVolume {
261    volume: Arc<FxVolume>,
262    // Cache the open blob directory here. The Mutex is just to make this Send, but it is not
263    // actually used concurrently.
264    root_dir: Mutex<Option<Arc<BlobDirectory>>>,
265}
266
267impl RecordedVolume for BlobVolume {
268    type IdType = Hash;
269    type NodeType = FxBlob;
270    type MessageType = BlobMessage;
271
272    fn new(volume: Arc<FxVolume>) -> Self {
273        Self { volume, root_dir: Mutex::new(None) }
274    }
275
276    fn volume(&self) -> &Arc<FxVolume> {
277        &self.volume
278    }
279
280    async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
281        let mut root_dir = self.root_dir.lock().await;
282        if root_dir.is_none() {
283            *root_dir = Some(
284                self.volume
285                    .get_or_load_node(
286                        self.volume.store().root_directory_object_id(),
287                        ObjectDescriptor::Directory,
288                        None,
289                    )
290                    .await?
291                    .into_any()
292                    .downcast::<BlobDirectory>()
293                    .map_err(|_| FxfsError::Inconsistent)?,
294            );
295        };
296        root_dir
297            .as_ref()
298            .unwrap()
299            .open_blob(&id.into())
300            .await?
301            .ok_or_else(|| FxfsError::NotFound.into())
302    }
303
304    async fn file_is_replayable(&self, _id: &Self::IdType) -> bool {
305        // There is nothing is filter out in blob volumes.
306        true
307    }
308}
309
310struct FileVolume {
311    volume: Arc<FxVolume>,
312}
313
314impl RecordedVolume for FileVolume {
315    type IdType = u64;
316    type NodeType = FxFile;
317    type MessageType = FileMessage;
318
319    fn new(volume: Arc<FxVolume>) -> Self {
320        Self { volume }
321    }
322
323    fn volume(&self) -> &Arc<FxVolume> {
324        &self.volume
325    }
326
327    async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
328        self.volume
329            .get_or_load_node(id, ObjectDescriptor::File, None)
330            .await?
331            .into_any()
332            .downcast::<FxFile>()
333            .map_err(|_| anyhow!("Non-file opened"))?
334            .into_opened_node()
335            .ok_or_else(|| anyhow!("File being purged"))
336    }
337
338    async fn file_is_replayable(&self, id: &Self::IdType) -> bool {
339        match self.volume.store().get_keys(*id).await {
340            // If any keys are not the volume key id, then the file may not be readable later.
341            // If there's more than one, then at least one is not the volume key.
342            Ok(keys)
343                if keys.is_empty()
344                    || (keys.len() == 1 && keys.first().unwrap().0 == VOLUME_DATA_KEY_ID) =>
345            {
346                true
347            }
348            _ => false,
349        }
350    }
351}
352
353trait Message: Eq + PartialEq + Sized + Send + Sync + std::hash::Hash + 'static {
354    type IdType: std::fmt::Display + Ord + Send + Sized;
355
356    fn id(&self) -> Self::IdType;
357    fn offset(&self) -> u64;
358    fn encode_to(&self, dest: &mut [u8]);
359    fn decode_from(src: &[u8]) -> Self;
360    fn is_zeroes(&self) -> bool;
361    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error>;
362    fn is_open_marker(&self) -> bool;
363}
364
365#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
366struct BlobMessage {
367    id: Hash,
368    // Don't bother with offset+length. The kernel is going split up and align it one way and then
369    // we're going to change it all with read-ahead/read-around.
370    offset: u64,
371}
372
373impl BlobMessage {
374    fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
375        let (first, second) = mut_array_refs![dest, size_of::<Hash>(), size_of::<u64>()];
376        *first = self.id.into();
377        *second = self.offset.to_le_bytes();
378    }
379
380    fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
381        let (first, second) = array_refs!(src, size_of::<Hash>(), size_of::<u64>());
382        Self { id: Hash::from_array(*first), offset: u64::from_le_bytes(*second) }
383    }
384}
385
386impl Message for BlobMessage {
387    type IdType = Hash;
388
389    fn id(&self) -> Self::IdType {
390        self.id
391    }
392
393    fn offset(&self) -> u64 {
394        self.offset
395    }
396
397    fn encode_to(&self, dest: &mut [u8]) {
398        self.encode_to_impl(dest.try_into().unwrap());
399    }
400
401    fn decode_from(src: &[u8]) -> Self {
402        Self::decode_from_impl(src.try_into().unwrap())
403    }
404
405    fn is_zeroes(&self) -> bool {
406        self.id == Hash::from_array([0u8; size_of::<Hash>()]) && self.offset == 0
407    }
408
409    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
410        match node.into_any().downcast::<FxBlob>() {
411            Ok(blob) => Ok(Self { id: blob.root(), offset }),
412            Err(_) => Err(anyhow!("Cannot record non-blob entry.")),
413        }
414    }
415
416    fn is_open_marker(&self) -> bool {
417        self.offset == FILE_OPEN_MARKER
418    }
419}
420
/// Profile record for file volumes: an object id plus an offset within the file.
#[derive(Debug, PartialEq, Eq, std::hash::Hash)]
struct FileMessage {
    /// Object id of the file.
    id: u64,
    /// Page-in offset, or `FILE_OPEN_MARKER`. Only an offset is kept, not offset+length: the
    /// kernel splits and aligns requests its own way, and read-ahead/read-around reshapes them
    /// again anyway.
    offset: u64,
}
428
429impl FileMessage {
430    fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
431        let (first, second) = mut_array_refs![dest, size_of::<u64>(), size_of::<u64>()];
432        *first = self.id.to_le_bytes();
433        *second = self.offset.to_le_bytes();
434    }
435
436    fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
437        let (first, second) = array_refs!(src, size_of::<u64>(), size_of::<u64>());
438        Self { id: u64::from_le_bytes(*first), offset: u64::from_le_bytes(*second) }
439    }
440}
441
442impl Message for FileMessage {
443    type IdType = u64;
444
445    fn id(&self) -> Self::IdType {
446        self.id
447    }
448
449    fn offset(&self) -> u64 {
450        self.offset
451    }
452
453    fn encode_to(&self, dest: &mut [u8]) {
454        self.encode_to_impl(dest.try_into().unwrap())
455    }
456
457    fn decode_from(src: &[u8]) -> Self {
458        Self::decode_from_impl(src.try_into().unwrap())
459    }
460
461    fn is_zeroes(&self) -> bool {
462        self.id == 0 && self.offset == 0
463    }
464
465    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
466        match node.into_any().downcast::<FxFile>() {
467            Ok(file) => Ok(Self { id: file.object_id(), offset }),
468            Err(_) => Err(anyhow!("Cannot record non-file entry")),
469        }
470    }
471
472    fn is_open_marker(&self) -> bool {
473        self.offset == FILE_OPEN_MARKER
474    }
475}
476
477/// Takes messages to be written into the current profile. This should be dropped before the
478/// recording is stopped to ensure that all messages have been flushed to the writer thread.
479pub trait Recorder: Send + Sync {
480    /// Record a page in request, for the given identifier and offset.
481    fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error>;
482
483    /// Record file opens to gather what files were actually used during the recording.
484    fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error>;
485}
486
487struct RecorderImpl<T: Message> {
488    sender: async_channel::Sender<Vec<T>>,
489    buffer: Vec<T>,
490}
491
492impl<T: Message> RecorderImpl<T> {
493    fn new(sender: async_channel::Sender<Vec<T>>) -> Self {
494        Self { sender, buffer: Vec::with_capacity(MESSAGE_CHUNK_SIZE) }
495    }
496}
497
498impl<T: Message> Recorder for RecorderImpl<T> {
499    fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error> {
500        self.buffer.push(T::from_node_request(node, offset)?);
501        if self.buffer.len() >= MESSAGE_CHUNK_SIZE {
502            // try_send to avoid async await, we use an unbounded channel anyways so any failure
503            // here should only be if the channel is closed, which is permanent anyways.
504            self.sender.try_send(std::mem::replace(
505                &mut self.buffer,
506                Vec::with_capacity(MESSAGE_CHUNK_SIZE),
507            ))?;
508        }
509        RECORDED.fetch_add(1, Ordering::Relaxed);
510        Ok(())
511    }
512
513    fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error> {
514        self.record(node, FILE_OPEN_MARKER)
515    }
516}
517
518impl<T: Message> Drop for RecorderImpl<T> {
519    fn drop(&mut self) {
520        // Best effort sending what messages have already been queued.
521        if self.buffer.len() > 0 {
522            let buffer = std::mem::take(&mut self.buffer);
523            let _ = self.sender.try_send(buffer);
524        }
525    }
526}
527
528struct Request<P: PagerBacked> {
529    file: Arc<P>,
530    offset: u64,
531}
532
533struct ReplayState<T> {
534    replay_threads: future::Shared<BoxFuture<'static, ()>>,
535    _cache_task: fasync::Task<()>,
536    _phantom: PhantomData<T>,
537}
538
539impl<T: RecordedVolume> ReplayState<T> {
540    fn new(handle: Box<dyn ReadObjectHandle>, volume: Arc<FxVolume>, guard: ActiveGuard) -> Self {
541        let (sender, receiver) = async_channel::unbounded::<Request<T::NodeType>>();
542
543        // Create async_channel. An async thread reads and populates the channel, then N threads
544        // consume it and touch pages.
545        let mut replay_threads = Vec::with_capacity(REPLAY_THREADS);
546        for _ in 0..REPLAY_THREADS {
547            let receiver = receiver.clone();
548            // The replay threads can have references to files so we make sure they have a guard
549            // so that shutdown will wait till they have been joined.
550            let guard = guard.clone();
551            replay_threads.push(fasync::unblock(move || {
552                let _guard = guard;
553                Self::page_in_thread(receiver);
554            }));
555        }
556        let replay_threads = (Box::pin(async {
557            join_all(replay_threads).await;
558        }) as BoxFuture<'static, ()>)
559            .shared();
560
561        let scope = volume.scope().clone();
562        let cache_task = scope
563            .spawn({
564                // The replay threads hold active guards, so we must watch for cancellation.  When
565                // cancelled, we'll drop the sender which will cause the replay threads to drop
566                // their guards, which will allow shutdown to proceed.
567                async move {
568                    let mut task = pin!(
569                        async {
570                            // Hold the items in cache until replay is stopped. Optional as None
571                            // indicates that the file could not be opened, and we want to cache that
572                            // failure.
573                            let mut local_cache: BTreeMap<
574                                T::IdType,
575                                Option<OpenedNode<T::NodeType>>,
576                            > = BTreeMap::new();
577
578                            let volume_id = volume.id();
579
580                            if let Err(error) = T::new(volume)
581                                .read_and_queue(handle, &sender, &mut local_cache)
582                                .await
583                            {
584                                error!(error:?; "Failed to read back profile");
585                            }
586                            sender.close();
587
588                            info!(
589                                "Replay for volume {} opened {} of {} objects.",
590                                volume_id,
591                                local_cache.iter().filter(|(_, e)| e.is_some()).count(),
592                                local_cache.len()
593                            );
594
595                            // Keep the cache alive until dropped.
596                            let () = std::future::pending().await;
597                        }
598                        .fuse()
599                    );
600
601                    select! {
602                        _ = task => {}
603                        _ = guard.on_cancel().fuse() => {}
604                    }
605                }
606            })
607            .into();
608
609        Self { replay_threads, _cache_task: cache_task, _phantom: PhantomData }
610    }
611
612    fn page_in_thread(queue: async_channel::Receiver<Request<T::NodeType>>) {
613        while let Ok(request) = queue.recv_blocking() {
614            let res = request.file.vmo().op_range(
615                zx::VmoOp::PREFETCH,
616                request.offset,
617                zx::system_get_page_size() as u64,
618            );
619            if let Err(e) = res {
620                warn!("Failed to prefetch page: {:?}", e);
621            }
622            // If the volume is shutdown, the sender will be dropped.
623            if queue.sender_count() == 0 {
624                return;
625            }
626        }
627    }
628}
629
630/// Holds the current profile recording and/or replay state, and provides methods for state
631/// transitions.
632#[async_trait]
633pub trait ProfileState: Send + Sync {
634    /// Creates a new recording and returns the `Recorder` object to record to. The recording
635    /// finalizes when the associated `Recorder` is dropped.  Stops any recording currently in
636    /// progress.
637    fn record_new(&mut self, volume: &Arc<FxVolume>, name: &str) -> Box<dyn Recorder>;
638
639    /// Reads given handle to parse a profile and replay it by requesting pages via
640    /// ZX_VMO_OP_PREFETCH in blocking background threads. Stops any replay currently in progress.
641    fn replay_profile(
642        &mut self,
643        handle: Box<dyn ReadObjectHandle>,
644        volume: Arc<FxVolume>,
645        guard: ActiveGuard,
646    );
647
648    /// Waits for replay to finish, but does not drop the cache.  The cache will be dropped when
649    /// the ProfileState impl is dropped.  This is fine to call multiple times.
650    async fn wait_for_replay_to_finish(&mut self);
651
652    /// Waits for the recording to finish.
653    async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error>;
654}
655
656pub fn new_profile_state(is_blob: bool) -> Box<dyn ProfileState> {
657    if is_blob {
658        Box::new(ProfileStateImpl::<BlobVolume>::new())
659    } else {
660        Box::new(ProfileStateImpl::<FileVolume>::new())
661    }
662}
663
664struct ProfileStateImpl<T> {
665    recording: Option<RemoteHandle<Result<(), Error>>>,
666    replay: Option<ReplayState<T>>,
667}
668
669impl<T> ProfileStateImpl<T> {
670    fn new() -> Self {
671        Self { recording: None, replay: None }
672    }
673}
674
675#[async_trait]
676impl<T: RecordedVolume> ProfileState for ProfileStateImpl<T> {
677    fn record_new(&mut self, volume: &Arc<FxVolume>, name: &str) -> Box<dyn Recorder> {
678        let (sender, receiver) = async_channel::unbounded();
679        let volume = volume.clone();
680        let name = name.to_string();
681        // Cancel the previous recording (if any).
682        self.recording = None;
683        let scope = volume.scope().clone();
684        let (task, remote_handle) = async move {
685            let recording = T::new(volume);
686            recording
687                .record(&name, receiver)
688                .await
689                .inspect_err(|error| warn!(error:?; "Profile recording '{name}' failed"))
690        }
691        .remote_handle();
692        self.recording = Some(remote_handle);
693        scope.spawn(task);
694        Box::new(RecorderImpl::new(sender))
695    }
696
697    fn replay_profile(
698        &mut self,
699        handle: Box<dyn ReadObjectHandle>,
700        volume: Arc<FxVolume>,
701        guard: ActiveGuard,
702    ) {
703        self.replay = Some(ReplayState::new(handle, volume, guard));
704    }
705
706    async fn wait_for_replay_to_finish(&mut self) {
707        if let Some(replay) = &mut self.replay {
708            replay.replay_threads.clone().await;
709        }
710    }
711
712    async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error> {
713        if let Some(recording) = self.recording.take() { recording.await } else { Ok(()) }
714    }
715}
716
717#[cfg(test)]
718mod tests {
719    use super::{
720        BlobMessage, BlobVolume, FileMessage, FileVolume, IO_SIZE, Message, RecordedVolume,
721        Request, new_profile_state,
722    };
723    use crate::fuchsia::file::FxFile;
724    use crate::fuchsia::fxblob::blob::FxBlob;
725    use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
726    use crate::fuchsia::node::{FxNode, OpenedNode};
727    use crate::fuchsia::pager::PagerBacked;
728    use crate::fuchsia::testing::{TestFixture, TestFixtureOptions, open_file_checked};
729    use crate::fuchsia::volume::FxVolume;
730    use anyhow::Error;
731    use async_trait::async_trait;
732    use delivery_blob::CompressionMode;
733    use event_listener::{Event, EventListener};
734    use fidl_fuchsia_io as fio;
735    use fuchsia_async as fasync;
736    use fuchsia_hash::Hash;
737    use fuchsia_sync::Mutex;
738    use fxfs::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
739    use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
740    use fxfs::object_store::{DataObjectHandle, HandleOptions, ObjectDescriptor, ObjectStore};
741    use std::collections::BTreeMap;
742    use std::mem::size_of;
743    use std::sync::Arc;
744    use std::time::Duration;
745    use storage_device::buffer::{BufferRef, MutableBufferRef};
746    use storage_device::buffer_allocator::{BufferAllocator, BufferFuture, BufferSource};
747
748    struct FakeReaderWriterInner {
749        data: Vec<u8>,
750        delays: Vec<EventListener>,
751    }
752
753    struct FakeReaderWriter {
754        allocator: BufferAllocator,
755        inner: Arc<Mutex<FakeReaderWriterInner>>,
756    }
757
758    const BLOCK_SIZE: usize = 4096;
759
760    impl FakeReaderWriter {
761        fn new() -> Self {
762            Self {
763                allocator: BufferAllocator::new(BLOCK_SIZE, BufferSource::new(IO_SIZE * 2)),
764                inner: Arc::new(Mutex::new(FakeReaderWriterInner {
765                    data: Vec::new(),
766                    delays: Vec::new(),
767                })),
768            }
769        }
770
771        fn push_delay(&self, delay: EventListener) {
772            self.inner.lock().delays.insert(0, delay);
773        }
774    }
775
776    impl ObjectHandle for FakeReaderWriter {
777        fn object_id(&self) -> u64 {
778            0
779        }
780
781        fn block_size(&self) -> u64 {
782            self.allocator.block_size() as u64
783        }
784
785        fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
786            self.allocator.allocate_buffer(size)
787        }
788    }
789
790    impl WriteObjectHandle for FakeReaderWriter {
791        async fn write_or_append(
792            &self,
793            offset: Option<u64>,
794            buf: BufferRef<'_>,
795        ) -> Result<u64, Error> {
796            // We only append for now.
797            assert!(offset.is_none());
798            let delay = self.inner.lock().delays.pop();
799            if let Some(delay) = delay {
800                delay.await;
801            }
802            // This relocking has a TOCTOU flavour, but it shouldn't matter for this application.
803            self.inner.lock().data.extend_from_slice(buf.as_slice());
804            Ok(buf.len() as u64)
805        }
806
807        async fn truncate(&self, _size: u64) -> Result<(), Error> {
808            unreachable!();
809        }
810
811        async fn flush(&self) -> Result<(), Error> {
812            unreachable!();
813        }
814    }
815
816    async fn write_file(fixture: &TestFixture, name: &str, data: &[u8]) -> u64 {
817        let root_dir = fixture.volume().root_dir();
818        let mut transaction = fixture
819            .volume()
820            .volume()
821            .store()
822            .filesystem()
823            .new_transaction(
824                lock_keys![LockKey::object(
825                    fixture.volume().volume().store().store_object_id(),
826                    root_dir.object_id()
827                )],
828                Options::default(),
829            )
830            .await
831            .expect("Creating transaction for new file");
832        let id = root_dir
833            .directory()
834            .create_child_file(&mut transaction, name)
835            .await
836            .expect("Creating new_file")
837            .object_id();
838        transaction.commit().await.unwrap();
839        let file = open_file_checked(
840            fixture.root(),
841            name,
842            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_FILE,
843            &Default::default(),
844        )
845        .await;
846        file.write(data).await.unwrap().expect("Writing file");
847        id
848    }
849
850    #[async_trait]
851    impl ReadObjectHandle for FakeReaderWriter {
852        async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
853            let delay = self.inner.lock().delays.pop();
854            if let Some(delay) = delay {
855                delay.await;
856            }
857            // This relocking has a TOCTOU flavour, but it shouldn't matter for this application.
858            let inner = self.inner.lock();
859            assert!(offset as usize <= inner.data.len());
860            let offset_end = std::cmp::min(offset as usize + buf.len(), inner.data.len());
861            let size = offset_end - offset as usize;
862            buf.as_mut_slice()[..size].clone_from_slice(&inner.data[offset as usize..offset_end]);
863            Ok(size)
864        }
865
866        fn get_size(&self) -> u64 {
867            self.inner.lock().data.len() as u64
868        }
869    }
870
871    #[fuchsia::test]
872    async fn test_encode_decode_blob() {
873        let mut buf = [0u8; size_of::<BlobMessage>()];
874        let m = BlobMessage { id: [88u8; 32].into(), offset: 77 };
875        m.encode_to(&mut buf.as_mut_slice());
876        let m2 = BlobMessage::decode_from(&buf);
877        assert_eq!(m, m2);
878    }
879
880    #[fuchsia::test]
881    async fn test_encode_decode_file() {
882        let mut buf = [0u8; size_of::<FileMessage>()];
883        let m = FileMessage { id: 88, offset: 77 };
884        m.encode_to(&mut buf.as_mut_slice());
885        let m2 = FileMessage::decode_from(&buf);
886        assert!(!m2.is_zeroes());
887        assert_eq!(m, m2);
888    }
889
/// Name the tests record profiles under and that `get_test_profile_handle` looks up.
const TEST_PROFILE_NAME: &str = "test_profile";
891
892    async fn get_test_profile_handle(volume: &Arc<FxVolume>) -> DataObjectHandle<FxVolume> {
893        let profile_dir = volume.get_profile_directory().await.unwrap();
894        ObjectStore::open_object(
895            volume,
896            profile_dir
897                .lookup(TEST_PROFILE_NAME)
898                .await
899                .expect("lookup failed")
900                .expect("not found")
901                .0,
902            HandleOptions::default(),
903            None,
904        )
905        .await
906        .unwrap()
907    }
908
909    async fn get_test_profile_contents(volume: &Arc<FxVolume>) -> Vec<u8> {
910        get_test_profile_handle(volume).await.contents(1024 * 1024).await.unwrap().to_vec()
911    }
912
/// Records one page-in and one open of a single blob and checks that the resulting profile is
/// exactly `BLOCK_SIZE` bytes long.
#[fuchsia::test]
async fn test_recording_basic_blob() {
    let fixture = new_blob_fixture().await;
    {
        let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
        let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");

        // `true` here versus `false` in the file tests presumably selects blob recording.
        let mut state = new_profile_state(true);
        let volume = fixture.volume().volume();

        {
            // Drop recorder when finished writing to flush data.
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            recorder.record(blob.clone(), 0).unwrap();
            recorder.record_open(blob).unwrap();
        }

        state.wait_for_recording_to_finish().await.unwrap();

        assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
    }
    // The scope above drops the blob, the profile state and the volume reference before the
    // fixture is closed.
    fixture.close().await;
}
936
/// Records one page-in and one open of a single regular file and checks that the resulting
/// profile is exactly `BLOCK_SIZE` bytes long.
#[fuchsia::test]
async fn test_recording_basic_file() {
    let fixture = TestFixture::new().await;
    {
        let id = write_file(&fixture, "foo", &[88u8]).await;
        let node = fixture
            .volume()
            .volume()
            .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
            .await
            .unwrap();

        // `false` here versus `true` in the blob tests presumably selects file recording.
        let mut state = new_profile_state(false);
        let volume = fixture.volume().volume();

        {
            // Drop recorder when finished writing to flush data.
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            recorder.record(node.clone(), 0).unwrap();
            recorder.record_open(node).unwrap();
        }
        state.wait_for_recording_to_finish().await.unwrap();

        assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
    }
    fixture.close().await;
}
964
/// Records a page-in for a blob without ever recording an open of it, and checks that the
/// resulting profile is empty.
#[fuchsia::test]
async fn test_recording_filtered_without_open() {
    let fixture = new_blob_fixture().await;
    {
        let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
        let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");

        let mut state = new_profile_state(true);
        let volume = fixture.volume().volume();

        {
            // Drop recorder when finished writing to flush data.
            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
            // Deliberately no `record_open`: this access is expected to be filtered out.
            recorder.record(blob.clone(), 0).unwrap();
        }
        state.wait_for_recording_to_finish().await.unwrap();

        assert_eq!(get_test_profile_contents(volume).await.len(), 0);
    }
    fixture.close().await;
}
986
/// Records one blob page-in message more than a single block can hold and checks the profile
/// spans two blocks. Then reads it back with `BlobVolume::read_and_queue` and verifies every
/// message's hash and offset, in recording order.
#[fuchsia::test]
async fn test_recording_blob_more_than_block() {
    let mut state = new_profile_state(true);

    let fixture = new_blob_fixture().await;
    assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
    // One message more than fits in a block forces the recording into a second block.
    let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;
    let hash;
    let volume = fixture.volume().volume();

    {
        hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
        let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
        // Drop recorder when finished writing to flush data.
        let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
        recorder.record_open(blob.clone()).unwrap();
        for i in 0..message_count {
            recorder.record(blob.clone(), 4096 * i as u64).unwrap();
        }
    }
    state.wait_for_recording_to_finish().await.unwrap();

    assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);

    let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
    let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();

    let volume = fixture.volume().volume().clone();
    // The spawned task owns `sender`. When it finishes, the channel closes and the receive loop
    // below ends.
    let task = fasync::Task::spawn(async move {
        let handle = Box::new(get_test_profile_handle(&volume).await);
        let blob = BlobVolume::new(volume);
        blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
    });

    let mut recv_count = 0;
    while let Ok(msg) = receiver.recv().await {
        assert_eq!(msg.file.root(), hash);
        assert_eq!(msg.offset, 4096 * recv_count);
        recv_count += 1;
    }
    task.await;
    assert_eq!(recv_count, message_count as u64);

    fixture.close().await;
}
1032
/// File counterpart of the blob test above. Records one file page-in message more than a single
/// block can hold, checks the profile spans two blocks, then reads it back with
/// `FileVolume::read_and_queue` and verifies every message's object id and offset in order.
#[fuchsia::test]
async fn test_recording_file_more_than_block() {
    let mut state = new_profile_state(false);

    let fixture = TestFixture::new().await;
    assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
    // One message more than fits in a block forces the recording into a second block.
    let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;
    let id;
    let volume = fixture.volume().volume();
    {
        id = write_file(&fixture, "foo", &[88u8]).await;
        let node = volume
            .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
            .await
            .unwrap();
        // Drop recorder when finished writing to flush data.
        let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
        recorder.record_open(node.clone()).unwrap();
        for i in 0..message_count {
            recorder.record(node.clone(), 4096 * i as u64).unwrap();
        }
    }
    state.wait_for_recording_to_finish().await.unwrap();

    assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);

    let mut local_cache: BTreeMap<u64, Option<OpenedNode<FxFile>>> = BTreeMap::new();
    let (sender, receiver) = async_channel::unbounded::<Request<FxFile>>();

    let volume = fixture.volume().volume().clone();
    // The spawned task owns `sender`. When it finishes, the channel closes and the receive loop
    // below ends.
    let task = fasync::Task::spawn(async move {
        let handle = Box::new(get_test_profile_handle(&volume).await);
        let file = FileVolume::new(volume);
        file.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
    });

    let mut recv_count = 0;
    while let Ok(msg) = receiver.recv().await {
        assert_eq!(msg.file.object_id(), id);
        assert_eq!(msg.offset, 4096 * recv_count);
        recv_count += 1;
    }
    task.await;
    assert_eq!(recv_count, message_count as u64);

    fixture.close().await;
}
1080
1081    #[fuchsia::test]
1082    async fn test_recording_more_than_io_size() {
1083        let fixture = new_blob_fixture().await;
1084
1085        {
1086            let mut state = new_profile_state(true);
1087            let message_count = (IO_SIZE as usize / size_of::<BlobMessage>()) + 1;
1088            let hash;
1089            let volume = fixture.volume().volume();
1090            {
1091                hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
1092                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1093                // Drop recorder when finished writing to flush data.
1094                let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1095                recorder.record_open(blob.clone()).unwrap();
1096                for i in 0..message_count {
1097                    recorder.record(blob.clone(), 4096 * i as u64).unwrap();
1098                }
1099            }
1100            state.wait_for_recording_to_finish().await.unwrap();
1101            assert_eq!(get_test_profile_contents(volume).await.len(), IO_SIZE + BLOCK_SIZE);
1102
1103            let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
1104            let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();
1105
1106            let volume = volume.clone();
1107            let task = fasync::Task::spawn(async move {
1108                let handle = Box::new(get_test_profile_handle(&volume).await);
1109                let blob = BlobVolume::new(volume);
1110                blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
1111            });
1112
1113            let mut recv_count = 0;
1114            while let Ok(msg) = receiver.recv().await {
1115                assert_eq!(msg.file.root(), hash);
1116                assert_eq!(msg.offset, 4096 * recv_count);
1117                recv_count += 1;
1118            }
1119            task.await;
1120            assert_eq!(recv_count, message_count as u64);
1121        }
1122
1123        fixture.close().await;
1124    }
1125
/// Records an open and an offset-0 page-in for each of `block_size / size_of::<BlobMessage>() + 1`
/// blobs, then restarts the filesystem so nothing is cached. Replays the profile and waits until
/// every blob has committed pages.
#[fuchsia::test]
async fn test_replay_profile_blob() {
    // Create all the files that we need first, then restart the filesystem to clear cache.
    let mut state = new_profile_state(true);

    let mut hashes = Vec::new();

    let fixture = new_blob_fixture().await;
    {
        assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
        let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;

        let volume = fixture.volume().volume();
        // The recorder is dropped at the end of this scope, before the fixture is closed.
        let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
        // Page in the zero offsets only to avoid readahead strangeness.
        for i in 0..message_count {
            let hash =
                fixture.write_blob(i.to_string().as_bytes(), CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
            recorder.record_open(blob.clone()).unwrap();
            hashes.push(hash);
            recorder.record(blob.clone(), 0).unwrap();
        }
    };
    let device = fixture.close().await;
    device.ensure_unique();
    state.wait_for_recording_to_finish().await.unwrap();

    device.reopen(false);
    let fixture = open_blob_fixture(device).await;
    {
        // Need to get the root vmo to check committed bytes.
        // Ensure that nothing is paged in right now.
        for hash in &hashes {
            let blob = fixture.get_blob(*hash).await.expect("Opening blob");
            assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
        }

        let volume = fixture.volume().volume();
        // `replay_profile` is not awaited; the loop below polls for its effects.
        state.replay_profile(
            Box::new(get_test_profile_handle(volume).await),
            volume.clone(),
            volume.scope().try_active_guard().unwrap(),
        );

        // Await all data being played back by checking that things have paged in.
        for hash in &hashes {
            let blob = fixture.get_blob(*hash).await.expect("Opening blob");
            while blob.vmo().info().unwrap().committed_bytes == 0 {
                fasync::Timer::new(Duration::from_millis(25)).await;
            }
        }
    }
    fixture.close().await;
}
1181
/// File counterpart of `test_replay_profile_blob`. Records an open and an offset-0 page-in for
/// each of `block_size / size_of::<FileMessage>() + 1` files, then restarts the filesystem.
/// Replays the profile and waits until every file has committed pages.
#[fuchsia::test]
async fn test_replay_profile_file() {
    // Create all the files that we need first, then restart the filesystem to clear cache.
    let mut state = new_profile_state(false);

    let mut ids = Vec::new();

    let fixture = TestFixture::new().await;
    {
        assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
        let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;

        let volume = fixture.volume().volume();
        // The recorder is dropped at the end of this scope, before the fixture is closed.
        let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
        // Page in the zero offsets only to avoid readahead strangeness.
        for i in 0..message_count {
            let id = write_file(&fixture, &i.to_string(), &[88u8]).await;
            let node = fixture
                .volume()
                .volume()
                .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
                .await
                .unwrap();
            recorder.record_open(node.clone()).unwrap();
            ids.push(id);
            recorder.record(node.clone(), 0).unwrap();
        }
    };
    let device = fixture.close().await;
    device.ensure_unique();
    state.wait_for_recording_to_finish().await.unwrap();

    device.reopen(false);
    let fixture = TestFixture::open(
        device,
        TestFixtureOptions { encrypted: true, format: false, ..Default::default() },
    )
    .await;
    {
        // Ensure that nothing is paged in right now.
        for id in &ids {
            let file = fixture
                .volume()
                .volume()
                .get_or_load_node(
                    *id,
                    ObjectDescriptor::File,
                    Some(fixture.volume().root_dir()),
                )
                .await
                .unwrap()
                .into_any()
                .downcast::<FxFile>()
                .unwrap();
            assert_eq!(file.vmo().info().unwrap().committed_bytes, 0);
        }

        let volume = fixture.volume().volume();
        // `replay_profile` is not awaited; the loop below polls for its effects.
        state.replay_profile(
            Box::new(get_test_profile_handle(volume).await),
            volume.clone(),
            volume.scope().try_active_guard().unwrap(),
        );

        // Await all data being played back by checking that things have paged in.
        for id in &ids {
            let file = fixture
                .volume()
                .volume()
                .get_or_load_node(
                    *id,
                    ObjectDescriptor::File,
                    Some(fixture.volume().root_dir()),
                )
                .await
                .unwrap()
                .into_any()
                .downcast::<FxFile>()
                .unwrap();
            while file.vmo().info().unwrap().committed_bytes == 0 {
                fasync::Timer::new(Duration::from_millis(25)).await;
            }
        }
        // NOTE(review): no recording has been started since the earlier wait, so the purpose of
        // this call is unclear. Confirm whether it is still needed.
        state.wait_for_recording_to_finish().await.unwrap();
    }
    fixture.close().await;
}
1269
1270    #[fuchsia::test]
1271    async fn test_recording_during_replay() {
1272        let mut state = new_profile_state(true);
1273
1274        let hash;
1275        let first_recording;
1276        let fixture = new_blob_fixture().await;
1277        let volume = fixture.volume().volume();
1278
1279        // First make a simple recording.
1280        {
1281            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1282            hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1283            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1284            recorder.record_open(blob.clone()).unwrap();
1285            recorder.record(blob.clone(), 0).unwrap();
1286        }
1287
1288        state.wait_for_recording_to_finish().await.unwrap();
1289        first_recording = get_test_profile_contents(volume).await;
1290        assert_ne!(first_recording.len(), 0);
1291        let device = fixture.close().await;
1292        device.ensure_unique();
1293
1294        device.reopen(false);
1295        let fixture = open_blob_fixture(device).await;
1296
1297        {
1298            // Need to get the root vmo to check committed bytes.
1299            // Ensure that nothing is paged in right now.
1300            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1301            assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
1302
1303            // Start recording
1304            let volume = fixture.volume().volume();
1305            let mut recorder = state.record_new(volume, TEST_PROFILE_NAME);
1306            recorder.record(blob.clone(), 4096).unwrap();
1307
1308            // Replay the original recording.
1309            let volume = fixture.volume().volume();
1310            state.replay_profile(
1311                Box::new(get_test_profile_handle(volume).await),
1312                volume.clone(),
1313                volume.scope().try_active_guard().unwrap(),
1314            );
1315
1316            // Await all data being played back by checking that things have paged in.
1317            {
1318                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1319                while blob.vmo().info().unwrap().committed_bytes == 0 {
1320                    fasync::Timer::new(Duration::from_millis(25)).await;
1321                }
1322            }
1323
1324            // Record the open after the replay. Needs both the before and after action to
1325            // capture anything ensuring that the two procedures overlapped.
1326            recorder.record_open(blob.clone()).unwrap();
1327        }
1328
1329        state.wait_for_recording_to_finish().await.unwrap();
1330
1331        let volume = fixture.volume().volume();
1332        let second_recording = get_test_profile_contents(volume).await;
1333        assert_ne!(second_recording.len(), 0);
1334        assert_ne!(&second_recording, &first_recording);
1335
1336        fixture.close().await;
1337    }
1338
/// Doesn't ensure that anything reads back properly, just that everything shuts down when
/// stopped early.
///
/// Builds a profile that needs two `IO_SIZE` reads and gates each read of the fake handle on an
/// event. Only the first event is ever released, and the fixture is then closed while the second
/// read is still blocked.
#[fuchsia::test]
async fn test_replay_profile_stop_reading_early() {
    let mut state = new_profile_state(true);
    let fixture = new_blob_fixture().await;

    {
        let volume = fixture.volume().volume();

        // Create the file that we need first.
        let message;
        {
            let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
            message = BlobMessage { id: blob.root(), offset: 0 };
        }
        // NOTE(review): nothing has been recorded with `state` yet; confirm whether this wait
        // is needed.
        state.wait_for_recording_to_finish().await.unwrap();

        // Make a profile long enough to require 2 reads.
        let replay_handle = Box::new(FakeReaderWriter::new());
        let mut buff = vec![0u8; IO_SIZE * 2];
        // Put one message at the start of each `IO_SIZE` chunk.
        message.encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
        message.encode_to_impl(
            (&mut buff[IO_SIZE..IO_SIZE + size_of::<BlobMessage>()]).try_into().unwrap(),
        );

        replay_handle.inner.lock().data = buff;
        // Delays are consumed in push order: `delay1` gates the first read, `delay2` the second.
        let delay1 = Event::new();
        replay_handle.push_delay(delay1.listen());
        let delay2 = Event::new();
        replay_handle.push_delay(delay2.listen());

        state.replay_profile(
            replay_handle,
            volume.clone(),
            volume.scope().try_active_guard().unwrap(),
        );

        // Delay the first read long enough so that the stop can be triggered during it.
        fasync::Task::spawn(async move {
            // Let the profiler wait on this a little.
            fasync::Timer::new(Duration::from_millis(100)).await;
            delay1.notify(usize::MAX);
        })
        .detach();
    }

    // The reader should block indefinitely (we never notify delay2), but that shouldn't block
    // termination.
    fixture.close().await;
}
1391
/// Replays a hand-built profile whose first entry names a blob that doesn't exist. Checks that
/// the following entry, for a real blob, is still paged in.
#[fuchsia::test]
async fn test_replay_blob_missing() {
    let fixture = new_blob_fixture().await;
    // Write the blob that the profile lists after the missing one. Replay should skip the
    // missing entry and still page this one in.
    let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
    // The rest of `buff` stays zeroed; presumably the reader treats all-zero entries as
    // padding — confirm.
    let mut buff = vec![0u8; IO_SIZE];
    {
        // First encode the blob that is missing. Just make it up. This will be skipped during
        // replay.
        {
            let message = BlobMessage { id: [42u8; 32].into(), offset: 0 };
            message
                .encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
        }

        // Create the blob that won't be missing and encode that.
        {
            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
            let message = BlobMessage { id: blob.root(), offset: 0 };
            message.encode_to_impl(
                (&mut buff[size_of::<BlobMessage>()..(size_of::<BlobMessage>() * 2)])
                    .try_into()
                    .unwrap(),
            );
        }
    }
    // Restart the filesystem to clear cache before replaying.
    let device = fixture.close().await;
    device.ensure_unique();

    device.reopen(false);
    let fixture = open_blob_fixture(device).await;
    {
        let mut state = new_profile_state(true);
        let volume = fixture.volume().volume();

        let replay_handle = Box::new(FakeReaderWriter::new());
        replay_handle.inner.lock().data = buff;

        state.replay_profile(
            replay_handle,
            volume.clone(),
            volume.scope().try_active_guard().unwrap(),
        );

        // Wait for the replay to populate the page.
        let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
        while blob.vmo().info().unwrap().committed_bytes == 0 {
            fasync::Timer::new(Duration::from_millis(25)).await;
        }
    }
    fixture.close().await;
}
1445
/// Replays a hand-built three-entry file profile:
/// 1. a file that is unlinked before the filesystem restarts (missing),
/// 2. a file that is unlinked after the restart (tombstoned),
/// 3. a file that remains.
///
/// Checks that the remaining file is paged in while the tombstoned one gets no committed pages.
#[fuchsia::test]
async fn test_replay_file_missing_or_tombstoned() {
    let fixture = TestFixture::new().await;
    let mut buff = vec![0u8; IO_SIZE];
    // The missing and tombstoned entries come first. Replay should skip them and still page in
    // the file that comes after them.
    let remaining_file_id;
    let tombstoned_file_id;
    // First encode the file that is missing.
    {
        let id = write_file(&fixture, "foo", &[1, 2, 3, 4]).await;
        let message = FileMessage { id, offset: 0 };
        message.encode_to_impl((&mut buff[0..size_of::<FileMessage>()]).try_into().unwrap());
    }
    // Remove the file now.
    fixture
        .root()
        .unlink("foo", &fio::UnlinkOptions::default())
        .await
        .unwrap()
        .expect("Unlinking");

    // Encode the file that will be tombstoned during replay.
    {
        tombstoned_file_id = write_file(&fixture, "bar", &[1, 2, 3, 4]).await;
        let message = FileMessage { id: tombstoned_file_id, offset: 0 };
        message.encode_to_impl(
            (&mut buff[size_of::<FileMessage>()..(size_of::<FileMessage>() * 2)])
                .try_into()
                .unwrap(),
        );
    }

    // Encode the file that will remain and be replayed last.
    {
        remaining_file_id = write_file(&fixture, "baz", &[1, 2, 3, 4]).await;
        let message = FileMessage { id: remaining_file_id, offset: 0 };
        message.encode_to_impl(
            (&mut buff[(size_of::<FileMessage>() * 2)..(size_of::<FileMessage>() * 3)])
                .try_into()
                .unwrap(),
        );
    }
    // Restart the filesystem to clear cache before replaying.
    let device = fixture.close().await;
    device.ensure_unique();

    device.reopen(false);
    let fixture =
        TestFixture::open(device, TestFixtureOptions { format: false, ..Default::default() })
            .await;
    {
        // Get a ref to the Arc on the file, then unlink it. Since the open count is zero it
        // should get marked for tombstone right away.
        let tombstoned_file = fixture
            .volume()
            .volume()
            .get_or_load_node(tombstoned_file_id, ObjectDescriptor::File, None)
            .await
            .expect("Opening file object")
            .into_any()
            .downcast::<FxFile>()
            .unwrap();
        fixture
            .root()
            .unlink("bar", &fio::UnlinkOptions::default())
            .await
            .unwrap()
            .expect("Unlinking");

        let mut state = new_profile_state(false);
        let volume = fixture.volume().volume();

        let replay_handle = Box::new(FakeReaderWriter::new());
        replay_handle.inner.lock().data = buff;

        state.replay_profile(
            replay_handle,
            volume.clone(),
            volume.scope().try_active_guard().unwrap(),
        );

        // Wait for the replay to populate the page.
        let remaining_file = fixture
            .volume()
            .volume()
            .get_or_load_node(remaining_file_id, ObjectDescriptor::File, None)
            .await
            .expect("Opening file object")
            .into_any()
            .downcast::<FxFile>()
            .unwrap();
        while remaining_file.vmo().info().unwrap().committed_bytes == 0 {
            fasync::Timer::new(Duration::from_millis(25)).await;
        }

        // The tombstoned file should not have anything committed because it shouldn't be able
        // to open.
        assert_eq!(tombstoned_file.vmo().info().unwrap().committed_bytes, 0);
    }
    fixture.close().await;
}
1547}