Skip to main content

fxfs_platform_testing/fuchsia/
profile.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::file::FxFile;
6use crate::fuchsia::fxblob::BlobDirectory;
7use crate::fuchsia::fxblob::blob::FxBlob;
8use crate::fuchsia::node::{FxNode, OpenedNode};
9use crate::fuchsia::pager::PagerBacked;
10use crate::fuchsia::volume::FxVolume;
11use anyhow::{Context as _, Error, anyhow, ensure};
12use arrayref::{array_refs, mut_array_refs};
13use async_trait::async_trait;
14use fuchsia_async as fasync;
15use fuchsia_hash::Hash;
16use futures::future::{self, BoxFuture, RemoteHandle, join_all};
17use futures::lock::Mutex;
18use futures::{FutureExt, select};
19use fxfs::errors::FxfsError;
20use fxfs::log::*;
21use fxfs::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle};
22use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
23use fxfs::object_store::{
24    DataObjectHandle, HandleOptions, ObjectDescriptor, ObjectStore, Timestamp, VOLUME_DATA_KEY_ID,
25    directory,
26};
27use linked_hash_map::LinkedHashMap;
28use scopeguard::ScopeGuard;
29use std::cmp::{Eq, PartialEq};
30use std::collections::btree_map::{BTreeMap, Entry};
31use std::marker::PhantomData;
32use std::mem::size_of;
33use std::pin::pin;
34use std::sync::Arc;
35use std::sync::atomic::{AtomicU64, Ordering};
36use vfs::execution_scope::ActiveGuard;
37
// Sentinel stored in a message's offset field to mark it as a "file was opened" record rather than
// a page-in request (see `Recorder::record_open` and `Message::is_open_marker`).
const FILE_OPEN_MARKER: u64 = u64::MAX;
// The number of blocking threads spawned to issue prefetches while replaying a profile.
const REPLAY_THREADS: usize = 2;
// The number of messages to buffer before sending to record. They are chunked up to reduce the
// number of allocations in the serving threads.
const MESSAGE_CHUNK_SIZE: usize = 64;
// Size of the buffer used both to read a profile back and to write one out.
const IO_SIZE: usize = 1 << 17; // 128KiB. Needs to be a power of 2 and >= block size.

/// Incremented once for every message that `RecorderImpl::record` accepts.
pub static RECORDED: AtomicU64 = AtomicU64::new(0);
46
/// A handle for recording a profile to.
pub trait RecordingHandle: Send + Sync {
    /// Append data to the handle.
    fn append<'a>(
        &'a self,
        buf: storage_device::buffer::BufferRef<'a>,
    ) -> futures::future::BoxFuture<'a, Result<u64, Error>>;

    /// Allocate a buffer of `size` bytes that can later be passed to `append`.
    fn allocate_buffer(
        &self,
        size: usize,
    ) -> futures::future::BoxFuture<'_, storage_device::buffer::Buffer<'_>>;

    /// The block size of the backing storage. The recorder never lets a message straddle a
    /// multiple of this size within the data it appends.
    fn block_size(&self) -> usize;

    /// The recording is finished being appended to the file. Commit it.
    fn commit(self: Box<Self>) -> futures::future::BoxFuture<'static, Result<(), Error>>;

    /// When the recording fails or is stopped prematurely this will be called to clean up the
    /// resources, delete the backing data.
    fn abort_cleanup(self: Box<Self>);
}
69
70/// For placing the recording in the volume's internal profile directory.
71pub struct FileRecordingHandle {
72    name: String,
73    volume: Arc<FxVolume>,
74    handle: DataObjectHandle<FxVolume>,
75}
76
77impl FileRecordingHandle {
78    pub async fn new(name: &str, volume: Arc<FxVolume>) -> Result<Self, Error> {
79        let store = volume.store();
80        let mut transaction = store.new_transaction(lock_keys![], Options::default()).await?;
81        let handle =
82            ObjectStore::create_object(&volume, &mut transaction, HandleOptions::default(), None)
83                .await?;
84        store.add_to_graveyard(&mut transaction, handle.object_id());
85        transaction.commit().await?;
86
87        Ok(Self { name: name.to_string(), volume, handle })
88    }
89
90    async fn commit_impl(&self) -> Result<(), Error> {
91        let store = self.volume.store();
92        let fs = store.filesystem();
93        let profile_dir = self.volume.get_profile_directory().await?;
94
95        let mut lock_keys =
96            lock_keys![LockKey::object(store.store_object_id(), profile_dir.object_id())];
97        let mut old_id = INVALID_OBJECT_ID;
98        let mut transaction = loop {
99            let transaction = store.new_transaction(lock_keys, Options::default()).await?;
100            if let Some((id, descriptor, _)) = profile_dir.lookup(&self.name).await? {
101                ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
102                if id == old_id {
103                    break transaction;
104                }
105                lock_keys = lock_keys![
106                    LockKey::object(store.store_object_id(), profile_dir.object_id()),
107                    LockKey::object(store.store_object_id(), id)
108                ];
109                old_id = id;
110            } else {
111                old_id = INVALID_OBJECT_ID;
112                break transaction;
113            }
114        };
115
116        store.remove_from_graveyard(&mut transaction, self.handle.object_id());
117        directory::replace_child_with_object(
118            &mut transaction,
119            Some((self.handle.object_id(), ObjectDescriptor::File)),
120            (&profile_dir, &self.name),
121            0,
122            false,
123            Timestamp::now(),
124        )
125        .await?;
126        transaction.commit().await?;
127
128        if old_id != INVALID_OBJECT_ID {
129            fs.graveyard().queue_tombstone_object(store.store_object_id(), old_id);
130        }
131
132        Ok(())
133    }
134}
135
136impl RecordingHandle for FileRecordingHandle {
137    fn append<'a>(
138        &'a self,
139        buf: storage_device::buffer::BufferRef<'a>,
140    ) -> futures::future::BoxFuture<'a, Result<u64, Error>> {
141        async move { self.handle.write_or_append(None, buf).await.map_err(Into::into) }.boxed()
142    }
143
144    fn allocate_buffer(
145        &self,
146        size: usize,
147    ) -> futures::future::BoxFuture<'_, storage_device::buffer::Buffer<'_>> {
148        self.handle.allocate_buffer(size).boxed()
149    }
150
151    fn block_size(&self) -> usize {
152        self.handle.block_size() as usize
153    }
154
155    fn commit(self: Box<Self>) -> futures::future::BoxFuture<'static, Result<(), Error>> {
156        async move {
157            let store = self.volume.store();
158            self.commit_impl().await.inspect_err(|_| {
159                store
160                    .filesystem()
161                    .graveyard()
162                    .queue_tombstone_object(store.store_object_id(), self.handle.object_id());
163            })
164        }
165        .boxed()
166    }
167
168    fn abort_cleanup(self: Box<Self>) {
169        let this = *self;
170        this.volume
171            .store()
172            .filesystem()
173            .graveyard()
174            .queue_tombstone_object(this.volume.store().store_object_id(), this.handle.object_id());
175    }
176}
177
/// Volume-type-specific pieces of profile recording and replay, together with the shared logic for
/// reading a profile back (`read_and_queue`) and writing one out (`record`).
trait RecordedVolume: Send + Sync + Sized + Unpin {
    /// Identifier carried in each message for the object it refers to.
    type IdType: std::fmt::Display + Ord + Send + Sized;
    /// Pager-backed node type returned by `open` and referenced by replay requests.
    type NodeType: PagerBacked;
    /// Fixed-size message type stored in the profile for this kind of volume.
    type MessageType: Message<IdType = Self::IdType>;

    /// Creates an instance that operates on `volume`.
    fn new(volume: Arc<FxVolume>) -> Self;

