Skip to main content

fxfs_platform/fuchsia/
volume.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::component::map_to_raw_status;
6use crate::fuchsia::directory::FxDirectory;
7use crate::fuchsia::dirent_cache::DirentCache;
8use crate::fuchsia::file::FxFile;
9use crate::fuchsia::memory_pressure::{MemoryPressureLevel, MemoryPressureMonitor};
10use crate::fuchsia::node::{FxNode, GetResult, NodeCache};
11use crate::fuchsia::pager::Pager;
12use crate::fuchsia::profile::ProfileState;
13use crate::fuchsia::symlink::FxSymlink;
14use crate::fuchsia::volumes_directory::VolumesDirectory;
15use anyhow::{Error, bail, ensure};
16use async_trait::async_trait;
17use fidl::endpoints::ServerEnd;
18use fidl_fuchsia_fxfs::{
19    BytesAndNodes, FileBackedVolumeProviderRequest, FileBackedVolumeProviderRequestStream,
20    ProjectIdRequest, ProjectIdRequestStream, ProjectIterToken,
21};
22use fidl_fuchsia_io as fio;
23use fs_inspect::{FsInspectVolume, VolumeData};
24use fuchsia_async as fasync;
25use fuchsia_async::epoch::Epoch;
26use fuchsia_sync::Mutex;
27use futures::channel::oneshot;
28use futures::stream::{self, FusedStream, Stream};
29use futures::{FutureExt, StreamExt, TryStreamExt};
30use fxfs::errors::FxfsError;
31use fxfs::filesystem::{self, SyncOptions};
32use fxfs::future_with_guard::FutureWithGuard;
33use fxfs::log::*;
34use fxfs::object_store::directory::Directory;
35use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
36use fxfs::object_store::{DirType, HandleOptions, HandleOwner, ObjectDescriptor, ObjectStore};
37use refaults_vmo::PageRefaultCounter;
38use std::future::Future;
39use std::pin::pin;
40#[cfg(any(test, feature = "testing"))]
41use std::sync::atomic::AtomicBool;
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::{Arc, Weak};
44use std::time::Duration;
45use vfs::directory::entry::DirectoryEntry;
46use vfs::directory::simple::Simple;
47use vfs::execution_scope::ExecutionScope;
48
// LINT.IfChange
// TODO:(b/299919008) Fix this number to something reasonable, or maybe just for fxblob.
/// Cache size limit used by the default configuration for the normal memory pressure level.
const DIRENT_CACHE_LIMIT: usize = 2_000;
// LINT.ThenChange(//src/storage/stressor/src/aggressive.rs)

/// Target size for read ahead/around. Reads are grown to this size where the format of the target
/// object permits it.
pub const READ_AHEAD_SIZE: u64 = 128 * 1024;

/// Name of the profile directory inside the volume's internal directory.
const PROFILE_DIRECTORY: &str = "profiles";
59
/// Settings that apply while the system is at one particular memory pressure level.
#[derive(Debug, Clone, Copy)]
pub struct MemoryPressureLevelConfig {
    /// Time to wait between runs of the background task, which flushes files and performs other
    /// maintenance work (e.g. purging caches).
    pub background_task_period: Duration,

    /// Upper bound on the number of cached nodes.
    pub cache_size_limit: usize,