    /// Opens the object identified by `id`, returning an error if it cannot be opened.
    fn open(
        &self,
        id: Self::IdType,
    ) -> impl std::future::Future<Output = Result<OpenedNode<Self::NodeType>, Error>> + Send;

    /// Filters out open markers for files that may not be usable in the profile.
    fn file_is_replayable(
        &self,
        id: &Self::IdType,
    ) -> impl std::future::Future<Output = bool> + Send;

    /// Reads the profile behind `handle` in `IO_SIZE` chunks, decodes the messages in it, opens the
    /// object each message refers to and sends a `Request` for it on `sender`.
    ///
    /// `local_cache` records the outcome of every open attempt, keyed by id: `Some` holds the
    /// opened node and `None` remembers a failure so that the open is not retried. Messages for
    /// objects that failed to open are skipped.
    ///
    /// Returns an error if reading from `handle` or sending on `sender` fails.
    fn read_and_queue(
        &self,
        handle: Box<dyn ReadObjectHandle>,
        sender: &async_channel::Sender<Request<Self::NodeType>>,
        local_cache: &mut BTreeMap<Self::IdType, Option<OpenedNode<Self::NodeType>>>,
    ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
        async move {
            let mut io_buf = handle.allocate_buffer(IO_SIZE).await;
            let block_size = handle.block_size() as usize;
            let file_size = handle.get_size() as usize;
            let mut offset = 0;
            while offset < file_size {
                let actual = handle
                    .read(offset as u64, io_buf.as_mut())
                    .await
                    .map_err(|e| e.context(format!("Failed to read at offset: {}", offset)))?;
                offset += actual;
                // `local_offset..next_offset` is the span of the next message within `io_buf`, and
                // `next_block` is the end of the block that span currently falls in.
                let mut local_offset = 0;
                let mut next_block = block_size;
                let mut next_offset = size_of::<Self::MessageType>();
                while next_offset <= actual {
                    let msg = Self::MessageType::decode_from(
                        &io_buf.as_slice()[local_offset..next_offset],
                    );

                    // Advance the cursor before acting on `msg` so that the `continue`s below
                    // move on to the following message.
                    local_offset = next_offset;
                    next_offset = local_offset + size_of::<Self::MessageType>();
                    // Messages don't overlap block boundaries.
                    if next_offset > next_block {
                        local_offset = next_block;
                        next_offset = local_offset + size_of::<Self::MessageType>();
                        next_block += block_size;
                    }

                    // Ignore trailing zeroes. This is technically a valid entry but extremely
                    // unlikely and will only break an optimization.
                    if msg.is_zeroes() {
                        break;
                    }

                    let file = match local_cache.entry(msg.id()) {
                        Entry::Occupied(entry) => match entry.get() {
                            Some(opened_file) => (*opened_file).clone(),
                            // Found a cached error.
                            None => continue,
                        },
                        Entry::Vacant(entry) => match self.open(msg.id()).await {
                            Err(e) => {
                                debug!("Failed to open object {} from profile: {:?}", msg.id(), e);
                                // Cache the error.
                                entry.insert(None);
                                continue;
                            }
                            Ok(opened_file) => {
                                let file_clone = opened_file.clone();
                                entry.insert(Some(opened_file));
                                file_clone
                            }
                        },
                    };

                    sender.send(Request { file, offset: msg.offset() }).await?;
                }
            }
            Ok(())
        }
    }

    /// Collects batches of messages from `receiver` until `recv` fails, then writes the collected
    /// page-in messages to `recording_handle` and commits it.
    ///
    /// Open markers are not written to the profile; they only decide which ids are kept. A
    /// page-in message is dropped unless an open marker was seen for its id and
    /// `file_is_replayable` returned true for that id.
    ///
    /// Messages are packed so that none straddles a block boundary; the unused tail of each block
    /// is zero-filled. If the returned future fails, or is dropped before the commit is reached,
    /// the scope guard calls `abort_cleanup` on the handle.
    fn record(
        &self,
        recording_handle: Box<dyn RecordingHandle>,
        receiver: async_channel::Receiver<Vec<Self::MessageType>>,
    ) -> impl std::future::Future<Output = Result<(), Error>> + Send {
        // Ensure that this gets cleaned up if we cancel or fail anywhere.
        let recording_handle = scopeguard::guard(recording_handle, |recording_handle| {
            recording_handle.abort_cleanup();
        });

        async move {
            // Keyed by the whole message, so a repeated (id, offset) pair occupies one entry.
            let mut recorded_offsets = LinkedHashMap::<Self::MessageType, ()>::new();
            // Maps each id that had an open marker to whether it is replayable. The check is only
            // made the first time an id is seen.
            let mut recorded_opens = BTreeMap::<Self::IdType, bool>::new();
            while let Ok(buffer) = receiver.recv().await {
                for message in buffer {
                    if message.is_open_marker() {
                        if let Entry::Vacant(entry) = recorded_opens.entry(message.id()) {
                            let usable = self.file_is_replayable(entry.key()).await;
                            entry.insert(usable);
                        }
                    } else {
                        recorded_offsets.insert(message, ());
                    }
                }
            }

            let block_size = recording_handle.block_size();
            // `offset` is the write position within `io_buf`; `next_block` is the end of the block
            // currently being filled.
            let mut offset = 0;
            let mut io_buf = recording_handle.allocate_buffer(IO_SIZE).await;
            let mut next_block = block_size;
            while let Some((message, _)) = recorded_offsets.pop_front() {
                // If a file opening was never recorded, or it is not usable drop the message.
                if !recorded_opens.get(&message.id()).copied().unwrap_or(false) {
                    continue;
                }

                let mut next_offset = offset + size_of::<Self::MessageType>();
                if next_offset > next_block {
                    // Zero the remainder of the block. Stopping on block boundaries allows us to
                    // resize the I/O without supporting reading/writing half messages to a buffer.
                    io_buf.as_mut_slice()[offset..next_block].fill(0);
                    if next_block >= IO_SIZE {
                        // The buffer is full: flush all of it and start again from the beginning.
                        recording_handle
                            .append(io_buf.as_ref())
                            .await
                            .context("Failed to write profile block")?;
                        offset = 0;
                        next_offset = size_of::<Self::MessageType>();
                        next_block = block_size;
                    } else {
                        // Move on to the next block within the same buffer.
                        offset = next_block;
                        next_offset = offset + size_of::<Self::MessageType>();
                        next_block += block_size;
                    }
                }
                message.encode_to(&mut io_buf.as_mut_slice()[offset..next_offset]);
                offset = next_offset;
            }
            // Flush whatever is left, padded with zeroes up to the end of the current block.
            if offset > 0 {
                io_buf.as_mut_slice()[offset..next_block].fill(0);
                recording_handle
                    .append(io_buf.subslice(0..next_block))
                    .await
                    .context("Failed to write profile block")?;
            }

            // `io_buf` was allocated from the handle, so release it before taking the handle out
            // of the guard.
            std::mem::drop(io_buf);
            // Defuse the cleanup.
            let recording_handle = ScopeGuard::into_inner(recording_handle);
            recording_handle.commit().await?;

            Ok(())
        }
    }
}
339
340struct BlobVolume {
341    volume: Arc<FxVolume>,
342    // Cache the open blob directory here. The Mutex is just to make this Send, but it is not
343    // actually used concurrently.
344    root_dir: Mutex<Option<Arc<BlobDirectory>>>,
345}
346
347impl RecordedVolume for BlobVolume {
348    type IdType = Hash;
349    type NodeType = FxBlob;
350    type MessageType = BlobMessage;
351
352    fn new(volume: Arc<FxVolume>) -> Self {
353        Self { volume, root_dir: Mutex::new(None) }
354    }
355
356    async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
357        let mut root_dir = self.root_dir.lock().await;
358        if root_dir.is_none() {
359            *root_dir = Some(
360                self.volume
361                    .get_or_load_node(
362                        self.volume.store().root_directory_object_id(),
363                        ObjectDescriptor::Directory,
364                        None,
365                    )
366                    .await?
367                    .into_any()
368                    .downcast::<BlobDirectory>()
369                    .map_err(|_| FxfsError::Inconsistent)?,
370            );
371        };
372        root_dir
373            .as_ref()
374            .unwrap()
375            .open_blob(&id.into())
376            .await?
377            .ok_or_else(|| FxfsError::NotFound.into())
378    }
379
380    async fn file_is_replayable(&self, _id: &Self::IdType) -> bool {
381        // There is nothing is filter out in blob volumes.
382        true
383    }
384}
385
386struct FileVolume {
387    volume: Arc<FxVolume>,
388}
389
390impl RecordedVolume for FileVolume {
391    type IdType = u64;
392    type NodeType = FxFile;
393    type MessageType = FileMessage;
394
395    fn new(volume: Arc<FxVolume>) -> Self {
396        Self { volume }
397    }
398
399    async fn open(&self, id: Self::IdType) -> Result<OpenedNode<Self::NodeType>, Error> {
400        self.volume
401            .get_or_load_node(id, ObjectDescriptor::File, None)
402            .await?
403            .into_any()
404            .downcast::<FxFile>()
405            .map_err(|_| anyhow!("Non-file opened"))?
406            .into_opened_node()
407            .ok_or_else(|| anyhow!("File being purged"))
408    }
409
410    async fn file_is_replayable(&self, id: &Self::IdType) -> bool {
411        match self.volume.store().get_keys(*id).await {
412            // If any keys are not the volume key id, then the file may not be readable later.
413            // If there's more than one, then at least one is not the volume key.
414            Ok(keys)
415                if keys.is_empty()
416                    || (keys.len() == 1 && keys.first().unwrap().0 == VOLUME_DATA_KEY_ID) =>
417            {
418                true
419            }
420            _ => false,
421        }
422    }
423}
424
/// A single profile entry: an object identifier plus an offset. Messages are hashable and
/// comparable so that the recorder can use them as map keys.
trait Message: Eq + PartialEq + Sized + Send + Sync + std::hash::Hash + 'static {
    /// Identifier type for the object the message refers to.
    type IdType: std::fmt::Display + Ord + Send + Sized;

    /// The identifier of the object this message refers to.
    fn id(&self) -> Self::IdType;
    /// The offset stored in the message.
    fn offset(&self) -> u64;
    /// Serializes the message into `dest`. The implementations in this file panic unless `dest`
    /// is exactly `size_of::<Self>()` bytes long.
    fn encode_to(&self, dest: &mut [u8]);
    /// Deserializes a message from `src`. The implementations in this file panic unless `src` is
    /// exactly `size_of::<Self>()` bytes long.
    fn decode_from(src: &[u8]) -> Self;
    /// True if the id and the offset are both zero, which is what zero padding decodes to.
    fn is_zeroes(&self) -> bool;
    /// Builds the message for a request at `offset` on `node`, failing if `node` is not of the
    /// node type this message kind records.
    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error>;
    /// True if this message marks a file open rather than a page-in request.
    fn is_open_marker(&self) -> bool;
}
436
437#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
438struct BlobMessage {
439    id: Hash,
440    // Don't bother with offset+length. The kernel is going split up and align it one way and then
441    // we're going to change it all with read-ahead/read-around.
442    offset: u64,
443}
444
445impl BlobMessage {
446    fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
447        let (first, second) = mut_array_refs![dest, size_of::<Hash>(), size_of::<u64>()];
448        *first = self.id.into();
449        *second = self.offset.to_le_bytes();
450    }
451
452    fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
453        let (first, second) = array_refs!(src, size_of::<Hash>(), size_of::<u64>());
454        Self { id: Hash::from_array(*first), offset: u64::from_le_bytes(*second) }
455    }
456}
457
458impl Message for BlobMessage {
459    type IdType = Hash;
460
461    fn id(&self) -> Self::IdType {
462        self.id
463    }
464
465    fn offset(&self) -> u64 {
466        self.offset
467    }
468
469    fn encode_to(&self, dest: &mut [u8]) {
470        self.encode_to_impl(dest.try_into().unwrap());
471    }
472
473    fn decode_from(src: &[u8]) -> Self {
474        Self::decode_from_impl(src.try_into().unwrap())
475    }
476
477    fn is_zeroes(&self) -> bool {
478        self.id == Hash::from_array([0u8; size_of::<Hash>()]) && self.offset == 0
479    }
480
481    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
482        match node.into_any().downcast::<FxBlob>() {
483            Ok(blob) => Ok(Self { id: blob.root(), offset }),
484            Err(_) => Err(anyhow!("Cannot record non-blob entry.")),
485        }
486    }
487
488    fn is_open_marker(&self) -> bool {
489        self.offset == FILE_OPEN_MARKER
490    }
491}
492
493#[derive(Debug, Eq, std::hash::Hash, PartialEq)]
494struct FileMessage {
495    id: u64,
496    // Don't bother with offset+length. The kernel is going split up and align it one way and then
497    // we're going to change it all with read-ahead/read-around.
498    offset: u64,
499}
500
501impl FileMessage {
502    fn encode_to_impl(&self, dest: &mut [u8; size_of::<Self>()]) {
503        let (first, second) = mut_array_refs![dest, size_of::<u64>(), size_of::<u64>()];
504        *first = self.id.to_le_bytes();
505        *second = self.offset.to_le_bytes();
506    }
507
508    fn decode_from_impl(src: &[u8; size_of::<Self>()]) -> Self {
509        let (first, second) = array_refs!(src, size_of::<u64>(), size_of::<u64>());
510        Self { id: u64::from_le_bytes(*first), offset: u64::from_le_bytes(*second) }
511    }
512}
513
514impl Message for FileMessage {
515    type IdType = u64;
516
517    fn id(&self) -> Self::IdType {
518        self.id
519    }
520
521    fn offset(&self) -> u64 {
522        self.offset
523    }
524
525    fn encode_to(&self, dest: &mut [u8]) {
526        self.encode_to_impl(dest.try_into().unwrap())
527    }
528
529    fn decode_from(src: &[u8]) -> Self {
530        Self::decode_from_impl(src.try_into().unwrap())
531    }
532
533    fn is_zeroes(&self) -> bool {
534        self.id == 0 && self.offset == 0
535    }
536
537    fn from_node_request(node: Arc<dyn FxNode>, offset: u64) -> Result<Self, Error> {
538        match node.into_any().downcast::<FxFile>() {
539            Ok(file) => Ok(Self { id: file.object_id(), offset }),
540            Err(_) => Err(anyhow!("Cannot record non-file entry")),
541        }
542    }
543
544    fn is_open_marker(&self) -> bool {
545        self.offset == FILE_OPEN_MARKER
546    }
547}
548
/// Takes messages to be written into the current profile. This should be dropped before the
/// recording is stopped to ensure that all messages have been flushed to the writer thread.
pub trait Recorder: Send + Sync {
    /// Record a page in request, for the given identifier and offset.
    ///
    /// Returns an error if the request could not be recorded.
    fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error>;