    /// Time to wait before the background task runs for the first time. A longer initial delay
    /// keeps the task from running during boot.
    pub background_task_initial_delay: Duration,
}
73
74impl Default for MemoryPressureLevelConfig {
75    fn default() -> Self {
76        Self {
77            background_task_period: Duration::from_secs(20),
78            cache_size_limit: DIRENT_CACHE_LIMIT,
79            background_task_initial_delay: Duration::from_secs(70),
80        }
81    }
82}
83
84#[derive(Clone, Copy, Debug)]
85pub struct MemoryPressureConfig {
86    /// The configuration to use at [`MemoryPressureLevel::Normal`].
87    pub mem_normal: MemoryPressureLevelConfig,
88
89    /// The configuration to use at [`MemoryPressureLevel::Warning`].
90    pub mem_warning: MemoryPressureLevelConfig,
91
92    /// The configuration to use at [`MemoryPressureLevel::Critical`].
93    pub mem_critical: MemoryPressureLevelConfig,
94}
95
96impl MemoryPressureConfig {
97    pub fn for_level(&self, level: &MemoryPressureLevel) -> &MemoryPressureLevelConfig {
98        match level {
99            MemoryPressureLevel::Normal => &self.mem_normal,
100            MemoryPressureLevel::Warning => &self.mem_warning,
101            MemoryPressureLevel::Critical => &self.mem_critical,
102        }
103    }
104}
105
106impl Default for MemoryPressureConfig {
107    fn default() -> Self {
108        // TODO(https://fxbug.dev/42061389): investigate a smarter strategy for determining flush
109        // frequency.
110        Self {
111            mem_normal: MemoryPressureLevelConfig {
112                background_task_period: Duration::from_secs(20),
113                cache_size_limit: DIRENT_CACHE_LIMIT,
114                background_task_initial_delay: Duration::from_secs(70),
115            },
116            mem_warning: MemoryPressureLevelConfig {
117                background_task_period: Duration::from_secs(5),
118                cache_size_limit: 100,
119                background_task_initial_delay: Duration::from_secs(5),
120            },
121            mem_critical: MemoryPressureLevelConfig {
122                background_task_period: Duration::from_millis(1500),
123                cache_size_limit: 20,
124                background_task_initial_delay: Duration::from_millis(1500),
125            },
126        }
127    }
128}
129
130/// FxVolume represents an opened volume. It is also a (weak) cache for all opened Nodes within the
131/// volume.
132pub struct FxVolume {
133    parent: Weak<VolumesDirectory>,
134    cache: NodeCache,
135    store: Arc<ObjectStore>,
136    pager: Pager,
137    executor: fasync::EHandle,
138    name: String,
139
140    // A tuple of the actual task and a channel to signal to terminate the task.
141    background_task: Mutex<Option<(fasync::Task<()>, oneshot::Sender<()>)>>,
142
143    // Unique identifier of the filesystem that owns this volume.
144    fs_id: u64,
145
146    // The execution scope for this volume.
147    scope: ExecutionScope,
148
149    dirent_cache: DirentCache,
150
151    profile_state: Mutex<Option<Box<dyn ProfileState>>>,
152
153    #[cfg(any(test, feature = "testing"))]
154    poisoned: AtomicBool,
155
156    blob_resupplied_count: Arc<PageRefaultCounter>,
157
158    /// The number of dirty bytes in pager backed VMOs that belong to this volume. VolumesDirectory
159    /// holds a count for all volumes. This count is only used for tracing.
160    pager_dirty_byte_count: AtomicU64,
161}
162
163#[fxfs_trace::trace]
164impl FxVolume {
165    pub fn new(
166        parent: Weak<VolumesDirectory>,
167        store: Arc<ObjectStore>,
168        fs_id: u64,
169        name: String,
170        blob_resupplied_count: Arc<PageRefaultCounter>,
171        memory_pressure_config: MemoryPressureConfig,
172    ) -> Result<Self, Error> {
173        let scope = ExecutionScope::new();
174        Ok(Self {
175            parent,
176            cache: NodeCache::new(),
177            store,
178            name,
179            pager: Pager::new(scope.clone())?,
180            executor: fasync::EHandle::local(),
181            background_task: Mutex::new(None),
182            fs_id,
183            scope,
184            dirent_cache: DirentCache::new(memory_pressure_config.mem_normal.cache_size_limit),
185            profile_state: Mutex::new(None),
186            #[cfg(any(test, feature = "testing"))]
187            poisoned: AtomicBool::new(false),
188            blob_resupplied_count,
189            pager_dirty_byte_count: AtomicU64::new(0),
190        })
191    }
192
193    pub fn store(&self) -> &Arc<ObjectStore> {
194        &self.store
195    }
196
197    pub fn cache(&self) -> &NodeCache {
198        &self.cache
199    }
200
201    pub fn dirent_cache(&self) -> &DirentCache {
202        &self.dirent_cache
203    }
204
205    pub fn pager(&self) -> &Pager {
206        &self.pager
207    }
208
209    pub fn id(&self) -> u64 {
210        self.fs_id
211    }
212
213    pub fn scope(&self) -> &ExecutionScope {
214        &self.scope
215    }
216
217    pub fn blob_resupplied_count(&self) -> &PageRefaultCounter {
218        &self.blob_resupplied_count
219    }
220
221    pub fn name(&self) -> &str {
222        &self.name
223    }
224
225    /// Reports the filesystem info, but if the volume has a space limit applied then the space
226    /// available and space used are reported based on the volume instead.
227    pub fn filesystem_info_for_volume(&self) -> fio::FilesystemInfo {
228        let allocator = self.store.filesystem().allocator();
229        let info =
230            if let Some(limit) = allocator.get_owner_bytes_limit(self.store.store_object_id()) {
231                filesystem::Info {
232                    used_bytes: allocator.get_owner_bytes_used(self.store.store_object_id()),
233                    total_bytes: limit,
234                }
235            } else {
236                self.store.filesystem().get_info()
237            };
238
239        info_to_filesystem_info(info, self.store.block_size(), self.store.object_count(), self.id())
240    }
241
242    /// Stop profiling, recover resources from it and finalize recordings.
243    pub async fn stop_profile_tasks(self: &Arc<Self>) {
244        let Some(mut state) = self.profile_state.lock().take() else { return };
245        state.wait_for_replay_to_finish().await;
246        self.pager.set_recorder(None);
247        let _ = state.wait_for_recording_to_finish().await;
248    }
249
250    /// Opens or creates the profile directory in the volume's internal directory.
251    pub async fn get_profile_directory(self: &Arc<Self>) -> Result<Directory<FxVolume>, Error> {
252        let internal_dir = self
253            .get_or_create_internal_dir()
254            .await
255            .map_err(|e| e.context("Opening internal directory"))?;
256        // Have to do separate calls to create the profile dir if necessary.
257        let mut transaction = self
258            .store()
259            .filesystem()
260            .new_transaction(
261                lock_keys![LockKey::object(
262                    self.store().store_object_id(),
263                    internal_dir.object_id(),
264                )],
265                Options::default(),
266            )
267            .await?;
268        Ok(match internal_dir.directory().lookup(PROFILE_DIRECTORY).await? {
269            Some((object_id, _, _)) => {
270                Directory::open_unchecked(self.clone(), object_id, DirType::Normal)
271            }
272            None => {
273                let new_dir = internal_dir
274                    .directory()
275                    .create_child_dir(&mut transaction, PROFILE_DIRECTORY)
276                    .await?;
277                transaction.commit().await?;
278                new_dir
279            }
280        })
281    }
282
283    /// Starts recording a profile for the volume under the name given, and if a profile exists
284    /// under that same name it is replayed and will be replaced after by the new recording if it
285    /// is cleanly shutdown and finalized.
286    pub async fn record_or_replay_profile(
287        self: &Arc<Self>,
288        mut state: Box<dyn ProfileState>,
289        name: &str,
290    ) -> Result<(), Error> {
291        // We don't meddle in FxDirectory or FxFile here because we don't want a paged object.
292        // Normally we ensure that there's only one copy by using the Node cache on the volume, but
293        // that would create FxFile, so in this case we just assume that only one profile operation
294        // should be ongoing at a time, as that is ensured in `VolumesDirectory`.
295
296        // If there is a recording already, prepare to replay it.
297        let profile_dir = self.get_profile_directory().await?;
298        let replay_handle = if let Some((id, descriptor, _)) = profile_dir.lookup(name).await? {
299            ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
300            Some(Box::new(
301                ObjectStore::open_object(self, id, HandleOptions::default(), None).await?,
302            ))
303        } else {
304            None
305        };
306
307        let mut profile_state = self.profile_state.lock();
308
309        info!("Recording new profile '{name}' for volume object {}", self.store.store_object_id());
310        // Begin recording first to ensure that we capture any activity from the replay.
311        self.pager.set_recorder(Some(state.record_new(self, name)));
312        if let Some(handle) = replay_handle {
313            if let Some(guard) = self.scope().try_active_guard() {
314                state.replay_profile(handle, self.clone(), guard);
315                info!(
316                    "Replaying existing profile '{name}' for volume object {}",
317                    self.store.store_object_id()
318                );
319            }
320        }
321        *profile_state = Some(state);
322        Ok(())
323    }
324
325    async fn get_or_create_internal_dir(self: &Arc<Self>) -> Result<Arc<FxDirectory>, Error> {
326        let internal_data_id = self.store().get_or_create_internal_directory_id().await?;
327        let internal_dir = self
328            .get_or_load_node(internal_data_id, ObjectDescriptor::Directory, None)
329            .await?
330            .into_any()
331            .downcast::<FxDirectory>()
332            .unwrap();
333        Ok(internal_dir)
334    }
335
336    pub async fn terminate(&self) {
337        let task = std::mem::replace(&mut *self.background_task.lock(), None);
338        if let Some((task, terminate)) = task {
339            let _ = terminate.send(());
340            task.await;
341        }
342
343        // `NodeCache::terminate` will break any strong reference cycles contained within nodes
344        // (pager registration). The only remaining nodes should be those with open FIDL
345        // connections or vmo references in the process of handling the VMO_ZERO_CHILDREN signal.
346        // `ExecutionScope::shutdown` + `ExecutionScope::wait` will close the open FIDL connections
347        // and synchonrize the signal handling which should result in all nodes flushing and then
348        // dropping. Any async tasks required to flush a node should take an active guard on the
349        // `ExecutionScope` which will prevent `ExecutionScope::wait` from completing until all
350        // nodes are flushed.
351        self.scope.shutdown();
352        self.cache.terminate();
353        self.scope.wait().await;
354
355        // Make sure there are no deferred operations still pending for this volume.
356        Epoch::global().barrier().await;
357
358        // The dirent_cache must be cleared *after* shutting down the scope because there can be
359        // tasks that insert entries into the cache.
360        self.dirent_cache.clear();
361
362        if self.store.filesystem().options().read_only {
363            // If the filesystem is read only, we don't need to flush/sync anything.
364            if self.store.is_unlocked() {
365                self.store.lock_read_only();
366            }
367            return;
368        }
369
370        self.flush_all_files(true).await;
371        self.store.filesystem().graveyard().flush().await;
372        if self.store.crypt().is_some() {
373            if let Err(e) = self.store.lock().await {
374                // The store will be left in a safe state and there won't be data-loss unless
375                // there's an issue flushing the journal later.
376                warn!(error:? = e; "Locking store error");
377            }
378        }
379        let sync_status = self
380            .store
381            .filesystem()
382            .sync(SyncOptions { flush_device: true, ..Default::default() })
383            .await;
384        if let Err(e) = sync_status {
385            error!(error:? = e; "Failed to sync filesystem; data may be lost");
386        }
387    }
388
389    /// Attempts to get a node from the node cache. If the node wasn't present in the cache, loads
390    /// the object from the object store, installing the returned node into the cache and returns
391    /// the newly created FxNode backed by the loaded object.  |parent| is only set on the node if
392    /// the node was not present in the cache.  Otherwise, it is ignored.
393    pub async fn get_or_load_node(
394        self: &Arc<Self>,
395        object_id: u64,
396        object_descriptor: ObjectDescriptor,
397        parent: Option<Arc<FxDirectory>>,
398    ) -> Result<Arc<dyn FxNode>, Error> {
399        match self.cache.get_or_reserve(object_id).await {
400            GetResult::Node(node) => Ok(node),
401            GetResult::Placeholder(placeholder) => {
402                let node = match object_descriptor {
403                    ObjectDescriptor::File => FxFile::new(
404                        ObjectStore::open_object(self, object_id, HandleOptions::default(), None)
405                            .await?,
406                    ) as Arc<dyn FxNode>,
407                    ObjectDescriptor::Directory => {
408                        // Can't use open_unchecked because we don't know if the dir is casefolded
409                        // or encrypted.
410                        Arc::new(FxDirectory::new(parent, Directory::open(self, object_id).await?))
411                            as Arc<dyn FxNode>
412                    }
413                    ObjectDescriptor::Symlink => Arc::new(FxSymlink::new(self.clone(), object_id)),
414                    _ => bail!(FxfsError::Inconsistent),
415                };
416                placeholder.commit(&node);
417                Ok(node)
418            }
419        }
420    }
421
422    /// Marks the given directory deleted.
423    pub fn mark_directory_deleted(&self, object_id: u64) {
424        if let Some(node) = self.cache.get(object_id) {
425            // It's possible that node is a placeholder, in which case we don't need to wait for it
426            // to be resolved because it should be blocked behind the locks that are held by the
427            // caller, and once they're dropped, it'll be found to be deleted via the tree.
428            if let Ok(dir) = node.into_any().downcast::<FxDirectory>() {
429                dir.set_deleted();
430            }
431        }
432    }
433
434    /// Removes resources associated with |object_id| (which ought to be a file), if there are no
435    /// open connections to that file.
436    ///
437    /// This must be called *after committing* a transaction which deletes the last reference to
438    /// |object_id|, since before that point, new connections could be established.
439    pub(super) async fn maybe_purge_file(&self, object_id: u64) -> Result<(), Error> {
440        if let Some(node) = self.cache.get(object_id) {
441            node.mark_to_be_purged();
442            return Ok(());
443        }
444        // If this fails, the graveyard should clean it up on next mount.
445        self.store
446            .tombstone_object(
447                object_id,
448                Options { borrow_metadata_space: true, ..Default::default() },
449            )
450            .await?;
451        Ok(())
452    }
453
454    /// Starts the background work task.  This task will periodically:
455    ///   - scan all files and flush them to disk, and
456    ///   - purge unused cached data.
457    /// The task will hold a strong reference to the FxVolume while it is running, so the task must
458    /// be closed later with Self::terminate, or the FxVolume will never be dropped.
459    pub fn start_background_task(
460        self: &Arc<Self>,
461        config: MemoryPressureConfig,
462        mem_monitor: Option<&MemoryPressureMonitor>,
463    ) {
464        let mut background_task = self.background_task.lock();
465        if background_task.is_none() {
466            let (tx, rx) = oneshot::channel();
467
468            let task = if let Some(mem_monitor) = mem_monitor {
469                fasync::Task::spawn(self.clone().background_task(
470                    config,
471                    mem_monitor.get_level_stream(),
472                    rx,
473                ))
474            } else {
475                // With no memory pressure monitoring, just stub the stream out as always pending.
476                fasync::Task::spawn(self.clone().background_task(config, stream::pending(), rx))
477            };
478
479            *background_task = Some((task, tx));
480        }
481    }
482
483    #[trace]
484    async fn background_task(
485        self: Arc<Self>,
486        config: MemoryPressureConfig,
487        mut level_stream: impl Stream<Item = MemoryPressureLevel> + FusedStream + Unpin,
488        terminate: oneshot::Receiver<()>,
489    ) {
490        debug!(store_id = self.store.store_object_id(); "FxVolume::background_task start");
491        let mut terminate = terminate.fuse();
492        // Default to the normal period until updates come from the `level_stream`.
493        let mut level = MemoryPressureLevel::Normal;
494        let mut timer =
495            pin!(fasync::Timer::new(config.for_level(&level).background_task_initial_delay));
496
497        loop {
498            let mut should_terminate = false;
499            let mut should_flush = false;
500            let mut low_mem = false;
501            let mut should_purge_layer_files = false;
502            let mut should_update_cache_limit = false;
503
504            futures::select_biased! {
505                _ = terminate => should_terminate = true,
506                new_level = level_stream.next() => {
507                    // Because `level_stream` will never terminate, this is safe to unwrap.
508                    let new_level = new_level.unwrap();
509                    // At critical levels, it's okay to undertake expensive work immediately
510                    // to reclaim memory.
511                    low_mem = matches!(new_level, MemoryPressureLevel::Critical);
512                    should_purge_layer_files = true;
513                    if new_level != level {
514                        level = new_level;
515                        should_update_cache_limit = true;
516                        let level_config = config.for_level(&level);
517                        timer.as_mut().reset(fasync::MonotonicInstant::after(
518                            level_config.background_task_period.into())
519                        );
520                        debug!(
521                            "Background task period changed to {:?} due to new memory pressure \
522                            level ({:?}).",
523                            config.for_level(&level).background_task_period, level
524                        );
525                    }
526                }
527                _ = timer => {
528                    timer.as_mut().reset(fasync::MonotonicInstant::after(
529                        config.for_level(&level).background_task_period.into())
530                    );
531                    should_flush = true;
532                    // Only purge layer file caches once we have elevated memory pressure.
533                    should_purge_layer_files = !matches!(level, MemoryPressureLevel::Normal);
534                }
535            };
536            if should_terminate {
537                break;
538            }
539            // Maybe close extra files *before* iterating them for flush/low mem.
540            if should_update_cache_limit {
541                self.dirent_cache.set_limit(config.for_level(&level).cache_size_limit);
542            }
543            if should_flush {
544                self.flush_all_files(false).await;
545                self.dirent_cache.recycle_stale_files();
546            } else if low_mem {
547                // This is a softer version of flushing files, so don't bother if we're flushing.
548                self.minimize_memory().await;
549            }
550            if should_purge_layer_files {
551                for layer in self.store.tree().immutable_layer_set().layers {
552                    layer.purge_cached_data();
553                }
554            }
555        }
556        debug!(store_id = self.store.store_object_id(); "FxVolume::background_task end");
557    }
558
559    /// Reports that a certain number of bytes will be dirtied in a pager-backed VMO.
560    ///
561    /// Note that this function may await flush tasks.
562    pub fn report_pager_dirty(
563        self: Arc<Self>,
564        byte_count: u64,
565        mark_dirty: impl FnOnce() + Send + 'static,
566    ) {
567        let this = self.clone();
568        let callback = move || {
569            mark_dirty();
570            let prev = this.pager_dirty_byte_count.fetch_add(byte_count, Ordering::Relaxed);
571            fxfs_trace::counter!("dirty-bytes", 0, this.name => prev.saturating_add(byte_count));
572        };
573        if let Some(parent) = self.parent.upgrade() {
574            parent.report_pager_dirty(byte_count, self, callback);
575        } else {
576            callback();
577        }
578    }
579
580    /// Reports that a certain number of bytes were cleaned in a pager-backed VMO.
581    pub fn report_pager_clean(&self, byte_count: u64) {
582        if let Some(parent) = self.parent.upgrade() {
583            parent.report_pager_clean(byte_count);
584        }
585        let prev = self.pager_dirty_byte_count.fetch_sub(byte_count, Ordering::Relaxed);
586        fxfs_trace::counter!("dirty-bytes", 0, self.name => prev.saturating_sub(byte_count));
587    }
588
589    #[trace]
590    pub async fn flush_all_files(&self, last_chance: bool) {
591        let mut flushed = 0;
592        for file in self.cache.files() {
593            if let Some(node) = file.into_opened_node() {
594                if let Err(e) = FxFile::flush(&node, last_chance).await {
595                    warn!(
596                        store_id = self.store.store_object_id(),
597                        oid = node.object_id(),
598                        error:? = e;
599                        "Failed to flush",
600                    )
601                }
602                if last_chance {
603                    let file = node.clone();
604                    std::mem::drop(node);
605                    file.force_clean();
606                }
607            }
608            flushed += 1;
609        }
610        debug!(store_id = self.store.store_object_id(), file_count = flushed; "FxVolume flushed");
611    }
612
613    /// Flushes only files with dirty pages.
614    ///
615    /// `PagedObjectHandle` tracks the number dirty pages locally (except for overwrite files) which
616    /// makes determining whether flushing a file will reduce the number of dirty pages efficient.
617    /// `flush_all_files` checks if any metadata needs to be flushed which involves a syscall making
618    /// it significantly slower when there are lots of open files without dirty pages.
619    #[trace]
620    pub async fn minimize_memory(&self) {
621        for file in self.cache.files() {
622            if let Some(node) = file.into_opened_node() {
623                if let Err(e) = node.handle().minimize_memory().await {
624                    warn!(
625                        store_id = self.store.store_object_id(),
626                        oid = node.object_id(),
627                        error:? = e;
628                        "Failed to flush",
629                    )
630                }
631            }
632        }
633    }
634
635    /// Spawns a short term task for the volume that includes a guard that will prevent termination.
636    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) {
637        if let Some(guard) = self.scope.try_active_guard() {
638            self.executor.spawn_detached(FutureWithGuard::new(guard, task));
639        }
640    }
641
642    /// Tries to unwrap this volume.  If it fails, it will poison the volume so that when it is
643    /// dropped, you get a backtrace.
644    #[cfg(any(test, feature = "testing"))]
645    pub fn try_unwrap(self: Arc<Self>) -> Option<FxVolume> {
646        self.poisoned.store(true, Ordering::Relaxed);
647        match Arc::try_unwrap(self) {
648            Ok(volume) => {
649                volume.poisoned.store(false, Ordering::Relaxed);
650                Some(volume)
651            }
652            Err(this) => {
653                // Log details about all the places where there might be a reference cycle.
654                info!(
655                    "background_task: {}, profile_state: {}, dirent_cache count: {}, \
656                     pager strong file refs={}, no tasks={}",
657                    this.background_task.lock().is_some(),
658                    this.profile_state.lock().is_some(),
659                    this.dirent_cache.len(),
660                    crate::pager::STRONG_FILE_REFS.load(Ordering::Relaxed),
661                    {
662                        let mut no_tasks = pin!(this.scope.wait());
663                        no_tasks
664                            .poll_unpin(&mut std::task::Context::from_waker(
665                                std::task::Waker::noop(),
666                            ))
667                            .is_ready()
668                    },
669                );
670                None
671            }
672        }
673    }
674
675    pub async fn handle_file_backed_volume_provider_requests(
676        this: Weak<Self>,
677        scope: ExecutionScope,
678        mut requests: FileBackedVolumeProviderRequestStream,
679    ) -> Result<(), Error> {
680        while let Some(request) = requests.try_next().await? {
681            match request {
682                FileBackedVolumeProviderRequest::Open {
683                    parent_directory_token,
684                    name,
685                    server_end,
686                    control_handle: _,
687                } => {
688                    // Try and get an active guard before upgrading.
689                    let Some(_guard) = scope.try_active_guard() else {
690                        bail!("Volume shutting down")
691                    };
692                    let Some(this) = this.upgrade() else { bail!("FxVolume dropped") };
693                    match this
694                        .scope
695                        .token_registry()
696                        // NB: For now, we only expect these calls in a regular (non-blob) volume.
697                        // Hard-code the type for simplicity; attempts to call on a blob volume will
698                        // get an error.
699                        .get_owner(parent_directory_token)
700                        .and_then(|dir| {
701                            dir.ok_or(zx::Status::BAD_HANDLE).and_then(|dir| {
702                                dir.into_any()
703                                    .downcast::<FxDirectory>()
704                                    .map_err(|_| zx::Status::BAD_HANDLE)
705                            })
706                        }) {
707                        Ok(dir) => {
708                            dir.open_block_file(&name, server_end).await;
709                        }
710                        Err(status) => {
711                            let _ = server_end.close_with_epitaph(status).unwrap_or_else(|e| {
712                                error!(error:? = e; "open failed to send epitaph");
713                            });
714                        }
715                    }
716                }
717            }
718        }
719        Ok(())
720    }
721
722    pub async fn handle_project_id_requests(
723        this: Weak<Self>,
724        scope: ExecutionScope,
725        mut requests: ProjectIdRequestStream,
726    ) -> Result<(), Error> {
727        while let Some(request) = requests.try_next().await? {
728            // Try and get an active guard before upgrading.
729            let Some(_guard) = scope.try_active_guard() else { bail!("Volume shutting down") };
730            let Some(this) = this.upgrade() else { bail!("FxVolume dropped") };
731            let store_id = this.store.store_object_id();
732
733            match request {
734                ProjectIdRequest::SetLimit { responder, project_id, bytes, nodes } => responder
735                    .send(
736                    this.store().set_project_limit(project_id, bytes, nodes).await.map_err(
737                        |error| {
738                            error!(error:?, store_id, project_id; "Failed to set project limit");
739                            map_to_raw_status(error)
740                        },
741                    ),
742                )?,
743                ProjectIdRequest::Clear { responder, project_id } => responder.send(
744                    this.store().clear_project_limit(project_id).await.map_err(|error| {
745                        error!(error:?, store_id, project_id; "Failed to clear project limit");
746                        map_to_raw_status(error)
747                    }),
748                )?,
749                ProjectIdRequest::SetForNode { responder, node_id, project_id } => {
750                    responder
751                        .send(this.store().set_project_for_node(node_id, project_id).await.map_err(
752                        |error| {
753                            error!(error:?, store_id, node_id, project_id; "Failed to apply node.");
754                            map_to_raw_status(error)
755                        },
756                    ))?
757                }
758                ProjectIdRequest::GetForNode { responder, node_id } => responder.send(
759                    this.store().get_project_for_node(node_id).await.map_err(|error| {
760                        error!(error:?, store_id, node_id; "Failed to get node.");
761                        map_to_raw_status(error)
762                    }),
763                )?,
764                ProjectIdRequest::ClearForNode { responder, node_id } => responder.send(
765                    this.store().clear_project_for_node(node_id).await.map_err(|error| {
766                        error!(error:?, store_id, node_id; "Failed to clear for node.");
767                        map_to_raw_status(error)
768                    }),
769                )?,
770                ProjectIdRequest::List { responder, token } => {
771                    responder.send(match this.list_projects(&token).await {
772                        Ok((ref entries, ref next_token)) => Ok((entries, next_token.as_ref())),
773                        Err(error) => {
774                            error!(error:?, store_id, token:?; "Failed to list projects.");
775                            Err(map_to_raw_status(error))
776                        }
777                    })?
778                }
779                ProjectIdRequest::Info { responder, project_id } => {
780                    responder.send(match this.project_info(project_id).await {
781                        Ok((ref limit, ref usage)) => Ok((limit, usage)),
782                        Err(error) => {
783                            error!(error:?, store_id, project_id; "Failed to get project info.");
784                            Err(map_to_raw_status(error))
785                        }
786                    })?
787                }
788            }
789        }
790        Ok(())
791    }
792
793    // Maximum entries to fit based on 64KiB message size minus 16 bytes of header, 16 bytes
794    // of vector header, 16 bytes for the optional token header, and 8 bytes of token value.
795    // https://fuchsia.dev/fuchsia-src/development/languages/fidl/guides/max-out-pagination
    //
    // This is the page size `list_projects` passes to the store; entries are `u64` ids.
    // NOTE(review): the overheads listed above total 56 bytes, which would allow
    // (65536 - 56) / 8 = 8185 entries; the value below is one lower. Presumably that is
    // headroom or an overhead not listed here - confirm before raising it.