    /// Record file opens to gather what files were actually used during the recording.
    ///
    /// Returns an error if the open could not be recorded.
    fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error>;
}
558
559struct RecorderImpl<T: Message> {
560    sender: async_channel::Sender<Vec<T>>,
561    buffer: Vec<T>,
562}
563
564impl<T: Message> RecorderImpl<T> {
565    fn new(sender: async_channel::Sender<Vec<T>>) -> Self {
566        Self { sender, buffer: Vec::with_capacity(MESSAGE_CHUNK_SIZE) }
567    }
568}
569
570impl<T: Message> Recorder for RecorderImpl<T> {
571    fn record(&mut self, node: Arc<dyn FxNode>, offset: u64) -> Result<(), Error> {
572        self.buffer.push(T::from_node_request(node, offset)?);
573        if self.buffer.len() >= MESSAGE_CHUNK_SIZE {
574            // try_send to avoid async await, we use an unbounded channel anyways so any failure
575            // here should only be if the channel is closed, which is permanent anyways.
576            self.sender.try_send(std::mem::replace(
577                &mut self.buffer,
578                Vec::with_capacity(MESSAGE_CHUNK_SIZE),
579            ))?;
580        }
581        RECORDED.fetch_add(1, Ordering::Relaxed);
582        Ok(())
583    }
584
585    fn record_open(&mut self, node: Arc<dyn FxNode>) -> Result<(), Error> {
586        self.record(node, FILE_OPEN_MARKER)
587    }
588}
589
590impl<T: Message> Drop for RecorderImpl<T> {
591    fn drop(&mut self) {
592        // Best effort sending what messages have already been queued.
593        if self.buffer.len() > 0 {
594            let buffer = std::mem::take(&mut self.buffer);
595            let _ = self.sender.try_send(buffer);
596        }
597    }
598}
599
/// A single prefetch to perform during replay: one page of `file`'s VMO starting at `offset`.
struct Request<P: PagerBacked> {
    // The node whose VMO is prefetched.
    file: Arc<P>,
    // Offset within the VMO at which the prefetch starts.
    offset: u64,
}
604
/// State of a profile replay in progress.
struct ReplayState<T> {
    // Completes once every page-in thread has finished. Shared so that it can be awaited more
    // than once.
    replay_threads: future::Shared<BoxFuture<'static, ()>>,
    // Task that reads the profile, queues the requests and then keeps the opened nodes cached.
    _cache_task: fasync::Task<()>,
    _phantom: PhantomData<T>,
}

impl<T: RecordedVolume> ReplayState<T> {
    /// Starts replaying the profile behind `handle` for `volume`.
    ///
    /// Spawns `REPLAY_THREADS` blocking threads that consume prefetch requests, and a task on the
    /// volume's scope that reads the profile and produces those requests. Each thread holds a
    /// clone of `guard` until it exits.
    fn new(handle: Box<dyn ReadObjectHandle>, volume: Arc<FxVolume>, guard: ActiveGuard) -> Self {
        let (sender, receiver) = async_channel::unbounded::<Request<T::NodeType>>();

        // Create async_channel. An async thread reads and populates the channel, then N threads
        // consume it and touch pages.
        let mut replay_threads = Vec::with_capacity(REPLAY_THREADS);
        for _ in 0..REPLAY_THREADS {
            let receiver = receiver.clone();
            // The replay threads can have references to files so we make sure they have a guard
            // so that shutdown will wait till they have been joined.
            let guard = guard.clone();
            replay_threads.push(fasync::unblock(move || {
                let _guard = guard;
                Self::page_in_thread(receiver);
            }));
        }
        let replay_threads = (Box::pin(async {
            join_all(replay_threads).await;
        }) as BoxFuture<'static, ()>)
            .shared();

        let scope = volume.scope().clone();
        let cache_task = scope
            .spawn({
                // The replay threads hold active guards, so we must watch for cancellation.  When
                // cancelled, we'll drop the sender which will cause the replay threads to drop
                // their guards, which will allow shutdown to proceed.
                async move {
                    let mut task = pin!(
                        async {
                            // Hold the items in cache until replay is stopped. Optional as None
                            // indicates that the file could not be opened, and we want to cache that
                            // failure.
                            let mut local_cache: BTreeMap<
                                T::IdType,
                                Option<OpenedNode<T::NodeType>>,
                            > = BTreeMap::new();

                            // Read the id before `volume` is moved into `T::new` below.
                            let volume_id = volume.id();

                            if let Err(error) = T::new(volume)
                                .read_and_queue(handle, &sender, &mut local_cache)
                                .await
                            {
                                error!(error:?; "Failed to read back profile");
                            }
                            // No more requests will be queued.
                            sender.close();

                            info!(
                                "Replay for volume {} opened {} of {} objects.",
                                volume_id,
                                local_cache.iter().filter(|(_, e)| e.is_some()).count(),
                                local_cache.len()
                            );

                            // Keep the cache alive until dropped.
                            let () = std::future::pending().await;
                        }
                        .fuse()
                    );

                    select! {
                        _ = task => {}
                        _ = guard.on_cancel().fuse() => {}
                    }
                }
            })
            .into();

        Self { replay_threads, _cache_task: cache_task, _phantom: PhantomData }
    }

    /// Body of a replay thread: takes requests from `queue` and issues a one-page prefetch on the
    /// file's VMO for each of them. Returns when `recv_blocking` fails or when no senders remain.
    fn page_in_thread(queue: async_channel::Receiver<Request<T::NodeType>>) {
        while let Ok(request) = queue.recv_blocking() {
            let res = request.file.vmo().op_range(
                zx::VmoOp::PREFETCH,
                request.offset,
                zx::system_get_page_size() as u64,
            );
            // A failed prefetch is only logged; the thread carries on with the next request.
            if let Err(e) = res {
                warn!("Failed to prefetch page: {:?}", e);
            }
            // If the volume is shutdown, the sender will be dropped.
            if queue.sender_count() == 0 {
                return;
            }
        }
    }
}
701
/// Holds the current profile recording and/or replay state, and provides methods for state
/// transitions.
#[async_trait]
pub trait ProfileState: Send + Sync {
    /// Creates a new recording and returns the `Recorder` object to record to. The recording
    /// finalizes when the associated `Recorder` is dropped.  Stops any recording currently in
    /// progress.
    ///
    /// `volume` is the volume being profiled and `recording_handle` is where the finished
    /// recording is written.
    fn record_new(
        &mut self,
        volume: &Arc<FxVolume>,
        recording_handle: Box<dyn RecordingHandle>,
    ) -> Box<dyn Recorder>;

    /// Reads given handle to parse a profile and replay it by requesting pages via
    /// ZX_VMO_OP_PREFETCH in blocking background threads. Stops any replay currently in progress.
    ///
    /// `guard` is cloned into the background threads and held by each of them until it exits.
    fn replay_profile(
        &mut self,
        handle: Box<dyn ReadObjectHandle>,
        volume: Arc<FxVolume>,
        guard: ActiveGuard,
    );

    /// Waits for replay to finish, but does not drop the cache.  The cache will be dropped when
    /// the ProfileState impl is dropped.  This is fine to call multiple times.
    async fn wait_for_replay_to_finish(&mut self);

    /// Waits for the recording to finish.
    ///
    /// Returns the result of the recording, or `Ok(())` if there is no recording to wait for.
    async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error>;
}
731
732pub fn new_profile_state(is_blob: bool) -> Box<dyn ProfileState> {
733    if is_blob {
734        Box::new(ProfileStateImpl::<BlobVolume>::new())
735    } else {
736        Box::new(ProfileStateImpl::<FileVolume>::new())
737    }
738}
739
/// `ProfileState` implementation, generic over the kind of volume being profiled.
struct ProfileStateImpl<T> {
    // Handle to the result of the current recording task, if any. `record_new` resets this to
    // `None` to cancel a previous recording.
    recording: Option<RemoteHandle<Result<(), Error>>>,
    // The current replay, if any.
    replay: Option<ReplayState<T>>,
}

impl<T> ProfileStateImpl<T> {
    /// Creates a state with no recording and no replay in progress.
    fn new() -> Self {
        Self { recording: None, replay: None }
    }
}
750
751#[async_trait]
752impl<T: RecordedVolume> ProfileState for ProfileStateImpl<T> {
753    fn record_new(
754        &mut self,
755        volume: &Arc<FxVolume>,
756        recording_handle: Box<dyn RecordingHandle>,
757    ) -> Box<dyn Recorder> {
758        let (sender, receiver) = async_channel::unbounded();
759        let volume = volume.clone();
760        // Cancel the previous recording (if any).
761        self.recording = None;
762        let scope = volume.scope().clone();
763        let (task, remote_handle) = async move {
764            let recording = T::new(volume);
765            recording
766                .record(recording_handle, receiver)
767                .await
768                .inspect_err(|error| warn!(error:?; "Profile recording failed"))
769        }
770        .remote_handle();
771        self.recording = Some(remote_handle);
772        scope.spawn(task);
773        Box::new(RecorderImpl::new(sender))
774    }
775
776    fn replay_profile(
777        &mut self,
778        handle: Box<dyn ReadObjectHandle>,
779        volume: Arc<FxVolume>,
780        guard: ActiveGuard,
781    ) {
782        self.replay = Some(ReplayState::new(handle, volume, guard));
783    }
784
785    async fn wait_for_replay_to_finish(&mut self) {
786        if let Some(replay) = &mut self.replay {
787            replay.replay_threads.clone().await;
788        }
789    }
790
791    async fn wait_for_recording_to_finish(&mut self) -> Result<(), Error> {
792        if let Some(recording) = self.recording.take() { recording.await } else { Ok(()) }
793    }
794}
795
796#[cfg(test)]
797mod tests {
798    use super::{
799        BlobMessage, BlobVolume, FileMessage, FileRecordingHandle, FileVolume, IO_SIZE, Message,
800        RecordedVolume, Request, new_profile_state,
801    };
802    use crate::fuchsia::file::FxFile;
803    use crate::fuchsia::fxblob::blob::FxBlob;
804    use crate::fuchsia::fxblob::testing::{BlobFixture, new_blob_fixture, open_blob_fixture};
805    use crate::fuchsia::node::{FxNode, OpenedNode};
806    use crate::fuchsia::pager::PagerBacked;
807    use crate::fuchsia::testing::{TestFixture, TestFixtureOptions, open_file_checked};
808    use crate::fuchsia::volume::FxVolume;
809    use anyhow::Error;
810    use async_trait::async_trait;
811    use delivery_blob::CompressionMode;
812    use event_listener::{Event, EventListener};
813    use fidl_fuchsia_io as fio;
814    use fuchsia_async as fasync;
815    use fuchsia_hash::Hash;
816    use fuchsia_sync::Mutex;
817    use fxfs::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
818    use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
819    use fxfs::object_store::{DataObjectHandle, HandleOptions, ObjectDescriptor, ObjectStore};
820    use std::collections::BTreeMap;
821    use std::mem::size_of;
822    use std::sync::Arc;
823    use std::time::Duration;
824    use storage_device::buffer::{BufferRef, MutableBufferRef};
825    use storage_device::buffer_allocator::{BufferAllocator, BufferFuture, BufferSource};
826
827    struct FakeReaderWriterInner {
828        data: Vec<u8>,
829        delays: Vec<EventListener>,
830    }
831
832    struct FakeReaderWriter {
833        allocator: BufferAllocator,
834        inner: Arc<Mutex<FakeReaderWriterInner>>,
835    }
836
837    const BLOCK_SIZE: usize = 4096;
838
839    impl FakeReaderWriter {
840        fn new() -> Self {
841            Self {
842                allocator: BufferAllocator::new(BLOCK_SIZE, BufferSource::new(IO_SIZE * 2)),
843                inner: Arc::new(Mutex::new(FakeReaderWriterInner {
844                    data: Vec::new(),
845                    delays: Vec::new(),
846                })),
847            }
848        }
849
850        fn push_delay(&self, delay: EventListener) {
851            self.inner.lock().delays.insert(0, delay);
852        }
853    }
854
855    impl ObjectHandle for FakeReaderWriter {
856        fn object_id(&self) -> u64 {
857            0
858        }
859
860        fn block_size(&self) -> u64 {
861            self.allocator.block_size() as u64
862        }
863
864        fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
865            self.allocator.allocate_buffer(size)
866        }
867    }
868
869    impl WriteObjectHandle for FakeReaderWriter {
870        async fn write_or_append(
871            &self,
872            offset: Option<u64>,
873            buf: BufferRef<'_>,
874        ) -> Result<u64, Error> {
875            // We only append for now.
876            assert!(offset.is_none());
877            let delay = self.inner.lock().delays.pop();
878            if let Some(delay) = delay {
879                delay.await;
880            }
881            // This relocking has a TOCTOU flavour, but it shouldn't matter for this application.
882            self.inner.lock().data.extend_from_slice(buf.as_slice());
883            Ok(buf.len() as u64)
884        }
885
886        async fn truncate(&self, _size: u64) -> Result<(), Error> {
887            unreachable!();
888        }
889
890        async fn flush(&self) -> Result<(), Error> {
891            unreachable!();
892        }
893    }
894
895    async fn write_file(fixture: &TestFixture, name: &str, data: &[u8]) -> u64 {
896        let root_dir = fixture.volume().root_dir();
897        let mut transaction = fixture
898            .volume()
899            .volume()
900            .store()
901            .new_transaction(
902                lock_keys![LockKey::object(
903                    fixture.volume().volume().store().store_object_id(),
904                    root_dir.object_id()
905                )],
906                Options::default(),
907            )
908            .await
909            .expect("Creating transaction for new file");
910        let id = root_dir
911            .directory()
912            .create_child_file(&mut transaction, name)
913            .await
914            .expect("Creating new_file")
915            .object_id();
916        transaction.commit().await.unwrap();
917        let file = open_file_checked(
918            fixture.root(),
919            name,
920            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_FILE,
921            &Default::default(),
922        )
923        .await;
924        file.write(data).await.unwrap().expect("Writing file");
925        id
926    }
927
928    #[async_trait]
929    impl ReadObjectHandle for FakeReaderWriter {
930        async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
931            let delay = self.inner.lock().delays.pop();
932            if let Some(delay) = delay {
933                delay.await;
934            }
935            // This relocking has a TOCTOU flavour, but it shouldn't matter for this application.
936            let inner = self.inner.lock();
937            assert!(offset as usize <= inner.data.len());
938            let offset_end = std::cmp::min(offset as usize + buf.len(), inner.data.len());
939            let size = offset_end - offset as usize;
940            buf.as_mut_slice()[..size].clone_from_slice(&inner.data[offset as usize..offset_end]);
941            Ok(size)
942        }
943
944        fn get_size(&self) -> u64 {
945            self.inner.lock().data.len() as u64
946        }
947    }
948
949    #[fuchsia::test]
950    async fn test_encode_decode_blob() {
951        let mut buf = [0u8; size_of::<BlobMessage>()];
952        let m = BlobMessage { id: [88u8; 32].into(), offset: 77 };
953        m.encode_to(&mut buf.as_mut_slice());
954        let m2 = BlobMessage::decode_from(&buf);
955        assert_eq!(m, m2);
956    }
957
958    #[fuchsia::test]
959    async fn test_encode_decode_file() {
960        let mut buf = [0u8; size_of::<FileMessage>()];
961        let m = FileMessage { id: 88, offset: 77 };
962        m.encode_to(&mut buf.as_mut_slice());
963        let m2 = FileMessage::decode_from(&buf);
964        assert!(!m2.is_zeroes());
965        assert_eq!(m, m2);
966    }
967
968    const TEST_PROFILE_NAME: &str = "test_profile";
969
970    async fn get_test_profile_handle(volume: &Arc<FxVolume>) -> DataObjectHandle<FxVolume> {
971        let profile_dir = volume.get_profile_directory().await.unwrap();
972        ObjectStore::open_object(
973            volume,
974            profile_dir
975                .lookup(TEST_PROFILE_NAME)
976                .await
977                .expect("lookup failed")
978                .expect("not found")
979                .0,
980            HandleOptions::default(),
981            None,
982        )
983        .await
984        .unwrap()
985    }
986
987    async fn get_test_profile_contents(volume: &Arc<FxVolume>) -> Vec<u8> {
988        get_test_profile_handle(volume).await.contents(1024 * 1024).await.unwrap().to_vec()
989    }
990
991    #[fuchsia::test]
992    async fn test_recording_basic_blob() {
993        let fixture = new_blob_fixture().await;
994        {
995            let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
996            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
997
998            let mut state = new_profile_state(true);
999            let volume = fixture.volume().volume();
1000
1001            {
1002                // Drop recorder when finished writing to flush data.
1003                let handle =
1004                    FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1005                let mut recorder = state.record_new(volume, Box::new(handle));
1006                recorder.record(blob.clone(), 0).unwrap();
1007                recorder.record_open(blob).unwrap();
1008            }
1009
1010            state.wait_for_recording_to_finish().await.unwrap();
1011
1012            assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
1013        }
1014        fixture.close().await;
1015    }
1016
1017    #[fuchsia::test]
1018    async fn test_recording_basic_file() {
1019        let fixture = TestFixture::new().await;
1020        {
1021            let id = write_file(&fixture, "foo", &[88u8]).await;
1022            let node = fixture
1023                .volume()
1024                .volume()
1025                .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
1026                .await
1027                .unwrap();
1028
1029            let mut state = new_profile_state(false);
1030            let volume = fixture.volume().volume();
1031
1032            {
1033                // Drop recorder when finished writing to flush data.
1034                let handle =
1035                    FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1036                let mut recorder = state.record_new(volume, Box::new(handle));
1037                recorder.record(node.clone(), 0).unwrap();
1038                recorder.record_open(node).unwrap();
1039            }
1040            state.wait_for_recording_to_finish().await.unwrap();
1041
1042            assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE);
1043        }
1044        fixture.close().await;
1045    }
1046
1047    #[fuchsia::test]
1048    async fn test_recording_filtered_without_open() {
1049        let fixture = new_blob_fixture().await;
1050        {
1051            let hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
1052            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1053
1054            let mut state = new_profile_state(true);
1055            let volume = fixture.volume().volume();
1056
1057            {
1058                // Drop recorder when finished writing to flush data.
1059                let handle =
1060                    FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1061                let mut recorder = state.record_new(volume, Box::new(handle));
1062                recorder.record(blob.clone(), 0).unwrap();
1063            }
1064            state.wait_for_recording_to_finish().await.unwrap();
1065
1066            assert_eq!(get_test_profile_contents(volume).await.len(), 0);
1067        }
1068        fixture.close().await;
1069    }
1070
1071    #[fuchsia::test]
1072    async fn test_recording_blob_more_than_block() {
1073        let mut state = new_profile_state(true);
1074
1075        let fixture = new_blob_fixture().await;
1076        assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
1077        let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;
1078        let hash;
1079        let volume = fixture.volume().volume();
1080
1081        {
1082            hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
1083            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1084            // Drop recorder when finished writing to flush data.
1085            let handle = FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1086            let mut recorder = state.record_new(volume, Box::new(handle));
1087            recorder.record_open(blob.clone()).unwrap();
1088            for i in 0..message_count {
1089                recorder.record(blob.clone(), 4096 * i as u64).unwrap();
1090            }
1091        }
1092        state.wait_for_recording_to_finish().await.unwrap();
1093
1094        assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);
1095
1096        let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
1097        let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();
1098
1099        let volume = fixture.volume().volume().clone();
1100        let task = fasync::Task::spawn(async move {
1101            let handle = Box::new(get_test_profile_handle(&volume).await);
1102            let blob = BlobVolume::new(volume);
1103            blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
1104        });
1105
1106        let mut recv_count = 0;
1107        while let Ok(msg) = receiver.recv().await {
1108            assert_eq!(msg.file.root(), hash);
1109            assert_eq!(msg.offset, 4096 * recv_count);
1110            recv_count += 1;
1111        }
1112        task.await;
1113        assert_eq!(recv_count, message_count as u64);
1114
1115        fixture.close().await;
1116    }
1117
1118    #[fuchsia::test]
1119    async fn test_recording_file_more_than_block() {
1120        let mut state = new_profile_state(false);
1121
1122        let fixture = TestFixture::new().await;
1123        assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
1124        let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;
1125        let id;
1126        let volume = fixture.volume().volume();
1127        {
1128            id = write_file(&fixture, "foo", &[88u8]).await;
1129            let node = volume
1130                .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
1131                .await
1132                .unwrap();
1133            // Drop recorder when finished writing to flush data.
1134            let handle = FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1135            let mut recorder = state.record_new(volume, Box::new(handle));
1136            recorder.record_open(node.clone()).unwrap();
1137            for i in 0..message_count {
1138                recorder.record(node.clone(), 4096 * i as u64).unwrap();
1139            }
1140        }
1141        state.wait_for_recording_to_finish().await.unwrap();
1142
1143        assert_eq!(get_test_profile_contents(volume).await.len(), BLOCK_SIZE * 2);
1144
1145        let mut local_cache: BTreeMap<u64, Option<OpenedNode<FxFile>>> = BTreeMap::new();
1146        let (sender, receiver) = async_channel::unbounded::<Request<FxFile>>();
1147
1148        let volume = fixture.volume().volume().clone();
1149        let task = fasync::Task::spawn(async move {
1150            let handle = Box::new(get_test_profile_handle(&volume).await);
1151            let file = FileVolume::new(volume);
1152            file.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
1153        });
1154
1155        let mut recv_count = 0;
1156        while let Ok(msg) = receiver.recv().await {
1157            assert_eq!(msg.file.object_id(), id);
1158            assert_eq!(msg.offset, 4096 * recv_count);
1159            recv_count += 1;
1160        }
1161        task.await;
1162        assert_eq!(recv_count, message_count as u64);
1163
1164        fixture.close().await;
1165    }
1166
1167    #[fuchsia::test]
1168    async fn test_recording_more_than_io_size() {
1169        let fixture = new_blob_fixture().await;
1170
1171        {
1172            let mut state = new_profile_state(true);
1173            let message_count = (IO_SIZE as usize / size_of::<BlobMessage>()) + 1;
1174            let hash;
1175            let volume = fixture.volume().volume();
1176            {
1177                hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
1178                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1179                // Drop recorder when finished writing to flush data.
1180                let handle =
1181                    FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1182                let mut recorder = state.record_new(volume, Box::new(handle));
1183                recorder.record_open(blob.clone()).unwrap();
1184                for i in 0..message_count {
1185                    recorder.record(blob.clone(), 4096 * i as u64).unwrap();
1186                }
1187            }
1188            state.wait_for_recording_to_finish().await.unwrap();
1189            assert_eq!(get_test_profile_contents(volume).await.len(), IO_SIZE + BLOCK_SIZE);
1190
1191            let mut local_cache: BTreeMap<Hash, Option<OpenedNode<FxBlob>>> = BTreeMap::new();
1192            let (sender, receiver) = async_channel::unbounded::<Request<FxBlob>>();
1193
1194            let volume = volume.clone();
1195            let task = fasync::Task::spawn(async move {
1196                let handle = Box::new(get_test_profile_handle(&volume).await);
1197                let blob = BlobVolume::new(volume);
1198                blob.read_and_queue(handle, &sender, &mut local_cache).await.unwrap();
1199            });
1200
1201            let mut recv_count = 0;
1202            while let Ok(msg) = receiver.recv().await {
1203                assert_eq!(msg.file.root(), hash);
1204                assert_eq!(msg.offset, 4096 * recv_count);
1205                recv_count += 1;
1206            }
1207            task.await;
1208            assert_eq!(recv_count, message_count as u64);
1209        }
1210
1211        fixture.close().await;
1212    }
1213
1214    #[fuchsia::test]
1215    async fn test_replay_profile_blob() {
1216        // Create all the files that we need first, then restart the filesystem to clear cache.
1217        let mut state = new_profile_state(true);
1218
1219        let mut hashes = Vec::new();
1220
1221        let fixture = new_blob_fixture().await;
1222        {
1223            assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
1224            let message_count = (fixture.fs().block_size() as usize / size_of::<BlobMessage>()) + 1;
1225
1226            let volume = fixture.volume().volume();
1227            let handle = FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1228            let mut recorder = state.record_new(volume, Box::new(handle));
1229            // Page in the zero offsets only to avoid readahead strangeness.
1230            for i in 0..message_count {
1231                let hash =
1232                    fixture.write_blob(i.to_string().as_bytes(), CompressionMode::Never).await;
1233                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1234                recorder.record_open(blob.clone()).unwrap();
1235                hashes.push(hash);
1236                recorder.record(blob.clone(), 0).unwrap();
1237            }
1238        };
1239        let device = fixture.close().await;
1240        device.ensure_unique();
1241        state.wait_for_recording_to_finish().await.unwrap();
1242
1243        device.reopen(false);
1244        let fixture = open_blob_fixture(device).await;
1245        {
1246            // Need to get the root vmo to check committed bytes.
1247            // Ensure that nothing is paged in right now.
1248            for hash in &hashes {
1249                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
1250                assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
1251            }
1252
1253            let volume = fixture.volume().volume();
1254            state.replay_profile(
1255                Box::new(get_test_profile_handle(volume).await),
1256                volume.clone(),
1257                volume.scope().try_active_guard().unwrap(),
1258            );
1259
1260            // Await all data being played back by checking that things have paged in.
1261            for hash in &hashes {
1262                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
1263                while blob.vmo().info().unwrap().committed_bytes == 0 {
1264                    fasync::Timer::new(Duration::from_millis(25)).await;
1265                }
1266            }
1267        }
1268        fixture.close().await;
1269    }
1270
1271    #[fuchsia::test]
1272    async fn test_replay_profile_file() {
1273        // Create all the files that we need first, then restart the filesystem to clear cache.
1274        let mut state = new_profile_state(false);
1275
1276        let mut ids = Vec::new();
1277
1278        let fixture = TestFixture::new().await;
1279        {
1280            assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
1281            let message_count = (fixture.fs().block_size() as usize / size_of::<FileMessage>()) + 1;
1282
1283            let volume = fixture.volume().volume();
1284            let handle = FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1285            let mut recorder = state.record_new(volume, Box::new(handle));
1286            // Page in the zero offsets only to avoid readahead strangeness.
1287            for i in 0..message_count {
1288                let id = write_file(&fixture, &i.to_string(), &[88u8]).await;
1289                let node = fixture
1290                    .volume()
1291                    .volume()
1292                    .get_or_load_node(id, ObjectDescriptor::File, Some(fixture.volume().root_dir()))
1293                    .await
1294                    .unwrap();
1295                recorder.record_open(node.clone()).unwrap();
1296                ids.push(id);
1297                recorder.record(node.clone(), 0).unwrap();
1298            }
1299        };
1300        let device = fixture.close().await;
1301        device.ensure_unique();
1302        state.wait_for_recording_to_finish().await.unwrap();
1303
1304        device.reopen(false);
1305        let fixture = TestFixture::open(
1306            device,
1307            TestFixtureOptions { encrypted: true, format: false, ..Default::default() },
1308        )
1309        .await;
1310        {
1311            // Ensure that nothing is paged in right now.
1312            for id in &ids {
1313                let file = fixture
1314                    .volume()
1315                    .volume()
1316                    .get_or_load_node(
1317                        *id,
1318                        ObjectDescriptor::File,
1319                        Some(fixture.volume().root_dir()),
1320                    )
1321                    .await
1322                    .unwrap()
1323                    .into_any()
1324                    .downcast::<FxFile>()
1325                    .unwrap();
1326                assert_eq!(file.vmo().info().unwrap().committed_bytes, 0);
1327            }
1328
1329            let volume = fixture.volume().volume();
1330            state.replay_profile(
1331                Box::new(get_test_profile_handle(volume).await),
1332                volume.clone(),
1333                volume.scope().try_active_guard().unwrap(),
1334            );
1335
1336            // Await all data being played back by checking that things have paged in.
1337            for id in &ids {
1338                let file = fixture
1339                    .volume()
1340                    .volume()
1341                    .get_or_load_node(
1342                        *id,
1343                        ObjectDescriptor::File,
1344                        Some(fixture.volume().root_dir()),
1345                    )
1346                    .await
1347                    .unwrap()
1348                    .into_any()
1349                    .downcast::<FxFile>()
1350                    .unwrap();
1351                while file.vmo().info().unwrap().committed_bytes == 0 {
1352                    fasync::Timer::new(Duration::from_millis(25)).await;
1353                }
1354            }
1355            state.wait_for_recording_to_finish().await.unwrap();
1356        }
1357        fixture.close().await;
1358    }
1359
1360    #[fuchsia::test]
1361    async fn test_recording_during_replay() {
1362        let mut state = new_profile_state(true);
1363
1364        let hash;
1365        let first_recording;
1366        let fixture = new_blob_fixture().await;
1367        let volume = fixture.volume().volume();
1368
1369        // First make a simple recording.
1370        {
1371            let handle = FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1372            let mut recorder = state.record_new(volume, Box::new(handle));
1373            hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1374            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1375            recorder.record_open(blob.clone()).unwrap();
1376            recorder.record(blob.clone(), 0).unwrap();
1377        }
1378
1379        state.wait_for_recording_to_finish().await.unwrap();
1380        first_recording = get_test_profile_contents(volume).await;
1381        assert_ne!(first_recording.len(), 0);
1382        let device = fixture.close().await;
1383        device.ensure_unique();
1384
1385        device.reopen(false);
1386        let fixture = open_blob_fixture(device).await;
1387
1388        {
1389            // Need to get the root vmo to check committed bytes.
1390            // Ensure that nothing is paged in right now.
1391            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1392            assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
1393
1394            // Start recording
1395            let volume = fixture.volume().volume();
1396            let handle = FileRecordingHandle::new(TEST_PROFILE_NAME, volume.clone()).await.unwrap();
1397            let mut recorder = state.record_new(volume, Box::new(handle));
1398            recorder.record(blob.clone(), 4096).unwrap();
1399
1400            // Replay the original recording.
1401            let volume = fixture.volume().volume();
1402            state.replay_profile(
1403                Box::new(get_test_profile_handle(volume).await),
1404                volume.clone(),
1405                volume.scope().try_active_guard().unwrap(),
1406            );
1407
1408            // Await all data being played back by checking that things have paged in.
1409            {
1410                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1411                while blob.vmo().info().unwrap().committed_bytes == 0 {
1412                    fasync::Timer::new(Duration::from_millis(25)).await;
1413                }
1414            }
1415
1416            // Record the open after the replay. Needs both the before and after action to
1417            // capture anything ensuring that the two procedures overlapped.
1418            recorder.record_open(blob.clone()).unwrap();
1419        }
1420
1421        state.wait_for_recording_to_finish().await.unwrap();
1422
1423        let volume = fixture.volume().volume();
1424        let second_recording = get_test_profile_contents(volume).await;
1425        assert_ne!(second_recording.len(), 0);
1426        assert_ne!(&second_recording, &first_recording);
1427
1428        fixture.close().await;
1429    }
1430
1431    // Doesn't ensure that anything reads back properly, just that everything shuts down when
1432    // stopped early.
1433    #[fuchsia::test]
1434    async fn test_replay_profile_stop_reading_early() {
1435        let mut state = new_profile_state(true);
1436        let fixture = new_blob_fixture().await;
1437
1438        {
1439            let volume = fixture.volume().volume();
1440
1441            // Create the file that we need first.
1442            let message;
1443            {
1444                let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1445                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1446                message = BlobMessage { id: blob.root(), offset: 0 };
1447            }
1448            state.wait_for_recording_to_finish().await.unwrap();
1449
1450            // Make a profile long enough to require 2 reads.
1451            let replay_handle = Box::new(FakeReaderWriter::new());
1452            let mut buff = vec![0u8; IO_SIZE * 2];
1453            message.encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
1454            message.encode_to_impl(
1455                (&mut buff[IO_SIZE..IO_SIZE + size_of::<BlobMessage>()]).try_into().unwrap(),
1456            );
1457
1458            replay_handle.inner.lock().data = buff;
1459            let delay1 = Event::new();
1460            replay_handle.push_delay(delay1.listen());
1461            let delay2 = Event::new();
1462            replay_handle.push_delay(delay2.listen());
1463
1464            state.replay_profile(
1465                replay_handle,
1466                volume.clone(),
1467                volume.scope().try_active_guard().unwrap(),
1468            );
1469
1470            // Delay the first read long enough so that the stop can be triggered during it.
1471            fasync::Task::spawn(async move {
1472                // Let the profiler wait on this a little.
1473                fasync::Timer::new(Duration::from_millis(100)).await;
1474                delay1.notify(usize::MAX);
1475            })
1476            .detach();
1477        }
1478
1479        // The reader should block indefinitely (we never notify delay2), but that shouldn't block
1480        // termination.
1481        fixture.close().await;
1482    }
1483
1484    #[fuchsia::test]
1485    async fn test_replay_blob_missing() {
1486        let fixture = new_blob_fixture().await;
1487        // Create the blob that comes after the missing blob. Ensure it still gets
1488        // recorded.
1489        let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
1490        let mut buff = vec![0u8; IO_SIZE];
1491        {
1492            // First encode the blob that is missing. Just make it up. This will be skipped during
1493            // replay.
1494            {
1495                let message = BlobMessage { id: [42u8; 32].into(), offset: 0 };
1496                message
1497                    .encode_to_impl((&mut buff[0..size_of::<BlobMessage>()]).try_into().unwrap());
1498            }
1499
1500            // Create the blob that won't be missing and encode that.
1501            {
1502                let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1503                let message = BlobMessage { id: blob.root(), offset: 0 };
1504                message.encode_to_impl(
1505                    (&mut buff[size_of::<BlobMessage>()..(size_of::<BlobMessage>() * 2)])
1506                        .try_into()
1507                        .unwrap(),
1508                );
1509            }
1510        }
1511        let device = fixture.close().await;
1512        device.ensure_unique();
1513
1514        device.reopen(false);
1515        let fixture = open_blob_fixture(device).await;
1516        {
1517            let mut state = new_profile_state(true);
1518            let volume = fixture.volume().volume();
1519
1520            let replay_handle = Box::new(FakeReaderWriter::new());
1521            replay_handle.inner.lock().data = buff;
1522
1523            state.replay_profile(
1524                replay_handle,
1525                volume.clone(),
1526                volume.scope().try_active_guard().unwrap(),
1527            );
1528
1529            // Wait for the replay to populate the page.
1530            let blob = fixture.get_blob((*hash).into()).await.expect("Opening blob");
1531            while blob.vmo().info().unwrap().committed_bytes == 0 {
1532                fasync::Timer::new(Duration::from_millis(25)).await;
1533            }
1534        }
1535        fixture.close().await;
1536    }
1537
1538    #[fuchsia::test]
1539    async fn test_replay_file_missing_or_tombstoned() {
1540        let fixture = TestFixture::new().await;
1541        let mut buff = vec![0u8; IO_SIZE];
1542        // Create the blob that comes after the missing blob. Ensure it still gets
1543        // recorded.
1544        let remaining_file_id;
1545        let tombstoned_file_id;
1546        // First encode the file that is missing.
1547        {
1548            let id = write_file(&fixture, "foo", &[1, 2, 3, 4]).await;
1549            let message = FileMessage { id, offset: 0 };
1550            message.encode_to_impl((&mut buff[0..size_of::<FileMessage>()]).try_into().unwrap());
1551        }
1552        // Remove the file now.
1553        fixture
1554            .root()
1555            .unlink("foo", &fio::UnlinkOptions::default())
1556            .await
1557            .unwrap()
1558            .expect("Unlinking");
1559
1560        // Encode the file that will be tombstoned during replay.
1561        {
1562            tombstoned_file_id = write_file(&fixture, "bar", &[1, 2, 3, 4]).await;
1563            let message = FileMessage { id: tombstoned_file_id, offset: 0 };
1564            message.encode_to_impl(
1565                (&mut buff[size_of::<FileMessage>()..(size_of::<FileMessage>() * 2)])
1566                    .try_into()
1567                    .unwrap(),
1568            );
1569        }
1570
1571        // Encode the file that will remain and be replayed last.
1572        {
1573            remaining_file_id = write_file(&fixture, "baz", &[1, 2, 3, 4]).await;
1574            let message = FileMessage { id: remaining_file_id, offset: 0 };
1575            message.encode_to_impl(
1576                (&mut buff[(size_of::<FileMessage>() * 2)..(size_of::<FileMessage>() * 3)])
1577                    .try_into()
1578                    .unwrap(),
1579            );
1580        }
1581        let device = fixture.close().await;
1582        device.ensure_unique();
1583
1584        device.reopen(false);
1585        let fixture =
1586            TestFixture::open(device, TestFixtureOptions { format: false, ..Default::default() })
1587                .await;
1588        {
1589            // Get a ref to the Arc on the file, then unlink it. Since the open count is zero it
1590            // should get marked for tombstone right away.
1591            let tombstoned_file = fixture
1592                .volume()
1593                .volume()
1594                .get_or_load_node(tombstoned_file_id, ObjectDescriptor::File, None)
1595                .await
1596                .expect("Opening file object")
1597                .into_any()
1598                .downcast::<FxFile>()
1599                .unwrap();
1600            fixture
1601                .root()
1602                .unlink("bar", &fio::UnlinkOptions::default())
1603                .await
1604                .unwrap()
1605                .expect("Unlinking");
1606
1607            let mut state = new_profile_state(false);
1608            let volume = fixture.volume().volume();
1609
1610            let replay_handle = Box::new(FakeReaderWriter::new());
1611            replay_handle.inner.lock().data = buff;
1612
1613            state.replay_profile(
1614                replay_handle,
1615                volume.clone(),
1616                volume.scope().try_active_guard().unwrap(),
1617            );
1618
1619            // Wait for the replay to populate the page.
1620            let remaining_file = fixture
1621                .volume()
1622                .volume()
1623                .get_or_load_node(remaining_file_id, ObjectDescriptor::File, None)
1624                .await
1625                .expect("Opening file object")
1626                .into_any()
1627                .downcast::<FxFile>()
1628                .unwrap();
1629            while remaining_file.vmo().info().unwrap().committed_bytes == 0 {
1630                fasync::Timer::new(Duration::from_millis(25)).await;
1631            }
1632
1633            // The tombstoned file should not have anything committed because it shouldn't be able
1634            // to open.
1635            assert_eq!(tombstoned_file.vmo().info().unwrap().committed_bytes, 0);
1636        }
1637        fixture.close().await;
1638    }
1639}