796    const MAX_PROJECT_ENTRIES: usize = 8184;
797
798    // Calls out to the inner volume to list available projects, removing and re-adding the fidl
799    // wrapper types for the pagination token.
800    async fn list_projects(
801        &self,
802        last_token: &Option<Box<ProjectIterToken>>,
803    ) -> Result<(Vec<u64>, Option<ProjectIterToken>), Error> {
804        let (entries, token) = self
805            .store()
806            .list_projects(
807                match last_token {
808                    None => 0,
809                    Some(v) => v.value,
810                },
811                Self::MAX_PROJECT_ENTRIES,
812            )
813            .await?;
814        Ok((entries, token.map(|value| ProjectIterToken { value })))
815    }
816
817    async fn project_info(&self, project_id: u64) -> Result<(BytesAndNodes, BytesAndNodes), Error> {
818        let (limit, usage) = self.store().project_info(project_id).await?;
819        // At least one of them needs to be around to return anything.
820        ensure!(limit.is_some() || usage.is_some(), FxfsError::NotFound);
821        Ok((
822            limit.map_or_else(
823                || BytesAndNodes { bytes: u64::MAX, nodes: u64::MAX },
824                |v| BytesAndNodes { bytes: v.0, nodes: v.1 },
825            ),
826            usage.map_or_else(
827                || BytesAndNodes { bytes: 0, nodes: 0 },
828                |v| BytesAndNodes { bytes: v.0, nodes: v.1 },
829            ),
830        ))
831    }
832}
833
// Only compiled for tests and the "testing" feature: dropping an `FxVolume` whose `poisoned`
// flag is set panics.
834#[cfg(any(test, feature = "testing"))]
835impl Drop for FxVolume {
836    fn drop(&mut self) {
        // `drop` has exclusive access, so the flag is read directly through `get_mut`.
837        assert!(!*self.poisoned.get_mut());
838    }
839}
840
// `FxVolume` implements `HandleOwner` without overriding any trait items.
841impl HandleOwner for FxVolume {}
842
843impl AsRef<ObjectStore> for FxVolume {
844    fn as_ref(&self) -> &ObjectStore {
845        &self.store
846    }
847}
848
849#[async_trait]
850impl FsInspectVolume for FxVolume {
851    async fn get_volume_data(&self) -> Option<VolumeData> {
852        // Don't try to return data if the volume is shutting down.
853        let _guard = self.scope.try_active_guard()?;
854
855        let object_count = self.store().object_count();
856        let (used_bytes, bytes_limit) =
857            self.store.filesystem().allocator().owner_allocation_info(self.store.store_object_id());
858        let encrypted = self.store().crypt().is_some();
859        let port_koid = fasync::EHandle::local().port().as_handle_ref().koid().unwrap().raw_koid();
860        Some(VolumeData { bytes_limit, used_bytes, used_nodes: object_count, encrypted, port_koid })
861    }
862}
863
/// Interface required of the node type used as a volume's root directory.  Implementors must
/// also be an `FxNode` and a `DirectoryEntry`; `FxVolumeAndRoot` stores the root as
/// `Arc<dyn RootDir>`.
864pub trait RootDir: FxNode + DirectoryEntry {
    /// Converts this root into a type-erased `DirectoryEntry`.
865    fn as_directory_entry(self: Arc<Self>) -> Arc<dyn DirectoryEntry>;
866
    /// Takes connection `flags` and the server end of a `fio::Directory` channel; presumably
    /// serves this directory on that channel - confirm against the implementations.
867    fn serve(self: Arc<Self>, flags: fio::Flags, server_end: ServerEnd<fio::DirectoryMarker>);
868
    /// Converts this root into a type-erased `FxNode`; `FxVolumeAndRoot::new` uses this to
    /// commit the root into the volume's node cache.
869    fn as_node(self: Arc<Self>) -> Arc<dyn FxNode>;
870
    /// Hook that receives `_svc_dir`, presumably so an implementation can add extra services.
    /// The default implementation does nothing and returns `Ok(())`.
871    fn register_additional_volume_services(
872        self: Arc<Self>,
873        _svc_dir: &Simple,
874    ) -> Result<(), Error> {
875        Ok(())
876    }
877}
878
/// A volume bundled with its root directory, an execution scope for non-volume work, and an
/// outgoing directory.  Built by `FxVolumeAndRoot::new`.
879#[derive(Clone)]
880pub struct FxVolumeAndRoot {
    // The volume itself.
881    volume: Arc<FxVolume>,
    // The volume's root directory; `new` also commits this node into the volume's node cache.
882    root: Arc<dyn RootDir>,
883
884    // This is used for service connections and anything that isn't the actual volume.
885    admin_scope: ExecutionScope,
886
887    // The outgoing directory that the volume might be served on.
888    outgoing_dir: Arc<Simple>,
889}
890
891impl FxVolumeAndRoot {
892    pub async fn new<T: From<Directory<FxVolume>> + RootDir>(
893        parent: Weak<VolumesDirectory>,
894        store: Arc<ObjectStore>,
895        unique_id: u64,
896        volume_name: String,
897        blob_resupplied_count: Arc<PageRefaultCounter>,
898        memory_pressure_config: MemoryPressureConfig,
899    ) -> Result<Self, Error> {
900        let volume = Arc::new(FxVolume::new(
901            parent,
902            store,
903            unique_id,
904            volume_name,
905            blob_resupplied_count.clone(),
906            memory_pressure_config,
907        )?);
908        let root_object_id = volume.store().root_directory_object_id();
909        let root_dir = Directory::open(&volume, root_object_id).await?;
910        let root = Arc::<T>::new(root_dir.into()) as Arc<dyn RootDir>;
911        volume
912            .cache
913            .get_or_reserve(root_object_id)
914            .await
915            .placeholder()
916            .unwrap()
917            .commit(&root.clone().as_node());
918        Ok(Self {
919            volume,
920            root,
921            admin_scope: ExecutionScope::new(),
922            outgoing_dir: vfs::directory::immutable::simple(),
923        })
924    }
925
926    pub fn volume(&self) -> &Arc<FxVolume> {
927        &self.volume
928    }
929
930    pub fn root(&self) -> &Arc<dyn RootDir> {
931        &self.root
932    }
933
934    pub fn admin_scope(&self) -> &ExecutionScope {
935        &self.admin_scope
936    }
937
938    pub fn outgoing_dir(&self) -> &Arc<Simple> {
939        &self.outgoing_dir
940    }
941
942    // The same as root but downcasted to FxDirectory.
943    pub fn root_dir(&self) -> Arc<FxDirectory> {
944        self.root().clone().into_any().downcast::<FxDirectory>().expect("Invalid type for root")
945    }
946
947    pub fn into_volume(self) -> Arc<FxVolume> {
948        self.volume
949    }
950}
951
// Arguably this should be u64::MAX - 1, since node 0 is reserved.  Inspect test cases fail with
// that value though, possibly because of a signed/unsigned bug; see
// https://fxbug.dev/42168242.  Until that's fixed, i64::MAX is reported instead.
const TOTAL_NODES: u64 = i64::MAX.unsigned_abs();
956
// Initial value for the FilesystemInfo |name| field: the ASCII bytes of "fxfs", 0-padded to
// 32 bytes.
const FXFS_INFO_NAME_FIDL: [i8; 32] = {
    const NAME: &[u8] = b"fxfs";
    let mut padded = [0i8; 32];
    let mut i = 0;
    while i < NAME.len() {
        padded[i] = NAME[i] as i8;
        i += 1;
    }
    padded
};
963
964fn info_to_filesystem_info(
965    info: filesystem::Info,
966    block_size: u64,
967    object_count: u64,
968    fs_id: u64,
969) -> fio::FilesystemInfo {
970    fio::FilesystemInfo {
971        total_bytes: info.total_bytes,
972        used_bytes: info.used_bytes,
973        total_nodes: TOTAL_NODES,
974        used_nodes: object_count,
975        // TODO(https://fxbug.dev/42175592): Support free_shared_pool_bytes.
976        free_shared_pool_bytes: 0,
977        fs_id,
978        block_size: block_size as u32,
979        max_filename_size: fio::MAX_NAME_LENGTH as u32,
980        fs_type: fidl_fuchsia_fs::VfsType::Fxfs.into_primitive(),
981        padding: 0,
982        name: FXFS_INFO_NAME_FIDL,
983    }
984}
985
986#[cfg(test)]
987mod tests {
988    use super::DIRENT_CACHE_LIMIT;
989    use crate::fuchsia::file::FxFile;
990    use crate::fuchsia::fxblob::testing::{self as blob_testing, BlobFixture};
991    use crate::fuchsia::memory_pressure::MemoryPressureLevel;
992    use crate::fuchsia::pager::PagerBacked;
993    use crate::fuchsia::profile::{RECORDED, new_profile_state};
994    use crate::fuchsia::testing::{
995        TestFixture, TestFixtureOptions, close_dir_checked, close_file_checked, open_dir,
996        open_dir_checked, open_file, open_file_checked,
997    };
998    use crate::fuchsia::volume::{FxVolume, MemoryPressureConfig, MemoryPressureLevelConfig};
999    use crate::fuchsia::volumes_directory::VolumesDirectory;
1000    use delivery_blob::CompressionMode;
1001    use fidl_fuchsia_fxfs::{BytesAndNodes, ProjectIdMarker};
1002    use fidl_fuchsia_io as fio;
1003    use fs_inspect::FsInspectVolume;
1004    use fuchsia_async::{self as fasync, TimeoutExt as _};
1005    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
1006    use fuchsia_fs::file;
1007    use fxfs::filesystem::{FxFilesystem, FxFilesystemBuilder};
1008    use fxfs::fsck::{fsck, fsck_volume};
1009    use fxfs::object_store::directory::replace_child;
1010    use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
1011    use fxfs::object_store::volume::root_volume;
1012    use fxfs::object_store::{HandleOptions, ObjectDescriptor, ObjectStore, StoreOptions};
1013    use fxfs_crypt_common::CryptBase;
1014    use fxfs_crypto::{Crypt, WrappingKeyId};
1015    use fxfs_insecure_crypto::new_insecure_crypt;
1016    use refaults_vmo::PageRefaultCounter;
1017    use std::sync::atomic::Ordering;
1018    use std::sync::{Arc, Weak};
1019    use std::time::Duration;
1020    use storage_device::DeviceHolder;
1021    use storage_device::fake_device::FakeDevice;
1022    use zx::Status;
1023
1024    const WRAPPING_KEY_ID: WrappingKeyId = u128::to_le_bytes(123);
1025
1026    #[fuchsia::test(threads = 10)]
1027    async fn test_rename_different_dirs() {
1028        use zx::Event;
1029
1030        let fixture = TestFixture::new().await;
1031        let root = fixture.root();
1032
1033        let src = open_dir_checked(
1034            &root,
1035            "foo",
1036            fio::Flags::FLAG_MAYBE_CREATE
1037                | fio::PERM_READABLE
1038                | fio::PERM_WRITABLE
1039                | fio::Flags::PROTOCOL_DIRECTORY,
1040            Default::default(),
1041        )
1042        .await;
1043
1044        let dst = open_dir_checked(
1045            &root,
1046            "bar",
1047            fio::Flags::FLAG_MAYBE_CREATE
1048                | fio::PERM_READABLE
1049                | fio::PERM_WRITABLE
1050                | fio::Flags::PROTOCOL_DIRECTORY,
1051            Default::default(),
1052        )
1053        .await;
1054
1055        let f = open_file_checked(
1056            &root,
1057            "foo/a",
1058            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1059            &Default::default(),
1060        )
1061        .await;
1062        close_file_checked(f).await;
1063
1064        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1065        Status::ok(status).expect("get_token failed");
1066        src.rename("a", Event::from(dst_token.unwrap()), "b")
1067            .await
1068            .expect("FIDL call failed")
1069            .expect("rename failed");
1070
1071        assert_eq!(
1072            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1073                .await
1074                .expect_err("Open succeeded")
1075                .root_cause()
1076                .downcast_ref::<Status>()
1077                .expect("No status"),
1078            &Status::NOT_FOUND,
1079        );
1080        let f =
1081            open_file_checked(&root, "bar/b", fio::Flags::PROTOCOL_FILE, &Default::default()).await;
1082        close_file_checked(f).await;
1083
1084        close_dir_checked(dst).await;
1085        close_dir_checked(src).await;
1086        fixture.close().await;
1087    }
1088
1089    #[fuchsia::test(threads = 10)]
1090    async fn test_rename_same_dir() {
1091        use zx::Event;
1092        let fixture = TestFixture::new().await;
1093        let root = fixture.root();
1094
1095        let src = open_dir_checked(
1096            &root,
1097            "foo",
1098            fio::Flags::FLAG_MAYBE_CREATE
1099                | fio::PERM_READABLE
1100                | fio::PERM_WRITABLE
1101                | fio::Flags::PROTOCOL_DIRECTORY,
1102            Default::default(),
1103        )
1104        .await;
1105
1106        let f = open_file_checked(
1107            &root,
1108            "foo/a",
1109            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1110            &Default::default(),
1111        )
1112        .await;
1113        close_file_checked(f).await;
1114
1115        let (status, src_token) = src.get_token().await.expect("FIDL call failed");
1116        Status::ok(status).expect("get_token failed");
1117        src.rename("a", Event::from(src_token.unwrap()), "b")
1118            .await
1119            .expect("FIDL call failed")
1120            .expect("rename failed");
1121
1122        assert_eq!(
1123            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1124                .await
1125                .expect_err("Open succeeded")
1126                .root_cause()
1127                .downcast_ref::<Status>()
1128                .expect("No status"),
1129            &Status::NOT_FOUND,
1130        );
1131        let f =
1132            open_file_checked(&root, "foo/b", fio::Flags::PROTOCOL_FILE, &Default::default()).await;
1133        close_file_checked(f).await;
1134
1135        close_dir_checked(src).await;
1136        fixture.close().await;
1137    }
1138
1139    #[fuchsia::test(threads = 10)]
1140    async fn test_rename_overwrites_file() {
1141        use zx::Event;
1142        let fixture = TestFixture::new().await;
1143        let root = fixture.root();
1144
1145        let src = open_dir_checked(
1146            &root,
1147            "foo",
1148            fio::Flags::FLAG_MAYBE_CREATE
1149                | fio::PERM_READABLE
1150                | fio::PERM_WRITABLE
1151                | fio::Flags::PROTOCOL_DIRECTORY,
1152            Default::default(),
1153        )
1154        .await;
1155
1156        let dst = open_dir_checked(
1157            &root,
1158            "bar",
1159            fio::Flags::FLAG_MAYBE_CREATE
1160                | fio::PERM_READABLE
1161                | fio::PERM_WRITABLE
1162                | fio::Flags::PROTOCOL_DIRECTORY,
1163            Default::default(),
1164        )
1165        .await;
1166
1167        // The src file is non-empty.
1168        let src_file = open_file_checked(
1169            &root,
1170            "foo/a",
1171            fio::Flags::FLAG_MAYBE_CREATE
1172                | fio::PERM_READABLE
1173                | fio::PERM_WRITABLE
1174                | fio::Flags::PROTOCOL_FILE,
1175            &Default::default(),
1176        )
1177        .await;
1178        let buf = vec![0xaa as u8; 8192];
1179        file::write(&src_file, buf.as_slice()).await.expect("Failed to write to file");
1180        close_file_checked(src_file).await;
1181
1182        // The dst file is empty (so we can distinguish it).
1183        let f = open_file_checked(
1184            &root,
1185            "bar/b",
1186            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1187            &Default::default(),
1188        )
1189        .await;
1190        close_file_checked(f).await;
1191
1192        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1193        Status::ok(status).expect("get_token failed");
1194        src.rename("a", Event::from(dst_token.unwrap()), "b")
1195            .await
1196            .expect("FIDL call failed")
1197            .expect("rename failed");
1198
1199        assert_eq!(
1200            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1201                .await
1202                .expect_err("Open succeeded")
1203                .root_cause()
1204                .downcast_ref::<Status>()
1205                .expect("No status"),
1206            &Status::NOT_FOUND,
1207        );
1208        let file = open_file_checked(
1209            &root,
1210            "bar/b",
1211            fio::PERM_READABLE | fio::Flags::PROTOCOL_FILE,
1212            &Default::default(),
1213        )
1214        .await;
1215        let buf = file::read(&file).await.expect("read file failed");
1216        assert_eq!(buf, vec![0xaa as u8; 8192]);
1217        close_file_checked(file).await;
1218
1219        close_dir_checked(dst).await;
1220        close_dir_checked(src).await;
1221        fixture.close().await;
1222    }
1223
1224    #[fuchsia::test(threads = 10)]
1225    async fn test_rename_overwrites_dir() {
1226        use zx::Event;
1227        let fixture = TestFixture::new().await;
1228        let root = fixture.root();
1229
1230        let src = open_dir_checked(
1231            &root,
1232            "foo",
1233            fio::Flags::FLAG_MAYBE_CREATE
1234                | fio::PERM_READABLE
1235                | fio::PERM_WRITABLE
1236                | fio::Flags::PROTOCOL_DIRECTORY,
1237            Default::default(),
1238        )
1239        .await;
1240
1241        let dst = open_dir_checked(
1242            &root,
1243            "bar",
1244            fio::Flags::FLAG_MAYBE_CREATE
1245                | fio::PERM_READABLE
1246                | fio::PERM_WRITABLE
1247                | fio::Flags::PROTOCOL_DIRECTORY,
1248            Default::default(),
1249        )
1250        .await;
1251
1252        // The src dir is non-empty.
1253        open_dir_checked(
1254            &root,
1255            "foo/a",
1256            fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
1257            Default::default(),
1258        )
1259        .await;
1260        open_file_checked(
1261            &root,
1262            "foo/a/file",
1263            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1264            &Default::default(),
1265        )
1266        .await;
1267        open_dir_checked(
1268            &root,
1269            "bar/b",
1270            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_DIRECTORY,
1271            Default::default(),
1272        )
1273        .await;
1274
1275        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1276        Status::ok(status).expect("get_token failed");
1277        src.rename("a", Event::from(dst_token.unwrap()), "b")
1278            .await
1279            .expect("FIDL call failed")
1280            .expect("rename failed");
1281
1282        assert_eq!(
1283            open_dir(&root, "foo/a", fio::Flags::PROTOCOL_DIRECTORY, &Default::default())
1284                .await
1285                .expect_err("Open succeeded")
1286                .root_cause()
1287                .downcast_ref::<Status>()
1288                .expect("No status"),
1289            &Status::NOT_FOUND,
1290        );
1291        let f =
1292            open_file_checked(&root, "bar/b/file", fio::Flags::PROTOCOL_FILE, &Default::default())
1293                .await;
1294        close_file_checked(f).await;
1295
1296        close_dir_checked(dst).await;
1297        close_dir_checked(src).await;
1298
1299        fixture.close().await;
1300    }
1301
1302    #[fuchsia::test]
1303    async fn test_background_flush() {
1304        let fixture = TestFixture::new().await;
1305        {
1306            let file = open_file_checked(
1307                fixture.root(),
1308                "file",
1309                fio::Flags::FLAG_MAYBE_CREATE
1310                    | fio::PERM_READABLE
1311                    | fio::PERM_WRITABLE
1312                    | fio::Flags::PROTOCOL_FILE,
1313                &Default::default(),
1314            )
1315            .await;
1316            let object_id = file
1317                .get_attributes(fio::NodeAttributesQuery::ID)
1318                .await
1319                .expect("Fidl get attr")
1320                .expect("get attr")
1321                .1
1322                .id
1323                .unwrap();
1324
1325            // Write some data to the file, which will only go to the cache for now.
1326            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1327
1328            // Initialized to the default size.
1329            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1330            let volume = fixture.volume().volume().clone();
1331
1332            let data_has_persisted = || async {
1333                // We have to reopen the object each time since this is a distinct handle from the
1334                // one managed by the FxFile.
1335                let object =
1336                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1337                        .await
1338                        .expect("open_object failed");
1339                let data = object.contents(8192).await.expect("read failed");
1340                data.len() == 1 && data[..] == [123u8]
1341            };
1342            assert!(!data_has_persisted().await);
1343
1344            fixture.volume().volume().start_background_task(
1345                MemoryPressureConfig {
1346                    mem_normal: MemoryPressureLevelConfig {
1347                        background_task_period: Duration::from_millis(100),
1348                        background_task_initial_delay: Duration::from_millis(100),
1349                        ..Default::default()
1350                    },
1351                    mem_warning: Default::default(),
1352                    mem_critical: Default::default(),
1353                },
1354                None,
1355            );
1356
1357            const MAX_WAIT: Duration = Duration::from_secs(20);
1358            let wait_increments = Duration::from_millis(400);
1359            let mut total_waited = Duration::ZERO;
1360
1361            while total_waited < MAX_WAIT {
1362                fasync::Timer::new(wait_increments).await;
1363                total_waited += wait_increments;
1364
1365                if data_has_persisted().await {
1366                    break;
1367                }
1368            }
1369
1370            assert!(data_has_persisted().await);
1371        }
1372
1373        fixture.close().await;
1374    }
1375
1376    #[fuchsia::test(threads = 2)]
1377    async fn test_background_flush_with_warning_memory_pressure() {
1378        let fixture = TestFixture::new().await;
1379        {
1380            let file = open_file_checked(
1381                fixture.root(),
1382                "file",
1383                fio::Flags::FLAG_MAYBE_CREATE
1384                    | fio::PERM_READABLE
1385                    | fio::PERM_WRITABLE
1386                    | fio::Flags::PROTOCOL_FILE,
1387                &Default::default(),
1388            )
1389            .await;
1390            let object_id = file
1391                .get_attributes(fio::NodeAttributesQuery::ID)
1392                .await
1393                .expect("Fidl get attr")
1394                .expect("get attr")
1395                .1
1396                .id
1397                .unwrap();
1398
1399            // Write some data to the file, which will only go to the cache for now.
1400            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1401
1402            // Initialized to the default size.
1403            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1404            let volume = fixture.volume().volume().clone();
1405
1406            let data_has_persisted = || async {
1407                // We have to reopen the object each time since this is a distinct handle from the
1408                // one managed by the FxFile.
1409                let object =
1410                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1411                        .await
1412                        .expect("open_object failed");
1413                let data = object.contents(8192).await.expect("read failed");
1414                data.len() == 1 && data[..] == [123u8]
1415            };
1416            assert!(!data_has_persisted().await);
1417
1418            // Configure the flush task to only flush quickly on warning.
1419            let flush_config = MemoryPressureConfig {
1420                mem_normal: MemoryPressureLevelConfig {
1421                    background_task_period: Duration::from_secs(20),
1422                    cache_size_limit: DIRENT_CACHE_LIMIT,
1423                    ..Default::default()
1424                },
1425                mem_warning: MemoryPressureLevelConfig {
1426                    background_task_period: Duration::from_millis(100),
1427                    cache_size_limit: 100,
1428                    background_task_initial_delay: Duration::from_millis(100),
1429                    ..Default::default()
1430                },
1431                mem_critical: MemoryPressureLevelConfig {
1432                    background_task_period: Duration::from_secs(20),
1433                    cache_size_limit: 50,
1434                    ..Default::default()
1435                },
1436            };
1437            fixture.volume().volume().start_background_task(
1438                flush_config,
1439                fixture.volumes_directory().memory_pressure_monitor(),
1440            );
1441
1442            // Send the memory pressure update.
1443            fixture
1444                .memory_pressure_proxy()
1445                .on_level_changed(MemoryPressureLevel::Warning)
1446                .await
1447                .expect("Failed to send memory pressure level change");
1448
1449            // Wait a bit of time for the flush to occur (but less than the normal and critical
1450            // periods).
1451            const MAX_WAIT: Duration = Duration::from_secs(3);
1452            let wait_increments = Duration::from_millis(400);
1453            let mut total_waited = Duration::ZERO;
1454
1455            while total_waited < MAX_WAIT {
1456                fasync::Timer::new(wait_increments).await;
1457                total_waited += wait_increments;
1458
1459                if data_has_persisted().await {
1460                    break;
1461                }
1462            }
1463
1464            assert!(data_has_persisted().await);
1465            assert_eq!(fixture.volume().volume().dirent_cache().limit(), 100);
1466        }
1467
1468        fixture.close().await;
1469    }
1470
1471    #[fuchsia::test(threads = 2)]
1472    async fn test_background_flush_with_critical_memory_pressure() {
1473        let fixture = TestFixture::new().await;
1474        {
1475            let file = open_file_checked(
1476                fixture.root(),
1477                "file",
1478                fio::Flags::FLAG_MAYBE_CREATE
1479                    | fio::PERM_READABLE
1480                    | fio::PERM_WRITABLE
1481                    | fio::Flags::PROTOCOL_FILE,
1482                &Default::default(),
1483            )
1484            .await;
1485            let object_id = file
1486                .get_attributes(fio::NodeAttributesQuery::ID)
1487                .await
1488                .expect("Fidl get attr")
1489                .expect("get attr")
1490                .1
1491                .id
1492                .unwrap();
1493
1494            // Write some data to the file, which will only go to the cache for now.
1495            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1496
1497            // Initialized to the default size.
1498            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1499            let volume = fixture.volume().volume().clone();
1500
1501            let data_has_persisted = || async {
1502                // We have to reopen the object each time since this is a distinct handle from the
1503                // one managed by the FxFile.
1504                let object =
1505                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1506                        .await
1507                        .expect("open_object failed");
1508                let data = object.contents(8192).await.expect("read failed");
1509                data.len() == 1 && data[..] == [123u8]
1510            };
1511            assert!(!data_has_persisted().await);
1512
1513            let flush_config = MemoryPressureConfig {
1514                mem_normal: MemoryPressureLevelConfig {
1515                    cache_size_limit: DIRENT_CACHE_LIMIT,
1516                    ..Default::default()
1517                },
1518                mem_warning: MemoryPressureLevelConfig {
1519                    cache_size_limit: 100,
1520                    ..Default::default()
1521                },
1522                mem_critical: MemoryPressureLevelConfig {
1523                    cache_size_limit: 50,
1524                    ..Default::default()
1525                },
1526            };
1527            fixture.volume().volume().start_background_task(
1528                flush_config,
1529                fixture.volumes_directory().memory_pressure_monitor(),
1530            );
1531
1532            // Send the memory pressure update.
1533            fixture
1534                .memory_pressure_proxy()
1535                .on_level_changed(MemoryPressureLevel::Critical)
1536                .await
1537                .expect("Failed to send memory pressure level change");
1538
1539            // Critical memory should trigger a flush immediately so expect a flush very quickly.
1540            const MAX_WAIT: Duration = Duration::from_secs(2);
1541            let wait_increments = Duration::from_millis(400);
1542            let mut total_waited = Duration::ZERO;
1543
1544            while total_waited < MAX_WAIT {
1545                fasync::Timer::new(wait_increments).await;
1546                total_waited += wait_increments;
1547
1548                if data_has_persisted().await {
1549                    break;
1550                }
1551            }
1552
1553            assert!(data_has_persisted().await);
1554            assert_eq!(fixture.volume().volume().dirent_cache().limit(), 50);
1555        }
1556
1557        fixture.close().await;
1558    }
1559
1560    // This test verifies that it is safe to query a volume after it is unmounted in case of a race
1561    // during unmount/shutdown.
1562    #[fuchsia::test(threads = 2)]
1563    async fn test_query_info_unmounted_volume() {
1564        const TEST_VOLUME: &str = "test_1234";
1565        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1566
1567        let fixture = TestFixture::new().await;
1568        {
1569            let volumes_directory = fixture.volumes_directory();
1570            let volume = volumes_directory
1571                .create_and_mount_volume(TEST_VOLUME, Some(crypt.clone()), false, None)
1572                .await
1573                .unwrap();
1574
1575            assert!(volume.volume().get_volume_data().await.is_some());
1576
1577            // Unmount it but keep a reference.
1578            volumes_directory
1579                .lock()
1580                .await
1581                .unmount(volume.volume().store().store_object_id())
1582                .await
1583                .expect("unmount failed");
1584
1585            // This returns None, but doesn't crash.
1586            assert!(volume.volume().get_volume_data().await.is_none());
1587        }
1588        fixture.close().await;
1589    }
1590
1591    #[fuchsia::test]
1592    async fn test_project_limit_persistence() {
1593        const BYTES_LIMIT_1: u64 = 123456;
1594        const NODES_LIMIT_1: u64 = 4321;
1595        const BYTES_LIMIT_2: u64 = 456789;
1596        const NODES_LIMIT_2: u64 = 9876;
1597        const VOLUME_NAME: &str = "A";
1598        const FILE_NAME: &str = "B";
1599        const PROJECT_ID: u64 = 42;
1600        const PROJECT_ID2: u64 = 343;
1601        let volume_store_id;
1602        let node_id;
1603        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
1604        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1605        {
1606            let blob_resupplied_count =
1607                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1608            let volumes_directory = VolumesDirectory::new(
1609                root_volume(filesystem.clone()).await.unwrap(),
1610                Weak::new(),
1611                None,
1612                blob_resupplied_count,
1613                MemoryPressureConfig::default(),
1614            )
1615            .await
1616            .unwrap();
1617
1618            let volume_and_root = volumes_directory
1619                .create_and_mount_volume(VOLUME_NAME, None, false, None)
1620                .await
1621                .expect("create unencrypted volume failed");
1622            volume_store_id = volume_and_root.volume().store().store_object_id();
1623
1624            let (volume_dir_proxy, dir_server_end) =
1625                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1626            volumes_directory
1627                .serve_volume(&volume_and_root, dir_server_end, false)
1628                .expect("serve_volume failed");
1629
1630            let project_proxy =
1631                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1632                    .expect("Unable to connect to project id service");
1633
1634            project_proxy
1635                .set_limit(0, BYTES_LIMIT_1, NODES_LIMIT_1)
1636                .await
1637                .unwrap()
1638                .expect_err("Should not set limits for project id 0");
1639
1640            project_proxy
1641                .set_limit(PROJECT_ID, BYTES_LIMIT_1, NODES_LIMIT_1)
1642                .await
1643                .unwrap()
1644                .expect("To set limits");
1645            {
1646                let BytesAndNodes { bytes, nodes } =
1647                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1648                assert_eq!(bytes, BYTES_LIMIT_1);
1649                assert_eq!(nodes, NODES_LIMIT_1);
1650            }
1651
1652            let file_proxy = {
1653                let (root_proxy, root_server_end) =
1654                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1655                volume_dir_proxy
1656                    .open(
1657                        "root",
1658                        fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
1659                        &Default::default(),
1660                        root_server_end.into_channel(),
1661                    )
1662                    .expect("Failed to open volume root");
1663
1664                open_file_checked(
1665                    &root_proxy,
1666                    FILE_NAME,
1667                    fio::Flags::FLAG_MAYBE_CREATE
1668                        | fio::PERM_READABLE
1669                        | fio::PERM_WRITABLE
1670                        | fio::Flags::PROTOCOL_FILE,
1671                    &Default::default(),
1672                )
1673                .await
1674            };
1675
1676            let (_, immutable_attributes) =
1677                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
1678            node_id = immutable_attributes.id.unwrap();
1679
1680            project_proxy
1681                .set_for_node(node_id, 0)
1682                .await
1683                .unwrap()
1684                .expect_err("Should not set 0 project id");
1685
1686            project_proxy
1687                .set_for_node(node_id, PROJECT_ID)
1688                .await
1689                .unwrap()
1690                .expect("Setting project on node");
1691
1692            project_proxy
1693                .set_limit(PROJECT_ID, BYTES_LIMIT_2, NODES_LIMIT_2)
1694                .await
1695                .unwrap()
1696                .expect("To set limits");
1697            {
1698                let BytesAndNodes { bytes, nodes } =
1699                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1700                assert_eq!(bytes, BYTES_LIMIT_2);
1701                assert_eq!(nodes, NODES_LIMIT_2);
1702            }
1703
1704            assert_eq!(
1705                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1706                PROJECT_ID
1707            );
1708
1709            volumes_directory.terminate().await;
1710            filesystem.close().await.expect("close filesystem failed");
1711        }
1712        device = filesystem.take_device().await;
1713        device.ensure_unique();
1714        device.reopen(false);
1715        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1716        {
1717            fsck(filesystem.clone()).await.expect("Fsck");
1718            fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
1719            let blob_resupplied_count =
1720                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1721            let volumes_directory = VolumesDirectory::new(
1722                root_volume(filesystem.clone()).await.unwrap(),
1723                Weak::new(),
1724                None,
1725                blob_resupplied_count,
1726                MemoryPressureConfig::default(),
1727            )
1728            .await
1729            .unwrap();
1730            let volume_and_root = volumes_directory
1731                .mount_volume(VOLUME_NAME, None, false)
1732                .await
1733                .expect("mount unencrypted volume failed");
1734
1735            let (volume_proxy, _scope) = crate::volumes_directory::serve_startup_volume_proxy(
1736                &volumes_directory,
1737                VOLUME_NAME,
1738            );
1739
1740            let project_proxy = {
1741                let (volume_dir_proxy, dir_server_end) =
1742                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1743                volumes_directory
1744                    .serve_volume(&volume_and_root, dir_server_end, false)
1745                    .expect("serve_volume failed");
1746
1747                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1748                    .expect("Unable to connect to project id service")
1749            };
1750
1751            let usage_bytes_and_nodes = {
1752                let (
1753                    BytesAndNodes { bytes: limit_bytes, nodes: limit_nodes },
1754                    usage_bytes_and_nodes,
1755                ) = project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info");
1756                assert_eq!(limit_bytes, BYTES_LIMIT_2);
1757                assert_eq!(limit_nodes, NODES_LIMIT_2);
1758                usage_bytes_and_nodes
1759            };
1760
1761            // Should be unable to clear the project limit, due to being in use.
1762            project_proxy.clear(PROJECT_ID).await.unwrap().expect("To clear limits");
1763
1764            assert_eq!(
1765                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1766                PROJECT_ID
1767            );
1768            project_proxy
1769                .set_for_node(node_id, PROJECT_ID2)
1770                .await
1771                .unwrap()
1772                .expect("Changing project");
1773            assert_eq!(
1774                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1775                PROJECT_ID2
1776            );
1777
1778            assert_eq!(
1779                project_proxy.info(PROJECT_ID).await.unwrap().expect_err("Expect missing limits"),
1780                Status::NOT_FOUND.into_raw()
1781            );
1782            assert_eq!(
1783                project_proxy.info(PROJECT_ID2).await.unwrap().expect("Fetching project info").1,
1784                usage_bytes_and_nodes
1785            );
1786
1787            std::mem::drop(volume_proxy);
1788            volumes_directory.terminate().await;
1789            std::mem::drop(volumes_directory);
1790            filesystem.close().await.expect("close filesystem failed");
1791        }
1792        device = filesystem.take_device().await;
1793        device.ensure_unique();
1794        device.reopen(false);
1795        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1796        fsck(filesystem.clone()).await.expect("Fsck");
1797        fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
1798        let blob_resupplied_count =
1799            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1800        let volumes_directory = VolumesDirectory::new(
1801            root_volume(filesystem.clone()).await.unwrap(),
1802            Weak::new(),
1803            None,
1804            blob_resupplied_count,
1805            MemoryPressureConfig::default(),
1806        )
1807        .await
1808        .unwrap();
1809        let volume_and_root = volumes_directory
1810            .mount_volume(VOLUME_NAME, None, false)
1811            .await
1812            .expect("mount unencrypted volume failed");
1813        let (volume_dir_proxy, dir_server_end) =
1814            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1815        volumes_directory
1816            .serve_volume(&volume_and_root, dir_server_end, false)
1817            .expect("serve_volume failed");
1818        let project_proxy = connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1819            .expect("Unable to connect to project id service");
1820        assert_eq!(
1821            project_proxy.info(PROJECT_ID).await.unwrap().expect_err("Expect missing limits"),
1822            Status::NOT_FOUND.into_raw()
1823        );
1824        volumes_directory.terminate().await;
1825        std::mem::drop(volumes_directory);
1826        filesystem.close().await.expect("close filesystem failed");
1827    }
1828
1829    #[fuchsia::test]
1830    async fn test_project_limit_accounting() {
1831        const BYTES_LIMIT: u64 = 123456;
1832        const NODES_LIMIT: u64 = 4321;
1833        const VOLUME_NAME: &str = "A";
1834        const FILE_NAME: &str = "B";
1835        const PROJECT_ID: u64 = 42;
1836        let volume_store_id;
1837        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
1838        let first_object_id;
1839        let mut bytes_usage;
1840        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1841        {
1842            let blob_resupplied_count =
1843                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1844            let volumes_directory = VolumesDirectory::new(
1845                root_volume(filesystem.clone()).await.unwrap(),
1846                Weak::new(),
1847                None,
1848                blob_resupplied_count,
1849                MemoryPressureConfig::default(),
1850            )
1851            .await
1852            .unwrap();
1853
1854            let volume_and_root = volumes_directory
1855                .create_and_mount_volume(
1856                    VOLUME_NAME,
1857                    Some(Arc::new(new_insecure_crypt())),
1858                    false,
1859                    None,
1860                )
1861                .await
1862                .expect("create unencrypted volume failed");
1863            volume_store_id = volume_and_root.volume().store().store_object_id();
1864
1865            let (volume_dir_proxy, dir_server_end) =
1866                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1867            volumes_directory
1868                .serve_volume(&volume_and_root, dir_server_end, false)
1869                .expect("serve_volume failed");
1870
1871            let project_proxy =
1872                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1873                    .expect("Unable to connect to project id service");
1874
1875            project_proxy
1876                .set_limit(PROJECT_ID, BYTES_LIMIT, NODES_LIMIT)
1877                .await
1878                .unwrap()
1879                .expect("To set limits");
1880            {
1881                let BytesAndNodes { bytes, nodes } =
1882                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1883                assert_eq!(bytes, BYTES_LIMIT);
1884                assert_eq!(nodes, NODES_LIMIT);
1885            }
1886
1887            let file_proxy = {
1888                let (root_proxy, root_server_end) =
1889                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1890                volume_dir_proxy
1891                    .open(
1892                        "root",
1893                        fio::PERM_READABLE | fio::PERM_WRITABLE,
1894                        &Default::default(),
1895                        root_server_end.into_channel(),
1896                    )
1897                    .expect("Failed to open volume root");
1898
1899                open_file_checked(
1900                    &root_proxy,
1901                    FILE_NAME,
1902                    fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
1903                    &Default::default(),
1904                )
1905                .await
1906            };
1907
1908            assert_eq!(
1909                8192,
1910                file_proxy
1911                    .write(&vec![0xff as u8; 8192])
1912                    .await
1913                    .expect("FIDL call failed")
1914                    .map_err(Status::from_raw)
1915                    .expect("File write was successful")
1916            );
1917            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
1918
1919            {
1920                let BytesAndNodes { bytes, nodes } =
1921                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
1922                assert_eq!(bytes, 0);
1923                assert_eq!(nodes, 0);
1924            }
1925
1926            let (_, immutable_attributes) =
1927                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
1928            let node_id = immutable_attributes.id.unwrap();
1929
1930            first_object_id = node_id;
1931            project_proxy
1932                .set_for_node(node_id, PROJECT_ID)
1933                .await
1934                .unwrap()
1935                .expect("Setting project on node");
1936
1937            bytes_usage = {
1938                let BytesAndNodes { bytes, nodes } =
1939                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
1940                assert!(bytes > 0);
1941                assert_eq!(nodes, 1);
1942                bytes
1943            };
1944
1945            // Grow the file by a block.
1946            assert_eq!(
1947                8192,
1948                file_proxy
1949                    .write(&vec![0xff as u8; 8192])
1950                    .await
1951                    .expect("FIDL call failed")
1952                    .map_err(Status::from_raw)
1953                    .expect("File write was successful")
1954            );
1955            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
1956            bytes_usage = {
1957                let BytesAndNodes { bytes, nodes } =
1958                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
1959                assert!(bytes > bytes_usage);
1960                assert_eq!(nodes, 1);
1961                bytes
1962            };
1963
1964            volumes_directory.terminate().await;
1965            filesystem.close().await.expect("close filesystem failed");
1966        }
1967        device = filesystem.take_device().await;
1968        device.ensure_unique();
1969        device.reopen(false);
1970        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1971        {
1972            fsck(filesystem.clone()).await.expect("Fsck");
1973            fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
1974                .await
1975                .expect("Fsck volume");
1976            let blob_resupplied_count =
1977                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1978            let volumes_directory = VolumesDirectory::new(
1979                root_volume(filesystem.clone()).await.unwrap(),
1980                Weak::new(),
1981                None,
1982                blob_resupplied_count,
1983                MemoryPressureConfig::default(),
1984            )
1985            .await
1986            .unwrap();
1987            let volume_and_root = volumes_directory
1988                .mount_volume(VOLUME_NAME, Some(Arc::new(new_insecure_crypt())), false)
1989                .await
1990                .expect("mount unencrypted volume failed");
1991
1992            let (root_proxy, project_proxy) = {
1993                let (volume_dir_proxy, dir_server_end) =
1994                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1995                volumes_directory
1996                    .serve_volume(&volume_and_root, dir_server_end, false)
1997                    .expect("serve_volume failed");
1998
1999                let (root_proxy, root_server_end) =
2000                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2001                volume_dir_proxy
2002                    .open(
2003                        "root",
2004                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2005                        &Default::default(),
2006                        root_server_end.into_channel(),
2007                    )
2008                    .expect("Failed to open volume root");
2009                let project_proxy = {
2010                    connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2011                        .expect("Unable to connect to project id service")
2012                };
2013                (root_proxy, project_proxy)
2014            };
2015
2016            {
2017                let BytesAndNodes { bytes, nodes } =
2018                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2019                assert_eq!(bytes, bytes_usage);
2020                assert_eq!(nodes, 1);
2021            }
2022
2023            assert_eq!(
2024                project_proxy
2025                    .get_for_node(first_object_id)
2026                    .await
2027                    .unwrap()
2028                    .expect("Checking project"),
2029                PROJECT_ID
2030            );
2031            root_proxy
2032                .unlink(FILE_NAME, &fio::UnlinkOptions::default())
2033                .await
2034                .expect("FIDL call failed")
2035                .expect("unlink failed");
2036            filesystem.graveyard().flush().await;
2037
2038            {
2039                let BytesAndNodes { bytes, nodes } =
2040                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2041                assert_eq!(bytes, 0);
2042                assert_eq!(nodes, 0);
2043            }
2044
2045            let file_proxy = open_file_checked(
2046                &root_proxy,
2047                FILE_NAME,
2048                fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2049                &Default::default(),
2050            )
2051            .await;
2052
2053            let (_, immutable_attributes) =
2054                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2055            let node_id = immutable_attributes.id.unwrap();
2056
2057            project_proxy
2058                .set_for_node(node_id, PROJECT_ID)
2059                .await
2060                .unwrap()
2061                .expect("Applying project");
2062
2063            bytes_usage = {
2064                let BytesAndNodes { bytes, nodes } =
2065                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2066                // Empty file should have less space than the non-empty file from above.
2067                assert!(bytes < bytes_usage);
2068                assert_eq!(nodes, 1);
2069                bytes
2070            };
2071
2072            assert_eq!(
2073                8192,
2074                file_proxy
2075                    .write(&vec![0xff as u8; 8192])
2076                    .await
2077                    .expect("FIDL call failed")
2078                    .map_err(Status::from_raw)
2079                    .expect("File write was successful")
2080            );
2081            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2082            bytes_usage = {
2083                let BytesAndNodes { bytes, nodes } =
2084                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2085                assert!(bytes > bytes_usage);
2086                assert_eq!(nodes, 1);
2087                bytes
2088            };
2089
2090            // Trim to zero. Bytes should decrease.
2091            file_proxy.resize(0).await.expect("FIDL call failed").expect("Resize file");
2092            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2093            {
2094                let BytesAndNodes { bytes, nodes } =
2095                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2096                assert!(bytes < bytes_usage);
2097                assert_eq!(nodes, 1);
2098            };
2099
2100            // Dropping node from project. Usage should go to zero.
2101            project_proxy
2102                .clear_for_node(node_id)
2103                .await
2104                .expect("FIDL call failed")
2105                .expect("Clear failed.");
2106            {
2107                let BytesAndNodes { bytes, nodes } =
2108                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2109                assert_eq!(bytes, 0);
2110                assert_eq!(nodes, 0);
2111            };
2112
2113            volumes_directory.terminate().await;
2114            filesystem.close().await.expect("close filesystem failed");
2115        }
2116        device = filesystem.take_device().await;
2117        device.ensure_unique();
2118        device.reopen(false);
2119        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2120        fsck(filesystem.clone()).await.expect("Fsck");
2121        fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2122            .await
2123            .expect("Fsck volume");
2124        filesystem.close().await.expect("close filesystem failed");
2125    }
2126
2127    #[fuchsia::test]
2128    async fn test_project_node_inheritance() {
2129        const BYTES_LIMIT: u64 = 123456;
2130        const NODES_LIMIT: u64 = 4321;
2131        const VOLUME_NAME: &str = "A";
2132        const DIR_NAME: &str = "B";
2133        const SUBDIR_NAME: &str = "C";
2134        const FILE_NAME: &str = "D";
2135        const PROJECT_ID: u64 = 42;
2136        let volume_store_id;
2137        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
2138        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2139        {
2140            let blob_resupplied_count =
2141                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2142            let volumes_directory = VolumesDirectory::new(
2143                root_volume(filesystem.clone()).await.unwrap(),
2144                Weak::new(),
2145                None,
2146                blob_resupplied_count,
2147                MemoryPressureConfig::default(),
2148            )
2149            .await
2150            .unwrap();
2151
2152            let volume_and_root = volumes_directory
2153                .create_and_mount_volume(
2154                    VOLUME_NAME,
2155                    Some(Arc::new(new_insecure_crypt())),
2156                    false,
2157                    None,
2158                )
2159                .await
2160                .expect("create unencrypted volume failed");
2161            volume_store_id = volume_and_root.volume().store().store_object_id();
2162
2163            let (volume_dir_proxy, dir_server_end) =
2164                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2165            volumes_directory
2166                .serve_volume(&volume_and_root, dir_server_end, false)
2167                .expect("serve_volume failed");
2168
2169            let project_proxy =
2170                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2171                    .expect("Unable to connect to project id service");
2172
2173            project_proxy
2174                .set_limit(PROJECT_ID, BYTES_LIMIT, NODES_LIMIT)
2175                .await
2176                .unwrap()
2177                .expect("To set limits");
2178
2179            let dir_proxy = {
2180                let (root_proxy, root_server_end) =
2181                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2182                volume_dir_proxy
2183                    .open(
2184                        "root",
2185                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2186                        &Default::default(),
2187                        root_server_end.into_channel(),
2188                    )
2189                    .expect("Failed to open volume root");
2190
2191                open_dir_checked(
2192                    &root_proxy,
2193                    DIR_NAME,
2194                    fio::Flags::FLAG_MAYBE_CREATE
2195                        | fio::PERM_READABLE
2196                        | fio::PERM_WRITABLE
2197                        | fio::Flags::PROTOCOL_DIRECTORY,
2198                    Default::default(),
2199                )
2200                .await
2201            };
2202            {
2203                let (_, immutable_attributes) =
2204                    dir_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2205                let node_id = immutable_attributes.id.unwrap();
2206
2207                project_proxy
2208                    .set_for_node(node_id, PROJECT_ID)
2209                    .await
2210                    .unwrap()
2211                    .expect("Setting project on node");
2212            }
2213
2214            let subdir_proxy = open_dir_checked(
2215                &dir_proxy,
2216                SUBDIR_NAME,
2217                fio::Flags::FLAG_MAYBE_CREATE
2218                    | fio::PERM_READABLE
2219                    | fio::PERM_WRITABLE
2220                    | fio::Flags::PROTOCOL_DIRECTORY,
2221                Default::default(),
2222            )
2223            .await;
2224            {
2225                let (_, immutable_attributes) = subdir_proxy
2226                    .get_attributes(fio::NodeAttributesQuery::ID)
2227                    .await
2228                    .unwrap()
2229                    .unwrap();
2230                let node_id = immutable_attributes.id.unwrap();
2231
2232                assert_eq!(
2233                    project_proxy
2234                        .get_for_node(node_id)
2235                        .await
2236                        .unwrap()
2237                        .expect("Setting project on node"),
2238                    PROJECT_ID
2239                );
2240            }
2241
2242            let file_proxy = open_file_checked(
2243                &subdir_proxy,
2244                FILE_NAME,
2245                fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2246                &Default::default(),
2247            )
2248            .await;
2249            {
2250                let (_, immutable_attributes) =
2251                    file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2252                let node_id = immutable_attributes.id.unwrap();
2253
2254                assert_eq!(
2255                    project_proxy
2256                        .get_for_node(node_id)
2257                        .await
2258                        .unwrap()
2259                        .expect("Setting project on node"),
2260                    PROJECT_ID
2261                );
2262            }
2263
2264            // An unnamed temporary file is created slightly differently to a regular file object.
2265            // Just in case, check that it inherits project ID as well.
2266            let tmpfile_proxy = open_file_checked(
2267                &subdir_proxy,
2268                ".",
2269                fio::Flags::PROTOCOL_FILE
2270                    | fio::Flags::FLAG_CREATE_AS_UNNAMED_TEMPORARY
2271                    | fio::PERM_READABLE,
2272                &fio::Options::default(),
2273            )
2274            .await;
2275            {
2276                let (_, immutable_attributes) = tmpfile_proxy
2277                    .get_attributes(fio::NodeAttributesQuery::ID)
2278                    .await
2279                    .unwrap()
2280                    .unwrap();
2281                let node_id: u64 = immutable_attributes.id.unwrap();
2282                assert_eq!(
2283                    project_proxy
2284                        .get_for_node(node_id)
2285                        .await
2286                        .unwrap()
2287                        .expect("Setting project on node"),
2288                    PROJECT_ID
2289                );
2290            }
2291
2292            let BytesAndNodes { nodes, .. } =
2293                project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2294            assert_eq!(nodes, 3);
2295            volumes_directory.terminate().await;
2296            filesystem.close().await.expect("close filesystem failed");
2297        }
2298        device = filesystem.take_device().await;
2299        device.ensure_unique();
2300        device.reopen(false);
2301        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2302        fsck(filesystem.clone()).await.expect("Fsck");
2303        fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2304            .await
2305            .expect("Fsck volume");
2306        filesystem.close().await.expect("close filesystem failed");
2307    }
2308
2309    #[fuchsia::test]
2310    async fn test_project_listing() {
2311        const VOLUME_NAME: &str = "A";
2312        const FILE_NAME: &str = "B";
2313        const NON_ZERO_PROJECT_ID: u64 = 3;
2314        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
2315        let volume_store_id;
2316        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2317        {
2318            let blob_resupplied_count =
2319                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2320            let volumes_directory = VolumesDirectory::new(
2321                root_volume(filesystem.clone()).await.unwrap(),
2322                Weak::new(),
2323                None,
2324                blob_resupplied_count,
2325                MemoryPressureConfig::default(),
2326            )
2327            .await
2328            .unwrap();
2329            let volume_and_root = volumes_directory
2330                .create_and_mount_volume(VOLUME_NAME, None, false, None)
2331                .await
2332                .expect("create unencrypted volume failed");
2333            volume_store_id = volume_and_root.volume().store().store_object_id();
2334
2335            let (volume_dir_proxy, dir_server_end) =
2336                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2337            volumes_directory
2338                .serve_volume(&volume_and_root, dir_server_end, false)
2339                .expect("serve_volume failed");
2340            let project_proxy =
2341                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2342                    .expect("Unable to connect to project id service");
2343            // This is just to ensure that the small numbers below can be used for this test.
2344            assert!(FxVolume::MAX_PROJECT_ENTRIES >= 4);
2345            // Create a bunch of proxies. 3 more than the limit to ensure pagination.
2346            let num_entries = u64::try_from(FxVolume::MAX_PROJECT_ENTRIES + 3).unwrap();
2347            for project_id in 1..=num_entries {
2348                project_proxy.set_limit(project_id, 1, 1).await.unwrap().expect("To set limits");
2349            }
2350
2351            // Add one usage entry to be interspersed with the limit entries. Verifies that the
2352            // iterator will progress passed it with no effect.
2353            let file_proxy = {
2354                let (root_proxy, root_server_end) =
2355                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2356                volume_dir_proxy
2357                    .open(
2358                        "root",
2359                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2360                        &Default::default(),
2361                        root_server_end.into_channel(),
2362                    )
2363                    .expect("Failed to open volume root");
2364
2365                open_file_checked(
2366                    &root_proxy,
2367                    FILE_NAME,
2368                    fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2369                    &Default::default(),
2370                )
2371                .await
2372            };
2373            let (_, immutable_attributes) =
2374                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2375            let node_id = immutable_attributes.id.unwrap();
2376            project_proxy
2377                .set_for_node(node_id, NON_ZERO_PROJECT_ID)
2378                .await
2379                .unwrap()
2380                .expect("Setting project on node");
2381            {
2382                let BytesAndNodes { nodes, .. } = project_proxy
2383                    .info(NON_ZERO_PROJECT_ID)
2384                    .await
2385                    .unwrap()
2386                    .expect("Fetching project info")
2387                    .1;
2388                assert_eq!(nodes, 1);
2389            }
2390
2391            // If this `unwrap()` fails, it is likely the MAX_PROJECT_ENTRIES is too large for fidl.
2392            let (mut entries, mut next_token) =
2393                project_proxy.list(None).await.unwrap().expect("To get project listing");
2394            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2395            assert!(next_token.is_some());
2396            assert!(entries.contains(&1));
2397            assert!(entries.contains(&3));
2398            assert!(!entries.contains(&num_entries));
2399            // Page two should have a small set at the end.
2400            (entries, next_token) = project_proxy
2401                .list(next_token.as_deref())
2402                .await
2403                .unwrap()
2404                .expect("To get project listing");
2405            assert_eq!(entries.len(), 3);
2406            assert!(next_token.is_none());
2407            assert!(entries.contains(&num_entries));
2408            assert!(!entries.contains(&1));
2409            assert!(!entries.contains(&3));
2410            // Delete a couple and list all again, but one has usage still.
2411            project_proxy.clear(1).await.unwrap().expect("Clear project");
2412            project_proxy.clear(3).await.unwrap().expect("Clear project");
2413            (entries, next_token) =
2414                project_proxy.list(None).await.unwrap().expect("To get project listing");
2415            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2416            assert!(next_token.is_some());
2417            assert!(!entries.contains(&num_entries));
2418            assert!(!entries.contains(&1));
2419            assert!(entries.contains(&3));
2420            (entries, next_token) = project_proxy
2421                .list(next_token.as_deref())
2422                .await
2423                .unwrap()
2424                .expect("To get project listing");
2425            assert_eq!(entries.len(), 2);
2426            assert!(next_token.is_none());
2427            assert!(entries.contains(&num_entries));
2428            // Delete two more to hit the edge case.
2429            project_proxy.clear(2).await.unwrap().expect("Clear project");
2430            project_proxy.clear(4).await.unwrap().expect("Clear project");
2431            (entries, next_token) =
2432                project_proxy.list(None).await.unwrap().expect("To get project listing");
2433            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2434            assert!(next_token.is_none());
2435            assert!(entries.contains(&num_entries));
2436            volumes_directory.terminate().await;
2437            filesystem.close().await.expect("close filesystem failed");
2438        }
2439        device = filesystem.take_device().await;
2440        device.ensure_unique();
2441        device.reopen(false);
2442        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2443        fsck(filesystem.clone()).await.expect("Fsck");
2444        fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
2445        filesystem.close().await.expect("close filesystem failed");
2446    }
2447
2448    #[fuchsia::test(threads = 10)]
2449    async fn test_profile_blob() {
2450        let mut hashes = Vec::new();
2451        let device = {
2452            let fixture = blob_testing::new_blob_fixture().await;
2453
2454            for i in 0..3u64 {
2455                let hash =
2456                    fixture.write_blob(i.to_le_bytes().as_slice(), CompressionMode::Never).await;
2457                hashes.push(hash);
2458            }
2459            fixture.close().await
2460        };
2461        device.ensure_unique();
2462
2463        device.reopen(false);
2464        let mut device = {
2465            let fixture = blob_testing::open_blob_fixture(device).await;
2466            fixture
2467                .volume()
2468                .volume()
2469                .record_or_replay_profile(new_profile_state(true), "foo")
2470                .await
2471                .expect("Recording");
2472
2473            // Page in the zero offsets only to avoid readahead strangeness.
2474            let mut writable = [0u8];
2475            for hash in &hashes {
2476                let vmo = fixture.get_blob_vmo(*hash).await;
2477                vmo.read(&mut writable, 0).expect("Vmo read");
2478            }
2479            fixture.volume().volume().stop_profile_tasks().await;
2480            fixture.close().await
2481        };
2482
2483        // Do this multiple times to ensure that the re-recording doesn't drop anything.
2484        for i in 0..3 {
2485            device.ensure_unique();
2486            device.reopen(false);
2487            let fixture = blob_testing::open_blob_fixture(device).await;
2488            {
2489                // Ensure that nothing is paged in right now.
2490                for hash in &hashes {
2491                    let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2492                    assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
2493                }
2494
2495                fixture
2496                    .volume()
2497                    .volume()
2498                    .record_or_replay_profile(new_profile_state(true), "foo")
2499                    .await
2500                    .expect("Replaying");
2501
2502                // Move the file in flight to ensure a new version lands to be used next time.
2503                {
2504                    let store_id = fixture.volume().volume().store().store_object_id();
2505                    let dir = fixture.volume().volume().get_profile_directory().await.unwrap();
2506                    let old_file = dir.lookup("foo").await.unwrap().unwrap().0;
2507                    let mut transaction = fixture
2508                        .fs()
2509                        .clone()
2510                        .new_transaction(
2511                            lock_keys!(
2512                                LockKey::object(store_id, dir.object_id()),
2513                                LockKey::object(store_id, old_file),
2514                            ),
2515                            Options::default(),
2516                        )
2517                        .await
2518                        .unwrap();
2519                    replace_child(&mut transaction, Some((&dir, "foo")), (&dir, &i.to_string()))
2520                        .await
2521                        .expect("Replace old profile.");
2522                    transaction.commit().await.unwrap();
2523                    assert!(
2524                        dir.lookup("foo").await.unwrap().is_none(),
2525                        "Old profile should be moved"
2526                    );
2527                }
2528
2529                // Await all data being played back by checking that things have paged in.
2530                async {
2531                    for hash in &hashes {
2532                        // Fetch vmo this way as well to ensure that the open is counting the file
2533                        // as used in the current recording.
2534                        let _vmo = fixture.get_blob_vmo(*hash).await;
2535                        let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2536                        while blob.vmo().info().unwrap().populated_bytes == 0 {
2537                            fasync::Timer::new(Duration::from_millis(25)).await;
2538                        }
2539                    }
2540                }
2541                .on_timeout(std::time::Duration::from_secs(120), || {
2542                    panic!("Replay did not page in for all VMOs")
2543                })
2544                .await;
2545
2546                // Complete the recording.
2547                fixture.volume().volume().stop_profile_tasks().await;
2548            }
2549            device = fixture.close().await;
2550        }
2551    }
2552
2553    #[fuchsia::test(threads = 10)]
2554    async fn test_profile_file() {
2555        let mut hashes = Vec::new();
2556        let crypt_file_id;
2557        let device = {
2558            let fixture = TestFixture::new().await;
2559            // Include a crypt file to access during the recording. It should not be added.
2560            {
2561                let crypt_dir = open_dir_checked(
2562                    fixture.root(),
2563                    "crypt_dir",
2564                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2565                    Default::default(),
2566                )
2567                .await;
2568                let crypt: Arc<CryptBase> = fixture.crypt().unwrap();
2569                crypt
2570                    .add_wrapping_key(WRAPPING_KEY_ID, [1; 32].into())
2571                    .expect("add_wrapping_key failed");
2572                crypt_dir
2573                    .update_attributes(&fio::MutableNodeAttributes {
2574                        wrapping_key_id: Some(WRAPPING_KEY_ID),
2575                        ..Default::default()
2576                    })
2577                    .await
2578                    .expect("update_attributes wire call failed")
2579                    .expect("update_attributes failed");
2580                let crypt_file = open_file_checked(
2581                    &crypt_dir,
2582                    "crypt_file",
2583                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2584                    &Default::default(),
2585                )
2586                .await;
2587                crypt_file.write("asdf".as_bytes()).await.unwrap().expect("Writing crypt file");
2588                crypt_file_id = crypt_file
2589                    .get_attributes(fio::NodeAttributesQuery::ID)
2590                    .await
2591                    .unwrap()
2592                    .expect("Get id")
2593                    .1
2594                    .id
2595                    .expect("Reading id in response");
2596            }
2597
2598            for i in 0..3u64 {
2599                let file_proxy = open_file_checked(
2600                    fixture.root(),
2601                    &i.to_string(),
2602                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2603                    &Default::default(),
2604                )
2605                .await;
2606                file_proxy.write(i.to_le_bytes().as_slice()).await.unwrap().expect("Writing file");
2607                let id = file_proxy
2608                    .get_attributes(fio::NodeAttributesQuery::ID)
2609                    .await
2610                    .unwrap()
2611                    .expect("Get id")
2612                    .1
2613                    .id
2614                    .expect("Reading id in response");
2615                hashes.push((i, id));
2616            }
2617            fixture.close().await
2618        };
2619        device.ensure_unique();
2620
2621        device.reopen(false);
2622        let mut device = {
2623            let fixture = TestFixture::open(
2624                device,
2625                TestFixtureOptions { format: false, ..Default::default() },
2626            )
2627            .await;
2628            fixture
2629                .volume()
2630                .volume()
2631                .record_or_replay_profile(new_profile_state(false), "foo")
2632                .await
2633                .expect("Recording");
2634
2635            {
2636                let crypt: Arc<CryptBase> = fixture.crypt().unwrap();
2637                crypt
2638                    .add_wrapping_key(WRAPPING_KEY_ID, [1; 32].into())
2639                    .expect("add wrapping key failed");
2640                let crypt_file = open_file_checked(
2641                    fixture.root(),
2642                    "crypt_dir/crypt_file",
2643                    fio::PERM_READABLE,
2644                    &Default::default(),
2645                )
2646                .await;
2647                crypt_file.read(1).await.unwrap().expect("Reading crypt file");
2648            }
2649            // Page in the zero offsets only to avoid readahead strangeness.
2650            for (i, _) in &hashes {
2651                let file_proxy = open_file_checked(
2652                    fixture.root(),
2653                    &i.to_string(),
2654                    fio::PERM_READABLE,
2655                    &Default::default(),
2656                )
2657                .await;
2658                file_proxy.read(1).await.unwrap().expect("Reading file");
2659            }
2660            fixture.volume().volume().stop_profile_tasks().await;
2661            fixture.close().await
2662        };
2663
2664        // Do this multiple times to ensure that the re-recording doesn't drop anything.
2665        for i in 0..3 {
2666            device.ensure_unique();
2667            device.reopen(false);
2668            let fixture = TestFixture::open(
2669                device,
2670                TestFixtureOptions { format: false, ..Default::default() },
2671            )
2672            .await;
2673            {
2674                // Need to get the root vmo to check committed bytes.
2675                let volume = fixture.volume().volume().clone();
2676                // Ensure that nothing is paged in right now.
2677                {
2678                    let crypt_file = volume
2679                        .get_or_load_node(crypt_file_id, ObjectDescriptor::File, None)
2680                        .await
2681                        .expect("Opening file internally")
2682                        .into_any()
2683                        .downcast::<FxFile>()
2684                        .expect("Should be file");
2685                    assert_eq!(crypt_file.vmo().info().unwrap().populated_bytes, 0);
2686                }
2687                for (_, id) in &hashes {
2688                    let file = volume
2689                        .get_or_load_node(*id, ObjectDescriptor::File, None)
2690                        .await
2691                        .expect("Opening file internally")
2692                        .into_any()
2693                        .downcast::<FxFile>()
2694                        .expect("Should be file");
2695                    assert_eq!(file.vmo().info().unwrap().populated_bytes, 0);
2696                }
2697
2698                fixture
2699                    .volume()
2700                    .volume()
2701                    .record_or_replay_profile(new_profile_state(false), "foo")
2702                    .await
2703                    .expect("Replaying");
2704
2705                // Move the file in flight to ensure a new version lands to be used next time.
2706                {
2707                    let store_id = fixture.volume().volume().store().store_object_id();
2708                    let dir = fixture.volume().volume().get_profile_directory().await.unwrap();
2709                    let old_file = dir.lookup("foo").await.unwrap().unwrap().0;
2710                    let mut transaction = fixture
2711                        .fs()
2712                        .clone()
2713                        .new_transaction(
2714                            lock_keys!(
2715                                LockKey::object(store_id, dir.object_id()),
2716                                LockKey::object(store_id, old_file),
2717                            ),
2718                            Options::default(),
2719                        )
2720                        .await
2721                        .unwrap();
2722                    replace_child(&mut transaction, Some((&dir, "foo")), (&dir, &i.to_string()))
2723                        .await
2724                        .expect("Replace old profile.");
2725                    transaction.commit().await.unwrap();
2726                    assert!(
2727                        dir.lookup("foo").await.unwrap().is_none(),
2728                        "Old profile should be moved"
2729                    );
2730                }
2731
2732                // Await all data being played back by checking that things have paged in.
2733                async {
2734                    for (_, id) in &hashes {
2735                        let file = volume
2736                            .get_or_load_node(*id, ObjectDescriptor::File, None)
2737                            .await
2738                            .expect("Opening file internally")
2739                            .into_any()
2740                            .downcast::<FxFile>()
2741                            .expect("Should be file");
2742                        while file.vmo().info().unwrap().populated_bytes == 0 {
2743                            fasync::Timer::new(Duration::from_millis(25)).await;
2744                        }
2745                    }
2746                }
2747                .on_timeout(std::time::Duration::from_secs(120), || {
2748                    panic!("Replay did not page in for all VMOs")
2749                })
2750                .await;
2751                // The crypt file access should not have been recorded or replayed.
2752                {
2753                    let crypt_file = volume
2754                        .get_or_load_node(crypt_file_id, ObjectDescriptor::File, None)
2755                        .await
2756                        .expect("Opening file internally")
2757                        .into_any()
2758                        .downcast::<FxFile>()
2759                        .expect("Should be file");
2760                    assert_eq!(crypt_file.vmo().info().unwrap().populated_bytes, 0);
2761                }
2762
2763                // Open all the files to show that they have been used.
2764                for (i, _) in &hashes {
2765                    let _file_proxy = open_file_checked(
2766                        fixture.root(),
2767                        &i.to_string(),
2768                        fio::PERM_READABLE,
2769                        &Default::default(),
2770                    )
2771                    .await;
2772                }
2773
2774                // Complete the recording.
2775                fixture.volume().volume().stop_profile_tasks().await;
2776            }
2777            device = fixture.close().await;
2778        }
2779    }
2780
2781    #[fuchsia::test(threads = 10)]
2782    async fn test_profile_update() {
2783        let mut hashes = Vec::new();
2784        let device = {
2785            let fixture = blob_testing::new_blob_fixture().await;
2786            for i in 0..2u64 {
2787                let hash =
2788                    fixture.write_blob(i.to_le_bytes().as_slice(), CompressionMode::Never).await;
2789                hashes.push(hash);
2790            }
2791            fixture.close().await
2792        };
2793        device.ensure_unique();
2794
2795        device.reopen(false);
2796        let device = {
2797            let fixture = blob_testing::open_blob_fixture(device).await;
2798
2799            {
2800                let volume = fixture.volume().volume();
2801                volume
2802                    .record_or_replay_profile(new_profile_state(true), "foo")
2803                    .await
2804                    .expect("Recording");
2805
2806                let original_recorded = RECORDED.load(Ordering::Relaxed);
2807
2808                // Page in the zero offsets only to avoid readahead strangeness.
2809                {
2810                    let mut writable = [0u8];
2811                    let hash = &hashes[0];
2812                    let vmo = fixture.get_blob_vmo(*hash).await;
2813                    vmo.read(&mut writable, 0).expect("Vmo read");
2814                }
2815
2816                // The recording happens asynchronously, so we must wait.  This is crude, but it's
2817                // only for testing and it's simple.
2818                while RECORDED.load(Ordering::Relaxed) == original_recorded {
2819                    fasync::Timer::new(std::time::Duration::from_millis(10)).await;
2820                }
2821
2822                volume.stop_profile_tasks().await;
2823            }
2824            fixture.close().await
2825        };
2826
2827        device.ensure_unique();
2828        device.reopen(false);
2829        let fixture = blob_testing::open_blob_fixture(device).await;
2830        {
2831            // Need to get the root vmo to check committed bytes.
2832            // Ensure that nothing is paged in right now.
2833            for hash in &hashes {
2834                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2835                assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
2836            }
2837
2838            let volume = fixture.volume().volume();
2839
2840            volume
2841                .record_or_replay_profile(new_profile_state(true), "foo")
2842                .await
2843                .expect("Replaying");
2844
2845            // Await all data being played back by checking that things have paged in.
2846            async {
2847                let hash = &hashes[0];
2848                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2849                while blob.vmo().info().unwrap().populated_bytes == 0 {
2850                    fasync::Timer::new(Duration::from_millis(25)).await;
2851                }
2852            }
2853            .on_timeout(std::time::Duration::from_secs(120), || {
2854                panic!("Replay did not page in for all VMOs")
2855            })
2856            .await;
2857
2858            let original_recorded = RECORDED.load(Ordering::Relaxed);
2859
2860            // Record the new profile that will overwrite it.
2861            {
2862                let mut writable = [0u8];
2863                let hash = &hashes[1];
2864                let vmo = fixture.get_blob_vmo(*hash).await;
2865                vmo.read(&mut writable, 0).expect("Vmo read");
2866            }
2867
2868            // The recording happens asynchronously, so we must wait.  This is crude, but it's only
2869            // for testing and it's simple.
2870            while RECORDED.load(Ordering::Relaxed) == original_recorded {
2871                fasync::Timer::new(std::time::Duration::from_millis(10)).await;
2872            }
2873
2874            // Complete the recording.
2875            volume.stop_profile_tasks().await;
2876        }
2877        let device = fixture.close().await;
2878
2879        device.ensure_unique();
2880        device.reopen(false);
2881        let fixture = blob_testing::open_blob_fixture(device).await;
2882        {
2883            // Need to get the root vmo to check committed bytes.
2884            // Ensure that nothing is paged in right now.
2885            for hash in &hashes {
2886                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2887                assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
2888            }
2889
2890            fixture
2891                .volume()
2892                .volume()
2893                .record_or_replay_profile(new_profile_state(true), "foo")
2894                .await
2895                .expect("Replaying");
2896
2897            // Await all data being played back by checking that things have paged in.
2898            async {
2899                let hash = &hashes[1];
2900                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2901                while blob.vmo().info().unwrap().populated_bytes == 0 {
2902                    fasync::Timer::new(Duration::from_millis(25)).await;
2903                }
2904            }
2905            .on_timeout(std::time::Duration::from_secs(30), || {
2906                panic!("Replay did not page in for all VMOs")
2907            })
2908            .await;
2909
2910            // Complete the recording.
2911            fixture.volume().volume().stop_profile_tasks().await;
2912
2913            // Verify that first blob was not paged in as the it should be dropped from the profile.
2914            {
2915                let hash = &hashes[0];
2916                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2917                assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
2918            }
2919        }
2920        fixture.close().await;
2921    }
2922
2923    #[fuchsia::test(threads = 10)]
2924    async fn test_unencrypted_volume() {
2925        let fixture = TestFixture::new_unencrypted().await;
2926        let root = fixture.root();
2927
2928        let f = open_file_checked(
2929            &root,
2930            "foo",
2931            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
2932            &Default::default(),
2933        )
2934        .await;
2935        close_file_checked(f).await;
2936
2937        fixture.close().await;
2938    }
2939
2940    #[fuchsia::test]
2941    async fn test_read_only_unencrypted_volume() {
2942        // Make a new Fxfs filesystem with an unencrypted volume named "vol".
2943        let fs = {
2944            let device = fxfs::filesystem::mkfs_with_volume(
2945                DeviceHolder::new(FakeDevice::new(8192, 512)),
2946                "vol",
2947                None,
2948            )
2949            .await
2950            .unwrap();
2951            // Re-open the device as read-only and mount the filesystem as read-only.
2952            device.reopen(true);
2953            FxFilesystemBuilder::new().read_only(true).open(device).await.unwrap()
2954        };
2955        // Ensure we can access the volume and gracefully terminate any tasks.
2956        {
2957            let root_volume = root_volume(fs.clone()).await.unwrap();
2958            let store = root_volume.volume("vol", StoreOptions::default()).await.unwrap();
2959            let unique_id = store.store_object_id();
2960            let blob_resupplied_count =
2961                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2962            let volume = FxVolume::new(
2963                Weak::new(),
2964                store,
2965                unique_id,
2966                "vol".to_owned(),
2967                blob_resupplied_count,
2968                MemoryPressureConfig::default(),
2969            )
2970            .unwrap();
2971            volume.terminate().await;
2972        }
2973        // Close the filesystem, and make sure we don't have any dangling references.
2974        fs.close().await.unwrap();
2975        let device = fs.take_device().await;
2976        device.ensure_unique();
2977    }
2978
2979    #[fuchsia::test]
2980    async fn test_read_only_encrypted_volume() {
2981        let crypt: Arc<CryptBase> = Arc::new(new_insecure_crypt());
2982        // Make a new Fxfs filesystem with an encrypted volume named "vol".
2983        let fs = {
2984            let device = fxfs::filesystem::mkfs_with_volume(
2985                DeviceHolder::new(FakeDevice::new(8192, 512)),
2986                "vol",
2987                Some(crypt.clone()),
2988            )
2989            .await
2990            .unwrap();
2991            // Re-open the device as read-only and mount the filesystem as read-only.
2992            device.reopen(true);
2993            FxFilesystemBuilder::new().read_only(true).open(device).await.unwrap()
2994        };
2995        // Ensure we can access the volume and gracefully terminate any tasks.
2996        {
2997            let root_volume = root_volume(fs.clone()).await.unwrap();
2998            let store = root_volume
2999                .volume("vol", StoreOptions { crypt: Some(crypt), ..StoreOptions::default() })
3000                .await
3001                .unwrap();
3002            let unique_id = store.store_object_id();
3003            let blob_resupplied_count =
3004                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
3005            let volume = FxVolume::new(
3006                Weak::new(),
3007                store,
3008                unique_id,
3009                "vol".to_owned(),
3010                blob_resupplied_count,
3011                MemoryPressureConfig::default(),
3012            )
3013            .unwrap();
3014            volume.terminate().await;
3015        }
3016        // Close the filesystem, and make sure we don't have any dangling references.
3017        fs.close().await.unwrap();
3018        let device = fs.take_device().await;
3019        device.ensure_unique();
3020    }
3021}