Skip to main content

fxfs_platform_testing/fuchsia/
volume.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::component::map_to_raw_status;
6use crate::fuchsia::directory::FxDirectory;
7use crate::fuchsia::dirent_cache::DirentCache;
8use crate::fuchsia::file::{FlushType, FxFile};
9use crate::fuchsia::memory_pressure::{MemoryPressureLevel, MemoryPressureMonitor};
10use crate::fuchsia::node::{FxNode, GetResult, NodeCache};
11use crate::fuchsia::pager::Pager;
12use crate::fuchsia::profile::ProfileState;
13use crate::fuchsia::symlink::FxSymlink;
14use crate::fuchsia::volumes_directory::VolumesDirectory;
15use anyhow::{Error, bail, ensure};
16use async_trait::async_trait;
17use fidl::endpoints::ServerEnd;
18use fidl_fuchsia_fxfs::{
19    BytesAndNodes, FileBackedVolumeProviderRequest, FileBackedVolumeProviderRequestStream,
20    ProjectIdRequest, ProjectIdRequestStream, ProjectIterToken,
21};
22use fidl_fuchsia_io as fio;
23use fs_inspect::{FsInspectVolume, VolumeData};
24use fuchsia_async as fasync;
25use fuchsia_async::epoch::Epoch;
26use fuchsia_sync::Mutex;
27use futures::channel::oneshot;
28use futures::stream::{self, FusedStream, Stream};
29use futures::{FutureExt, StreamExt, TryStreamExt};
30use fxfs::errors::FxfsError;
31use fxfs::filesystem::{self, SyncOptions};
32use fxfs::future_with_guard::FutureWithGuard;
33use fxfs::log::*;
34use fxfs::object_store::directory::Directory;
35use fxfs::object_store::project_id::ProjectIdExt;
36use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
37use fxfs::object_store::{
38    DirType, HandleOptions, HandleOwner, ObjectDescriptor, ObjectStore, ProjectId,
39};
40use refaults_vmo::PageRefaultCounter;
41use std::future::Future;
42use std::pin::pin;
43#[cfg(any(test, feature = "testing"))]
44use std::sync::atomic::AtomicBool;
45use std::sync::atomic::{AtomicU64, Ordering};
46use std::sync::{Arc, Weak};
47use std::time::Duration;
48use vfs::directory::entry::DirectoryEntry;
49use vfs::directory::simple::Simple;
50use vfs::execution_scope::ExecutionScope;
51
52// LINT.IfChange
53// TODO:(b/299919008) Fix this number to something reasonable, or maybe just for fxblob.
54const DIRENT_CACHE_LIMIT: usize = 2000;
55// LINT.ThenChange(//src/storage/stressor/src/aggressive.rs)
56
57/// The read ahead/around size to target. Increase reads to be this size within the restrictions of
58/// the format for the target object.
59pub const READ_AHEAD_SIZE: u64 = 128 * 1024;
60
61const PROFILE_DIRECTORY: &str = "profiles";
62
63#[derive(Clone, Copy, Debug)]
64pub struct MemoryPressureLevelConfig {
65    /// The period to wait between flushes, as well as perform other background maintenance tasks
66    /// (e.g. purging caches).
67    pub background_task_period: Duration,
68
69    /// The limit of cached nodes.
70    pub cache_size_limit: usize,
71
72    /// The initial delay before the background task runs. The background task has a longer initial
73    /// delay to avoid running the task during boot.
74    pub background_task_initial_delay: Duration,
75}
76
77impl Default for MemoryPressureLevelConfig {
78    fn default() -> Self {
79        Self {
80            background_task_period: Duration::from_secs(20),
81            cache_size_limit: DIRENT_CACHE_LIMIT,
82            background_task_initial_delay: Duration::from_secs(70),
83        }
84    }
85}
86
87#[derive(Clone, Copy, Debug)]
88pub struct MemoryPressureConfig {
89    /// The configuration to use at [`MemoryPressureLevel::Normal`].
90    pub mem_normal: MemoryPressureLevelConfig,
91
92    /// The configuration to use at [`MemoryPressureLevel::Warning`].
93    pub mem_warning: MemoryPressureLevelConfig,
94
95    /// The configuration to use at [`MemoryPressureLevel::Critical`].
96    pub mem_critical: MemoryPressureLevelConfig,
97}
98
99impl MemoryPressureConfig {
100    pub fn for_level(&self, level: &MemoryPressureLevel) -> &MemoryPressureLevelConfig {
101        match level {
102            MemoryPressureLevel::Normal => &self.mem_normal,
103            MemoryPressureLevel::Warning => &self.mem_warning,
104            MemoryPressureLevel::Critical => &self.mem_critical,
105        }
106    }
107}
108
109impl Default for MemoryPressureConfig {
110    fn default() -> Self {
111        // TODO(https://fxbug.dev/42061389): investigate a smarter strategy for determining flush
112        // frequency.
113        Self {
114            mem_normal: MemoryPressureLevelConfig {
115                background_task_period: Duration::from_secs(20),
116                cache_size_limit: DIRENT_CACHE_LIMIT,
117                background_task_initial_delay: Duration::from_secs(70),
118            },
119            mem_warning: MemoryPressureLevelConfig {
120                background_task_period: Duration::from_secs(5),
121                cache_size_limit: 100,
122                background_task_initial_delay: Duration::from_secs(5),
123            },
124            mem_critical: MemoryPressureLevelConfig {
125                background_task_period: Duration::from_millis(1500),
126                cache_size_limit: 20,
127                background_task_initial_delay: Duration::from_millis(1500),
128            },
129        }
130    }
131}
132
133/// FxVolume represents an opened volume. It is also a (weak) cache for all opened Nodes within the
134/// volume.
135pub struct FxVolume {
136    parent: Weak<VolumesDirectory>,
137    cache: NodeCache,
138    store: Arc<ObjectStore>,
139    pager: Pager,
140    executor: fasync::EHandle,
141    name: String,
142
143    // A tuple of the actual task and a channel to signal to terminate the task.
144    background_task: Mutex<Option<(fasync::Task<()>, oneshot::Sender<()>)>>,
145
146    // Unique identifier of the filesystem that owns this volume.
147    fs_id: u64,
148
149    // The execution scope for this volume.
150    scope: ExecutionScope,
151
152    dirent_cache: DirentCache,
153
154    profile_state: Mutex<Option<Box<dyn ProfileState>>>,
155
156    #[cfg(any(test, feature = "testing"))]
157    poisoned: AtomicBool,
158
159    blob_resupplied_count: Arc<PageRefaultCounter>,
160
161    /// The number of dirty bytes in pager backed VMOs that belong to this volume. VolumesDirectory
162    /// holds a count for all volumes. This count is only used for tracing.
163    pager_dirty_byte_count: AtomicU64,
164}
165
166#[fxfs_trace::trace]
167impl FxVolume {
168    pub fn new(
169        parent: Weak<VolumesDirectory>,
170        store: Arc<ObjectStore>,
171        fs_id: u64,
172        name: String,
173        blob_resupplied_count: Arc<PageRefaultCounter>,
174        memory_pressure_config: MemoryPressureConfig,
175    ) -> Result<Self, Error> {
176        let scope = ExecutionScope::new();
177        Ok(Self {
178            parent,
179            cache: NodeCache::new(),
180            store,
181            name,
182            pager: Pager::new(scope.clone())?,
183            executor: fasync::EHandle::local(),
184            background_task: Mutex::new(None),
185            fs_id,
186            scope,
187            dirent_cache: DirentCache::new(memory_pressure_config.mem_normal.cache_size_limit),
188            profile_state: Mutex::new(None),
189            #[cfg(any(test, feature = "testing"))]
190            poisoned: AtomicBool::new(false),
191            blob_resupplied_count,
192            pager_dirty_byte_count: AtomicU64::new(0),
193        })
194    }
195
196    pub fn store(&self) -> &Arc<ObjectStore> {
197        &self.store
198    }
199
200    pub fn cache(&self) -> &NodeCache {
201        &self.cache
202    }
203
204    pub fn dirent_cache(&self) -> &DirentCache {
205        &self.dirent_cache
206    }
207
208    pub fn pager(&self) -> &Pager {
209        &self.pager
210    }
211
212    pub fn id(&self) -> u64 {
213        self.fs_id
214    }
215
216    pub fn scope(&self) -> &ExecutionScope {
217        &self.scope
218    }
219
220    pub fn blob_resupplied_count(&self) -> &PageRefaultCounter {
221        &self.blob_resupplied_count
222    }
223
224    pub fn name(&self) -> &str {
225        &self.name
226    }
227
228    /// Reports the filesystem info, but if the volume has a space limit applied then the space
229    /// available and space used are reported based on the volume instead.
230    pub fn filesystem_info_for_volume(&self) -> fio::FilesystemInfo {
231        let allocator = self.store.filesystem().allocator();
232        let info =
233            if let Some(limit) = allocator.get_owner_bytes_limit(self.store.store_object_id()) {
234                filesystem::Info {
235                    used_bytes: allocator.get_owner_bytes_used(self.store.store_object_id()),
236                    total_bytes: limit,
237                }
238            } else {
239                self.store.filesystem().get_info()
240            };
241
242        info_to_filesystem_info(info, self.store.block_size(), self.store.object_count(), self.id())
243    }
244
245    /// Stop profiling, recover resources from it and finalize recordings.
246    pub async fn stop_profile_tasks(self: &Arc<Self>) {
247        let Some(mut state) = self.profile_state.lock().take() else { return };
248        state.wait_for_replay_to_finish().await;
249        self.pager.set_recorder(None);
250        let _ = state.wait_for_recording_to_finish().await;
251    }
252
253    /// Opens or creates the profile directory in the volume's internal directory.
254    pub async fn get_profile_directory(self: &Arc<Self>) -> Result<Directory<FxVolume>, Error> {
255        let internal_dir = self
256            .get_or_create_internal_dir()
257            .await
258            .map_err(|e| e.context("Opening internal directory"))?;
259        // Have to do separate calls to create the profile dir if necessary.
260        let mut transaction = self
261            .store()
262            .filesystem()
263            .new_transaction(
264                lock_keys![LockKey::object(
265                    self.store().store_object_id(),
266                    internal_dir.object_id(),
267                )],
268                Options::default(),
269            )
270            .await?;
271        Ok(match internal_dir.directory().lookup(PROFILE_DIRECTORY).await? {
272            Some((object_id, _, _)) => {
273                Directory::open_unchecked(self.clone(), object_id, DirType::Normal)
274            }
275            None => {
276                let new_dir = internal_dir
277                    .directory()
278                    .create_child_dir(&mut transaction, PROFILE_DIRECTORY)
279                    .await?;
280                transaction.commit().await?;
281                new_dir
282            }
283        })
284    }
285
286    /// Starts recording a profile for the volume under the name given, and if a profile exists
287    /// under that same name it is replayed and will be replaced after by the new recording if it
288    /// is cleanly shutdown and finalized.
289    pub async fn record_or_replay_profile(
290        self: &Arc<Self>,
291        mut state: Box<dyn ProfileState>,
292        name: &str,
293    ) -> Result<(), Error> {
294        // We don't meddle in FxDirectory or FxFile here because we don't want a paged object.
295        // Normally we ensure that there's only one copy by using the Node cache on the volume, but
296        // that would create FxFile, so in this case we just assume that only one profile operation
297        // should be ongoing at a time, as that is ensured in `VolumesDirectory`.
298
299        // If there is a recording already, prepare to replay it.
300        let profile_dir = self.get_profile_directory().await?;
301        let replay_handle = if let Some((id, descriptor, _)) = profile_dir.lookup(name).await? {
302            ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
303            Some(Box::new(
304                ObjectStore::open_object(self, id, HandleOptions::default(), None).await?,
305            ))
306        } else {
307            None
308        };
309
310        let mut profile_state = self.profile_state.lock();
311
312        info!("Recording new profile '{name}' for volume object {}", self.store.store_object_id());
313        // Begin recording first to ensure that we capture any activity from the replay.
314        self.pager.set_recorder(Some(state.record_new(self, name)));
315        if let Some(handle) = replay_handle {
316            if let Some(guard) = self.scope().try_active_guard() {
317                state.replay_profile(handle, self.clone(), guard);
318                info!(
319                    "Replaying existing profile '{name}' for volume object {}",
320                    self.store.store_object_id()
321                );
322            }
323        }
324        *profile_state = Some(state);
325        Ok(())
326    }
327
328    async fn get_or_create_internal_dir(self: &Arc<Self>) -> Result<Arc<FxDirectory>, Error> {
329        let internal_data_id = self.store().get_or_create_internal_directory_id().await?;
330        let internal_dir = self
331            .get_or_load_node(internal_data_id, ObjectDescriptor::Directory, None)
332            .await?
333            .into_any()
334            .downcast::<FxDirectory>()
335            .unwrap();
336        Ok(internal_dir)
337    }
338
339    pub async fn terminate(&self) {
340        let task = std::mem::replace(&mut *self.background_task.lock(), None);
341        if let Some((task, terminate)) = task {
342            let _ = terminate.send(());
343            task.await;
344        }
345
346        // `NodeCache::terminate` will break any strong reference cycles contained within nodes
347        // (pager registration). The only remaining nodes should be those with open FIDL
348        // connections or vmo references in the process of handling the VMO_ZERO_CHILDREN signal.
349        // `ExecutionScope::shutdown` + `ExecutionScope::wait` will close the open FIDL connections
350        // and synchonrize the signal handling which should result in all nodes flushing and then
351        // dropping. Any async tasks required to flush a node should take an active guard on the
352        // `ExecutionScope` which will prevent `ExecutionScope::wait` from completing until all
353        // nodes are flushed.
354        self.scope.shutdown();
355        self.cache.terminate();
356        self.scope.wait().await;
357
358        // Make sure there are no deferred operations still pending for this volume.
359        Epoch::global().barrier().await;
360
361        // The dirent_cache must be cleared *after* shutting down the scope because there can be
362        // tasks that insert entries into the cache.
363        self.dirent_cache.clear();
364
365        if self.store.filesystem().options().read_only {
366            // If the filesystem is read only, we don't need to flush/sync anything.
367            if self.store.is_unlocked() {
368                self.store.lock_read_only();
369            }
370            return;
371        }
372
373        self.flush_all_files(FlushType::LastChance).await;
374        self.store.filesystem().graveyard().flush().await;
375        if self.store.crypt().is_some() {
376            if let Err(e) = self.store.lock().await {
377                // The store will be left in a safe state and there won't be data-loss unless
378                // there's an issue flushing the journal later.
379                warn!(error:? = e; "Locking store error");
380            }
381        }
382        let sync_status = self
383            .store
384            .filesystem()
385            .sync(SyncOptions { flush_device: true, ..Default::default() })
386            .await;
387        if let Err(e) = sync_status {
388            error!(error:? = e; "Failed to sync filesystem; data may be lost");
389        }
390    }
391
392    /// Attempts to get a node from the node cache. If the node wasn't present in the cache, loads
393    /// the object from the object store, installing the returned node into the cache and returns
394    /// the newly created FxNode backed by the loaded object.  |parent| is only set on the node if
395    /// the node was not present in the cache.  Otherwise, it is ignored.
396    pub async fn get_or_load_node(
397        self: &Arc<Self>,
398        object_id: u64,
399        object_descriptor: ObjectDescriptor,
400        parent: Option<Arc<FxDirectory>>,
401    ) -> Result<Arc<dyn FxNode>, Error> {
402        match self.cache.get_or_reserve(object_id).await {
403            GetResult::Node(node) => Ok(node),
404            GetResult::Placeholder(placeholder) => {
405                let node = match object_descriptor {
406                    ObjectDescriptor::File => FxFile::new(
407                        ObjectStore::open_object(self, object_id, HandleOptions::default(), None)
408                            .await?,
409                    ) as Arc<dyn FxNode>,
410                    ObjectDescriptor::Directory => {
411                        // Can't use open_unchecked because we don't know if the dir is casefolded
412                        // or encrypted.
413                        Arc::new(FxDirectory::new(parent, Directory::open(self, object_id).await?))
414                            as Arc<dyn FxNode>
415                    }
416                    ObjectDescriptor::Symlink => Arc::new(FxSymlink::new(self.clone(), object_id)),
417                    _ => bail!(FxfsError::Inconsistent),
418                };
419                placeholder.commit(&node);
420                Ok(node)
421            }
422        }
423    }
424
425    /// Marks the given directory deleted.
426    pub fn mark_directory_deleted(&self, object_id: u64) {
427        if let Some(node) = self.cache.get(object_id) {
428            // It's possible that node is a placeholder, in which case we don't need to wait for it
429            // to be resolved because it should be blocked behind the locks that are held by the
430            // caller, and once they're dropped, it'll be found to be deleted via the tree.
431            if let Ok(dir) = node.into_any().downcast::<FxDirectory>() {
432                dir.set_deleted();
433            }
434        }
435    }
436
437    /// Removes resources associated with |object_id| (which ought to be a file), if there are no
438    /// open connections to that file.
439    ///
440    /// This must be called *after committing* a transaction which deletes the last reference to
441    /// |object_id|, since before that point, new connections could be established.
442    pub(super) async fn maybe_purge_file(&self, object_id: u64) -> Result<(), Error> {
443        if let Some(node) = self.cache.get(object_id) {
444            node.clone().mark_to_be_purged();
445            return Ok(());
446        }
447        // If this fails, the graveyard should clean it up on next mount.
448        self.store
449            .tombstone_object(
450                object_id,
451                Options { borrow_metadata_space: true, ..Default::default() },
452            )
453            .await?;
454        Ok(())
455    }
456
457    /// Starts the background work task.  This task will periodically:
458    ///   - scan all files and flush them to disk, and
459    ///   - purge unused cached data.
460    /// The task will hold a strong reference to the FxVolume while it is running, so the task must
461    /// be closed later with Self::terminate, or the FxVolume will never be dropped.
462    pub fn start_background_task(
463        self: &Arc<Self>,
464        config: MemoryPressureConfig,
465        mem_monitor: Option<&MemoryPressureMonitor>,
466    ) {
467        let mut background_task = self.background_task.lock();
468        if background_task.is_none() {
469            let (tx, rx) = oneshot::channel();
470
471            let task = if let Some(mem_monitor) = mem_monitor {
472                fasync::Task::spawn(self.clone().background_task(
473                    config,
474                    mem_monitor.get_level_stream(),
475                    rx,
476                ))
477            } else {
478                // With no memory pressure monitoring, just stub the stream out as always pending.
479                fasync::Task::spawn(self.clone().background_task(config, stream::pending(), rx))
480            };
481
482            *background_task = Some((task, tx));
483        }
484    }
485
486    #[trace]
487    async fn background_task(
488        self: Arc<Self>,
489        config: MemoryPressureConfig,
490        mut level_stream: impl Stream<Item = MemoryPressureLevel> + FusedStream + Unpin,
491        terminate: oneshot::Receiver<()>,
492    ) {
493        debug!(store_id = self.store.store_object_id(); "FxVolume::background_task start");
494        let mut terminate = terminate.fuse();
495        // Default to the normal period until updates come from the `level_stream`.
496        let mut level = MemoryPressureLevel::Normal;
497        let mut timer =
498            pin!(fasync::Timer::new(config.for_level(&level).background_task_initial_delay));
499
500        loop {
501            let mut should_terminate = false;
502            let mut should_flush = false;
503            let mut low_mem = false;
504            let mut should_purge_layer_files = false;
505            let mut should_update_cache_limit = false;
506
507            futures::select_biased! {
508                _ = terminate => should_terminate = true,
509                new_level = level_stream.next() => {
510                    // Because `level_stream` will never terminate, this is safe to unwrap.
511                    let new_level = new_level.unwrap();
512                    // At critical levels, it's okay to undertake expensive work immediately
513                    // to reclaim memory.
514                    low_mem = matches!(new_level, MemoryPressureLevel::Critical);
515                    should_purge_layer_files = true;
516                    if new_level != level {
517                        level = new_level;
518                        should_update_cache_limit = true;
519                        let level_config = config.for_level(&level);
520                        timer.as_mut().reset(fasync::MonotonicInstant::after(
521                            level_config.background_task_period.into())
522                        );
523                        debug!(
524                            "Background task period changed to {:?} due to new memory pressure \
525                            level ({:?}).",
526                            config.for_level(&level).background_task_period, level
527                        );
528                    }
529                }
530                _ = timer => {
531                    timer.as_mut().reset(fasync::MonotonicInstant::after(
532                        config.for_level(&level).background_task_period.into())
533                    );
534                    should_flush = true;
535                    // Only purge layer file caches once we have elevated memory pressure.
536                    should_purge_layer_files = !matches!(level, MemoryPressureLevel::Normal);
537                }
538            };
539            if should_terminate {
540                break;
541            }
542            // Maybe close extra files *before* iterating them for flush/low mem.
543            if should_update_cache_limit {
544                self.dirent_cache.set_limit(config.for_level(&level).cache_size_limit);
545            }
546            if should_flush {
547                self.flush_all_files(FlushType::Sync).await;
548                self.dirent_cache.recycle_stale_files();
549            } else if low_mem {
550                // This is a softer version of flushing files, so don't bother if we're flushing.
551                self.minimize_memory().await;
552            }
553            if should_purge_layer_files {
554                for layer in self.store.tree().immutable_layer_set().layers {
555                    layer.purge_cached_data();
556                }
557            }
558        }
559        debug!(store_id = self.store.store_object_id(); "FxVolume::background_task end");
560    }
561
562    /// Reports that a certain number of bytes will be dirtied in a pager-backed VMO.
563    ///
564    /// Note that this function may await flush tasks.
565    pub fn report_pager_dirty(
566        self: Arc<Self>,
567        byte_count: u64,
568        mark_dirty: impl FnOnce() + Send + 'static,
569    ) {
570        let this = self.clone();
571        let callback = move || {
572            mark_dirty();
573            let prev = this.pager_dirty_byte_count.fetch_add(byte_count, Ordering::Relaxed);
574            fxfs_trace::counter!("dirty-bytes", 0, this.name => prev.saturating_add(byte_count));
575        };
576        if let Some(parent) = self.parent.upgrade() {
577            parent.report_pager_dirty(byte_count, self, callback);
578        } else {
579            callback();
580        }
581    }
582
583    /// Reports that a certain number of bytes were cleaned in a pager-backed VMO.
584    pub fn report_pager_clean(&self, byte_count: u64) {
585        if let Some(parent) = self.parent.upgrade() {
586            parent.report_pager_clean(byte_count);
587        }
588        let prev = self.pager_dirty_byte_count.fetch_sub(byte_count, Ordering::Relaxed);
589        fxfs_trace::counter!("dirty-bytes", 0, self.name => prev.saturating_sub(byte_count));
590    }
591
592    #[trace]
593    pub async fn flush_all_files(&self, flush_type: FlushType) {
594        let mut flushed = 0;
595        for file in self.cache.files() {
596            if let Some(node) = file.into_opened_node() {
597                if let Err(e) = FxFile::flush(&node, flush_type).await {
598                    warn!(
599                        store_id = self.store.store_object_id(),
600                        oid = node.object_id(),
601                        error:? = e;
602                        "Failed to flush",
603                    )
604                }
605                if flush_type == FlushType::LastChance {
606                    let file = node.clone();
607                    std::mem::drop(node);
608                    file.force_clean();
609                }
610            }
611            flushed += 1;
612        }
613        debug!(store_id = self.store.store_object_id(), file_count = flushed; "FxVolume flushed");
614    }
615
616    /// Flushes only files with dirty pages.
617    ///
618    /// `PagedObjectHandle` tracks the number dirty pages locally (except for overwrite files) which
619    /// makes determining whether flushing a file will reduce the number of dirty pages efficient.
620    /// `flush_all_files` checks if any metadata needs to be flushed which involves a syscall making
621    /// it significantly slower when there are lots of open files without dirty pages.
622    #[trace]
623    pub async fn minimize_memory(&self) {
624        for file in self.cache.files() {
625            if let Some(node) = file.into_opened_node() {
626                if let Err(e) = node.handle().minimize_memory().await {
627                    warn!(
628                        store_id = self.store.store_object_id(),
629                        oid = node.object_id(),
630                        error:? = e;
631                        "Failed to flush",
632                    )
633                }
634            }
635        }
636    }
637
638    /// Spawns a short term task for the volume that includes a guard that will prevent termination.
639    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) {
640        if let Some(guard) = self.scope.try_active_guard() {
641            self.executor.spawn_detached(FutureWithGuard::new(guard, task));
642        }
643    }
644
645    /// Tries to unwrap this volume.  If it fails, it will poison the volume so that when it is
646    /// dropped, you get a backtrace.
647    #[cfg(any(test, feature = "testing"))]
648    pub fn try_unwrap(self: Arc<Self>) -> Option<FxVolume> {
649        self.poisoned.store(true, Ordering::Relaxed);
650        match Arc::try_unwrap(self) {
651            Ok(volume) => {
652                volume.poisoned.store(false, Ordering::Relaxed);
653                Some(volume)
654            }
655            Err(this) => {
656                // Log details about all the places where there might be a reference cycle.
657                info!(
658                    "background_task: {}, profile_state: {}, dirent_cache count: {}, \
659                     pager strong file refs={}, no tasks={}",
660                    this.background_task.lock().is_some(),
661                    this.profile_state.lock().is_some(),
662                    this.dirent_cache.len(),
663                    crate::pager::STRONG_FILE_REFS.load(Ordering::Relaxed),
664                    {
665                        let mut no_tasks = pin!(this.scope.wait());
666                        no_tasks
667                            .poll_unpin(&mut std::task::Context::from_waker(
668                                std::task::Waker::noop(),
669                            ))
670                            .is_ready()
671                    },
672                );
673                None
674            }
675        }
676    }
677
678    pub async fn handle_file_backed_volume_provider_requests(
679        this: Weak<Self>,
680        scope: ExecutionScope,
681        mut requests: FileBackedVolumeProviderRequestStream,
682    ) -> Result<(), Error> {
683        while let Some(request) = requests.try_next().await? {
684            match request {
685                FileBackedVolumeProviderRequest::Open {
686                    parent_directory_token,
687                    name,
688                    server_end,
689                    control_handle: _,
690                } => {
691                    // Try and get an active guard before upgrading.
692                    let Some(_guard) = scope.try_active_guard() else {
693                        bail!("Volume shutting down")
694                    };
695                    let Some(this) = this.upgrade() else { bail!("FxVolume dropped") };
696                    match this
697                        .scope
698                        .token_registry()
699                        // NB: For now, we only expect these calls in a regular (non-blob) volume.
700                        // Hard-code the type for simplicity; attempts to call on a blob volume will
701                        // get an error.
702                        .get_owner_and_rights(parent_directory_token)
703                        .and_then(|dir| {
704                            dir.ok_or(zx::Status::BAD_HANDLE).and_then(|(dir, rights)| {
705                                if !rights.contains(fio::Rights::MODIFY_DIRECTORY) {
706                                    return Err(zx::Status::BAD_HANDLE);
707                                }
708                                dir.into_any()
709                                    .downcast::<FxDirectory>()
710                                    .map_err(|_| zx::Status::BAD_HANDLE)
711                            })
712                        }) {
713                        Ok(dir) => {
714                            dir.open_block_file(&name, server_end).await;
715                        }
716                        Err(status) => {
717                            let _ = server_end.close_with_epitaph(status).unwrap_or_else(|e| {
718                                error!(error:? = e; "open failed to send epitaph");
719                            });
720                        }
721                    }
722                }
723            }
724        }
725        Ok(())
726    }
727
728    pub async fn handle_project_id_requests(
729        this: Weak<Self>,
730        scope: ExecutionScope,
731        mut requests: ProjectIdRequestStream,
732    ) -> Result<(), Error> {
733        while let Some(request) = requests.try_next().await? {
734            // Try and get an active guard before upgrading.
735            let Some(_guard) = scope.try_active_guard() else { bail!("Volume shutting down") };
736            let Some(this) = this.upgrade() else { bail!("FxVolume dropped") };
737            let store_id = this.store.store_object_id();
738
739            match request {
740                ProjectIdRequest::SetLimit { responder, project_id, bytes, nodes } => {
741                    let result = if let Some(project_id) = ProjectId::new(project_id) {
742                        this.store()
743                        .set_project_limit(project_id, bytes, nodes)
744                        .await
745                        .map_err(|error| {
746                            error!(error:?, store_id, project_id; "Failed to set project limit");
747                            map_to_raw_status(error)
748                        })
749                    } else {
750                        Err(zx::Status::OUT_OF_RANGE.into_raw())
751                    };
752                    responder.send(result)?
753                }
754                ProjectIdRequest::Clear { responder, project_id } => {
755                    let result = if let Some(project_id) = ProjectId::new(project_id) {
756                        this.store().clear_project_limit(project_id).await.map_err(|error| {
757                            error!(error:?, store_id, project_id; "Failed to clear project limit");
758                            map_to_raw_status(error)
759                        })
760                    } else {
761                        Err(zx::Status::OUT_OF_RANGE.into_raw())
762                    };
763                    responder.send(result)?
764                }
765                ProjectIdRequest::SetForNode { responder, node_id, project_id } => {
766                    let result = if let Some(project_id) = ProjectId::new(project_id) {
767                        this.store()
768                        .set_project_for_node(node_id, project_id)
769                        .await
770                        .map_err(|error| {
771                            error!(error:?, store_id, node_id, project_id; "Failed to apply node.");
772                            map_to_raw_status(error)
773                        })
774                    } else {
775                        Err(zx::Status::OUT_OF_RANGE.into_raw())
776                    };
777                    responder.send(result)?
778                }
779                ProjectIdRequest::GetForNode { responder, node_id } => responder.send(
780                    this.store()
781                        .get_project_for_node(node_id)
782                        .await
783                        .map(ProjectIdExt::raw)
784                        .map_err(|error| {
785                            error!(error:?, store_id, node_id; "Failed to get node.");
786                            map_to_raw_status(error)
787                        }),
788                )?,
789                ProjectIdRequest::ClearForNode { responder, node_id } => responder.send(
790                    this.store().clear_project_for_node(node_id).await.map_err(|error| {
791                        error!(error:?, store_id, node_id; "Failed to clear for node.");
792                        map_to_raw_status(error)
793                    }),
794                )?,
795                ProjectIdRequest::List { responder, token } => {
796                    responder.send(match this.list_projects(&token).await {
797                        Ok((ref entries, ref next_token)) => Ok((entries, next_token.as_ref())),
798                        Err(error) => {
799                            error!(error:?, store_id, token:?; "Failed to list projects.");
800                            Err(map_to_raw_status(error))
801                        }
802                    })?
803                }
804                ProjectIdRequest::Info { responder, project_id } => {
805                    responder.send(match this.project_info(project_id).await {
806                        Ok((ref limit, ref usage)) => Ok((limit, usage)),
807                        Err(error) => {
808                            error!(error:?, store_id, project_id; "Failed to get project info.");
809                            Err(map_to_raw_status(error))
810                        }
811                    })?
812                }
813            }
814        }
815        Ok(())
816    }
817
818    // Maximum entries to fit based on 64KiB message size minus 16 bytes of header, 16 bytes
819    // of vector header, 16 bytes for the optional token header, and 8 bytes of token value.
820    // https://fuchsia.dev/fuchsia-src/development/languages/fidl/guides/max-out-pagination
821    const MAX_PROJECT_ENTRIES: usize = 8184;
822
823    // Calls out to the inner volume to list available projects, removing and re-adding the fidl
824    // wrapper types for the pagination token.
825    async fn list_projects(
826        &self,
827        last_token: &Option<Box<ProjectIterToken>>,
828    ) -> Result<(Vec<u64>, Option<ProjectIterToken>), Error> {
829        let (entries, token) = self
830            .store()
831            .list_projects(
832                last_token.as_ref().and_then(|v| ProjectId::new(v.value)),
833                Self::MAX_PROJECT_ENTRIES,
834            )
835            .await?;
836        Ok((
837            entries.into_iter().map(ProjectId::raw).collect(),
838            token.map(|value| ProjectIterToken { value: value.raw() }),
839        ))
840    }
841
842    async fn project_info(&self, project_id: u64) -> Result<(BytesAndNodes, BytesAndNodes), Error> {
843        let project_id = ProjectId::new(project_id).ok_or(FxfsError::OutOfRange)?;
844        let (limit, usage) = self.store().project_info(project_id).await?;
845        // At least one of them needs to be around to return anything.
846        ensure!(limit.is_some() || usage.is_some(), FxfsError::NotFound);
847        Ok((
848            limit.map_or_else(
849                || BytesAndNodes { bytes: u64::MAX, nodes: u64::MAX },
850                |v| BytesAndNodes { bytes: v.0, nodes: v.1 },
851            ),
852            usage.map_or_else(
853                || BytesAndNodes { bytes: 0, nodes: 0 },
854                |v| BytesAndNodes { bytes: v.0, nodes: v.1 },
855            ),
856        ))
857    }
858}
859
860#[cfg(any(test, feature = "testing"))]
861impl Drop for FxVolume {
862    fn drop(&mut self) {
863        assert!(!*self.poisoned.get_mut());
864    }
865}
866
867impl HandleOwner for FxVolume {}
868
869impl AsRef<ObjectStore> for FxVolume {
870    fn as_ref(&self) -> &ObjectStore {
871        &self.store
872    }
873}
874
875#[async_trait]
876impl FsInspectVolume for FxVolume {
877    async fn get_volume_data(&self) -> Option<VolumeData> {
878        // Don't try to return data if the volume is shutting down.
879        let _guard = self.scope.try_active_guard()?;
880
881        let object_count = self.store().object_count();
882        let (used_bytes, bytes_limit) =
883            self.store.filesystem().allocator().owner_allocation_info(self.store.store_object_id());
884        let encrypted = self.store().crypt().is_some();
885        let port_koid = fasync::EHandle::local().port().as_handle_ref().koid().unwrap().raw_koid();
886        Some(VolumeData { bytes_limit, used_bytes, used_nodes: object_count, encrypted, port_koid })
887    }
888}
889
890pub trait RootDir: FxNode + DirectoryEntry {
891    fn as_directory_entry(self: Arc<Self>) -> Arc<dyn DirectoryEntry>;
892
893    fn serve(self: Arc<Self>, flags: fio::Flags, server_end: ServerEnd<fio::DirectoryMarker>);
894
895    fn as_node(self: Arc<Self>) -> Arc<dyn FxNode>;
896
897    fn register_additional_volume_services(
898        self: Arc<Self>,
899        _svc_dir: &Simple,
900    ) -> Result<(), Error> {
901        Ok(())
902    }
903}
904
905#[derive(Clone)]
906pub struct FxVolumeAndRoot {
907    volume: Arc<FxVolume>,
908    root: Arc<dyn RootDir>,
909
910    // This is used for service connections and anything that isn't the actual volume.
911    admin_scope: ExecutionScope,
912
913    // The outgoing directory that the volume might be served on.
914    outgoing_dir: Arc<Simple>,
915}
916
917impl FxVolumeAndRoot {
918    pub async fn new<T: From<Directory<FxVolume>> + RootDir>(
919        parent: Weak<VolumesDirectory>,
920        store: Arc<ObjectStore>,
921        unique_id: u64,
922        volume_name: String,
923        blob_resupplied_count: Arc<PageRefaultCounter>,
924        memory_pressure_config: MemoryPressureConfig,
925    ) -> Result<Self, Error> {
926        let volume = Arc::new(FxVolume::new(
927            parent,
928            store,
929            unique_id,
930            volume_name,
931            blob_resupplied_count.clone(),
932            memory_pressure_config,
933        )?);
934        let root_object_id = volume.store().root_directory_object_id();
935        let root_dir = Directory::open(&volume, root_object_id).await?;
936        let root = Arc::<T>::new(root_dir.into()) as Arc<dyn RootDir>;
937        volume
938            .cache
939            .get_or_reserve(root_object_id)
940            .await
941            .placeholder()
942            .unwrap()
943            .commit(&root.clone().as_node());
944        Ok(Self {
945            volume,
946            root,
947            admin_scope: ExecutionScope::new(),
948            outgoing_dir: vfs::directory::immutable::simple(),
949        })
950    }
951
952    pub fn volume(&self) -> &Arc<FxVolume> {
953        &self.volume
954    }
955
956    pub fn root(&self) -> &Arc<dyn RootDir> {
957        &self.root
958    }
959
960    pub fn admin_scope(&self) -> &ExecutionScope {
961        &self.admin_scope
962    }
963
964    pub fn outgoing_dir(&self) -> &Arc<Simple> {
965        &self.outgoing_dir
966    }
967
968    // The same as root but downcasted to FxDirectory.
969    pub fn root_dir(&self) -> Arc<FxDirectory> {
970        self.root().clone().into_any().downcast::<FxDirectory>().expect("Invalid type for root")
971    }
972
973    pub fn into_volume(self) -> Arc<FxVolume> {
974        self.volume
975    }
976}
977
978// The correct number here is arguably u64::MAX - 1 (because node 0 is reserved). There's a bug
979// where inspect test cases fail if we try and use that, possibly because of a signed/unsigned bug.
980// See https://fxbug.dev/42168242.  Until that's fixed, we'll have to use i64::MAX.
981const TOTAL_NODES: u64 = i64::MAX as u64;
982
983// An array used to initialize the FilesystemInfo |name| field. This just spells "fxfs" 0-padded to
984// 32 bytes.
985const FXFS_INFO_NAME_FIDL: [i8; 32] = [
986    0x66, 0x78, 0x66, 0x73, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
987    0, 0, 0, 0,
988];
989
990fn info_to_filesystem_info(
991    info: filesystem::Info,
992    block_size: u64,
993    object_count: u64,
994    fs_id: u64,
995) -> fio::FilesystemInfo {
996    fio::FilesystemInfo {
997        total_bytes: info.total_bytes,
998        used_bytes: info.used_bytes,
999        total_nodes: TOTAL_NODES,
1000        used_nodes: object_count,
1001        // TODO(https://fxbug.dev/42175592): Support free_shared_pool_bytes.
1002        free_shared_pool_bytes: 0,
1003        fs_id,
1004        block_size: block_size as u32,
1005        max_filename_size: fio::MAX_NAME_LENGTH as u32,
1006        fs_type: fidl_fuchsia_fs::VfsType::Fxfs.into_primitive(),
1007        padding: 0,
1008        name: FXFS_INFO_NAME_FIDL,
1009    }
1010}
1011
1012#[cfg(test)]
1013mod tests {
1014    use super::DIRENT_CACHE_LIMIT;
1015    use crate::fuchsia::file::FxFile;
1016    use crate::fuchsia::fxblob::testing::{self as blob_testing, BlobFixture};
1017    use crate::fuchsia::memory_pressure::MemoryPressureLevel;
1018    use crate::fuchsia::pager::PagerBacked;
1019    use crate::fuchsia::profile::{RECORDED, new_profile_state};
1020    use crate::fuchsia::testing::{
1021        TestFixture, TestFixtureOptions, close_dir_checked, close_file_checked, open_dir,
1022        open_dir_checked, open_file, open_file_checked,
1023    };
1024    use crate::fuchsia::volume::{FxVolume, MemoryPressureConfig, MemoryPressureLevelConfig};
1025    use crate::fuchsia::volumes_directory::VolumesDirectory;
1026    use delivery_blob::CompressionMode;
1027    use fidl_fuchsia_fxfs::{BytesAndNodes, ProjectIdMarker};
1028    use fidl_fuchsia_io as fio;
1029    use fs_inspect::FsInspectVolume;
1030    use fuchsia_async::{self as fasync, TimeoutExt as _};
1031    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
1032    use fuchsia_fs::file;
1033    use fxfs::filesystem::{FxFilesystem, FxFilesystemBuilder};
1034    use fxfs::fsck::{fsck, fsck_volume};
1035    use fxfs::object_store::directory::replace_child;
1036    use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
1037    use fxfs::object_store::volume::root_volume;
1038    use fxfs::object_store::{HandleOptions, ObjectDescriptor, ObjectStore, StoreOptions};
1039    use fxfs_crypt_common::CryptBase;
1040    use fxfs_crypto::{Crypt, WrappingKeyId};
1041    use fxfs_insecure_crypto::new_insecure_crypt;
1042    use refaults_vmo::PageRefaultCounter;
1043    use std::sync::atomic::Ordering;
1044    use std::sync::{Arc, Weak};
1045    use std::time::Duration;
1046    use storage_device::DeviceHolder;
1047    use storage_device::fake_device::FakeDevice;
1048    use zx::Status;
1049
1050    const WRAPPING_KEY_ID: WrappingKeyId = u128::to_le_bytes(123);
1051
1052    #[fuchsia::test(threads = 10)]
1053    async fn test_rename_different_dirs() {
1054        use zx::Event;
1055
1056        let fixture = TestFixture::new().await;
1057        let root = fixture.root();
1058
1059        let src = open_dir_checked(
1060            &root,
1061            "foo",
1062            fio::Flags::FLAG_MAYBE_CREATE
1063                | fio::PERM_READABLE
1064                | fio::PERM_WRITABLE
1065                | fio::Flags::PROTOCOL_DIRECTORY,
1066            Default::default(),
1067        )
1068        .await;
1069
1070        let dst = open_dir_checked(
1071            &root,
1072            "bar",
1073            fio::Flags::FLAG_MAYBE_CREATE
1074                | fio::PERM_READABLE
1075                | fio::PERM_WRITABLE
1076                | fio::Flags::PROTOCOL_DIRECTORY,
1077            Default::default(),
1078        )
1079        .await;
1080
1081        let f = open_file_checked(
1082            &root,
1083            "foo/a",
1084            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1085            &Default::default(),
1086        )
1087        .await;
1088        close_file_checked(f).await;
1089
1090        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1091        Status::ok(status).expect("get_token failed");
1092        src.rename("a", Event::from(dst_token.unwrap()), "b")
1093            .await
1094            .expect("FIDL call failed")
1095            .expect("rename failed");
1096
1097        assert_eq!(
1098            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1099                .await
1100                .expect_err("Open succeeded")
1101                .root_cause()
1102                .downcast_ref::<Status>()
1103                .expect("No status"),
1104            &Status::NOT_FOUND,
1105        );
1106        let f =
1107            open_file_checked(&root, "bar/b", fio::Flags::PROTOCOL_FILE, &Default::default()).await;
1108        close_file_checked(f).await;
1109
1110        close_dir_checked(dst).await;
1111        close_dir_checked(src).await;
1112        fixture.close().await;
1113    }
1114
1115    #[fuchsia::test(threads = 10)]
1116    async fn test_rename_same_dir() {
1117        use zx::Event;
1118        let fixture = TestFixture::new().await;
1119        let root = fixture.root();
1120
1121        let src = open_dir_checked(
1122            &root,
1123            "foo",
1124            fio::Flags::FLAG_MAYBE_CREATE
1125                | fio::PERM_READABLE
1126                | fio::PERM_WRITABLE
1127                | fio::Flags::PROTOCOL_DIRECTORY,
1128            Default::default(),
1129        )
1130        .await;
1131
1132        let f = open_file_checked(
1133            &root,
1134            "foo/a",
1135            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1136            &Default::default(),
1137        )
1138        .await;
1139        close_file_checked(f).await;
1140
1141        let (status, src_token) = src.get_token().await.expect("FIDL call failed");
1142        Status::ok(status).expect("get_token failed");
1143        src.rename("a", Event::from(src_token.unwrap()), "b")
1144            .await
1145            .expect("FIDL call failed")
1146            .expect("rename failed");
1147
1148        assert_eq!(
1149            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1150                .await
1151                .expect_err("Open succeeded")
1152                .root_cause()
1153                .downcast_ref::<Status>()
1154                .expect("No status"),
1155            &Status::NOT_FOUND,
1156        );
1157        let f =
1158            open_file_checked(&root, "foo/b", fio::Flags::PROTOCOL_FILE, &Default::default()).await;
1159        close_file_checked(f).await;
1160
1161        close_dir_checked(src).await;
1162        fixture.close().await;
1163    }
1164
1165    #[fuchsia::test(threads = 10)]
1166    async fn test_rename_overwrites_file() {
1167        use zx::Event;
1168        let fixture = TestFixture::new().await;
1169        let root = fixture.root();
1170
1171        let src = open_dir_checked(
1172            &root,
1173            "foo",
1174            fio::Flags::FLAG_MAYBE_CREATE
1175                | fio::PERM_READABLE
1176                | fio::PERM_WRITABLE
1177                | fio::Flags::PROTOCOL_DIRECTORY,
1178            Default::default(),
1179        )
1180        .await;
1181
1182        let dst = open_dir_checked(
1183            &root,
1184            "bar",
1185            fio::Flags::FLAG_MAYBE_CREATE
1186                | fio::PERM_READABLE
1187                | fio::PERM_WRITABLE
1188                | fio::Flags::PROTOCOL_DIRECTORY,
1189            Default::default(),
1190        )
1191        .await;
1192
1193        // The src file is non-empty.
1194        let src_file = open_file_checked(
1195            &root,
1196            "foo/a",
1197            fio::Flags::FLAG_MAYBE_CREATE
1198                | fio::PERM_READABLE
1199                | fio::PERM_WRITABLE
1200                | fio::Flags::PROTOCOL_FILE,
1201            &Default::default(),
1202        )
1203        .await;
1204        let buf = vec![0xaa as u8; 8192];
1205        file::write(&src_file, buf.as_slice()).await.expect("Failed to write to file");
1206        close_file_checked(src_file).await;
1207
1208        // The dst file is empty (so we can distinguish it).
1209        let f = open_file_checked(
1210            &root,
1211            "bar/b",
1212            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1213            &Default::default(),
1214        )
1215        .await;
1216        close_file_checked(f).await;
1217
1218        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1219        Status::ok(status).expect("get_token failed");
1220        src.rename("a", Event::from(dst_token.unwrap()), "b")
1221            .await
1222            .expect("FIDL call failed")
1223            .expect("rename failed");
1224
1225        assert_eq!(
1226            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1227                .await
1228                .expect_err("Open succeeded")
1229                .root_cause()
1230                .downcast_ref::<Status>()
1231                .expect("No status"),
1232            &Status::NOT_FOUND,
1233        );
1234        let file = open_file_checked(
1235            &root,
1236            "bar/b",
1237            fio::PERM_READABLE | fio::Flags::PROTOCOL_FILE,
1238            &Default::default(),
1239        )
1240        .await;
1241        let buf = file::read(&file).await.expect("read file failed");
1242        assert_eq!(buf, vec![0xaa as u8; 8192]);
1243        close_file_checked(file).await;
1244
1245        close_dir_checked(dst).await;
1246        close_dir_checked(src).await;
1247        fixture.close().await;
1248    }
1249
1250    #[fuchsia::test(threads = 10)]
1251    async fn test_rename_overwrites_dir() {
1252        use zx::Event;
1253        let fixture = TestFixture::new().await;
1254        let root = fixture.root();
1255
1256        let src = open_dir_checked(
1257            &root,
1258            "foo",
1259            fio::Flags::FLAG_MAYBE_CREATE
1260                | fio::PERM_READABLE
1261                | fio::PERM_WRITABLE
1262                | fio::Flags::PROTOCOL_DIRECTORY,
1263            Default::default(),
1264        )
1265        .await;
1266
1267        let dst = open_dir_checked(
1268            &root,
1269            "bar",
1270            fio::Flags::FLAG_MAYBE_CREATE
1271                | fio::PERM_READABLE
1272                | fio::PERM_WRITABLE
1273                | fio::Flags::PROTOCOL_DIRECTORY,
1274            Default::default(),
1275        )
1276        .await;
1277
1278        // The src dir is non-empty.
1279        open_dir_checked(
1280            &root,
1281            "foo/a",
1282            fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
1283            Default::default(),
1284        )
1285        .await;
1286        open_file_checked(
1287            &root,
1288            "foo/a/file",
1289            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1290            &Default::default(),
1291        )
1292        .await;
1293        open_dir_checked(
1294            &root,
1295            "bar/b",
1296            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_DIRECTORY,
1297            Default::default(),
1298        )
1299        .await;
1300
1301        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1302        Status::ok(status).expect("get_token failed");
1303        src.rename("a", Event::from(dst_token.unwrap()), "b")
1304            .await
1305            .expect("FIDL call failed")
1306            .expect("rename failed");
1307
1308        assert_eq!(
1309            open_dir(&root, "foo/a", fio::Flags::PROTOCOL_DIRECTORY, &Default::default())
1310                .await
1311                .expect_err("Open succeeded")
1312                .root_cause()
1313                .downcast_ref::<Status>()
1314                .expect("No status"),
1315            &Status::NOT_FOUND,
1316        );
1317        let f =
1318            open_file_checked(&root, "bar/b/file", fio::Flags::PROTOCOL_FILE, &Default::default())
1319                .await;
1320        close_file_checked(f).await;
1321
1322        close_dir_checked(dst).await;
1323        close_dir_checked(src).await;
1324
1325        fixture.close().await;
1326    }
1327
1328    #[fuchsia::test]
1329    async fn test_background_flush() {
1330        let fixture = TestFixture::new().await;
1331        {
1332            let file = open_file_checked(
1333                fixture.root(),
1334                "file",
1335                fio::Flags::FLAG_MAYBE_CREATE
1336                    | fio::PERM_READABLE
1337                    | fio::PERM_WRITABLE
1338                    | fio::Flags::PROTOCOL_FILE,
1339                &Default::default(),
1340            )
1341            .await;
1342            let object_id = file
1343                .get_attributes(fio::NodeAttributesQuery::ID)
1344                .await
1345                .expect("Fidl get attr")
1346                .expect("get attr")
1347                .1
1348                .id
1349                .unwrap();
1350
1351            // Write some data to the file, which will only go to the cache for now.
1352            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1353
1354            // Initialized to the default size.
1355            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1356            let volume = fixture.volume().volume().clone();
1357
1358            let data_has_persisted = || async {
1359                // We have to reopen the object each time since this is a distinct handle from the
1360                // one managed by the FxFile.
1361                let object =
1362                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1363                        .await
1364                        .expect("open_object failed");
1365                let data = object.contents(8192).await.expect("read failed");
1366                data.len() == 1 && data[..] == [123u8]
1367            };
1368            assert!(!data_has_persisted().await);
1369
1370            fixture.volume().volume().start_background_task(
1371                MemoryPressureConfig {
1372                    mem_normal: MemoryPressureLevelConfig {
1373                        background_task_period: Duration::from_millis(100),
1374                        background_task_initial_delay: Duration::from_millis(100),
1375                        ..Default::default()
1376                    },
1377                    mem_warning: Default::default(),
1378                    mem_critical: Default::default(),
1379                },
1380                None,
1381            );
1382
1383            const MAX_WAIT: Duration = Duration::from_secs(20);
1384            let wait_increments = Duration::from_millis(400);
1385            let mut total_waited = Duration::ZERO;
1386
1387            while total_waited < MAX_WAIT {
1388                fasync::Timer::new(wait_increments).await;
1389                total_waited += wait_increments;
1390
1391                if data_has_persisted().await {
1392                    break;
1393                }
1394            }
1395
1396            assert!(data_has_persisted().await);
1397        }
1398
1399        fixture.close().await;
1400    }
1401
1402    #[fuchsia::test(threads = 2)]
1403    async fn test_background_flush_with_warning_memory_pressure() {
1404        let fixture = TestFixture::new().await;
1405        {
1406            let file = open_file_checked(
1407                fixture.root(),
1408                "file",
1409                fio::Flags::FLAG_MAYBE_CREATE
1410                    | fio::PERM_READABLE
1411                    | fio::PERM_WRITABLE
1412                    | fio::Flags::PROTOCOL_FILE,
1413                &Default::default(),
1414            )
1415            .await;
1416            let object_id = file
1417                .get_attributes(fio::NodeAttributesQuery::ID)
1418                .await
1419                .expect("Fidl get attr")
1420                .expect("get attr")
1421                .1
1422                .id
1423                .unwrap();
1424
1425            // Write some data to the file, which will only go to the cache for now.
1426            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1427
1428            // Initialized to the default size.
1429            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1430            let volume = fixture.volume().volume().clone();
1431
1432            let data_has_persisted = || async {
1433                // We have to reopen the object each time since this is a distinct handle from the
1434                // one managed by the FxFile.
1435                let object =
1436                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1437                        .await
1438                        .expect("open_object failed");
1439                let data = object.contents(8192).await.expect("read failed");
1440                data.len() == 1 && data[..] == [123u8]
1441            };
1442            assert!(!data_has_persisted().await);
1443
1444            // Configure the flush task to only flush quickly on warning.
1445            let flush_config = MemoryPressureConfig {
1446                mem_normal: MemoryPressureLevelConfig {
1447                    background_task_period: Duration::from_secs(20),
1448                    cache_size_limit: DIRENT_CACHE_LIMIT,
1449                    ..Default::default()
1450                },
1451                mem_warning: MemoryPressureLevelConfig {
1452                    background_task_period: Duration::from_millis(100),
1453                    cache_size_limit: 100,
1454                    background_task_initial_delay: Duration::from_millis(100),
1455                    ..Default::default()
1456                },
1457                mem_critical: MemoryPressureLevelConfig {
1458                    background_task_period: Duration::from_secs(20),
1459                    cache_size_limit: 50,
1460                    ..Default::default()
1461                },
1462            };
1463            fixture.volume().volume().start_background_task(
1464                flush_config,
1465                fixture.volumes_directory().memory_pressure_monitor(),
1466            );
1467
1468            // Send the memory pressure update.
1469            fixture
1470                .memory_pressure_proxy()
1471                .on_level_changed(MemoryPressureLevel::Warning)
1472                .await
1473                .expect("Failed to send memory pressure level change");
1474
1475            // Wait a bit of time for the flush to occur (but less than the normal and critical
1476            // periods).
1477            const MAX_WAIT: Duration = Duration::from_secs(3);
1478            let wait_increments = Duration::from_millis(400);
1479            let mut total_waited = Duration::ZERO;
1480
1481            while total_waited < MAX_WAIT {
1482                fasync::Timer::new(wait_increments).await;
1483                total_waited += wait_increments;
1484
1485                if data_has_persisted().await {
1486                    break;
1487                }
1488            }
1489
1490            assert!(data_has_persisted().await);
1491            assert_eq!(fixture.volume().volume().dirent_cache().limit(), 100);
1492        }
1493
1494        fixture.close().await;
1495    }
1496
1497    #[fuchsia::test(threads = 2)]
1498    async fn test_background_flush_with_critical_memory_pressure() {
1499        let fixture = TestFixture::new().await;
1500        {
1501            let file = open_file_checked(
1502                fixture.root(),
1503                "file",
1504                fio::Flags::FLAG_MAYBE_CREATE
1505                    | fio::PERM_READABLE
1506                    | fio::PERM_WRITABLE
1507                    | fio::Flags::PROTOCOL_FILE,
1508                &Default::default(),
1509            )
1510            .await;
1511            let object_id = file
1512                .get_attributes(fio::NodeAttributesQuery::ID)
1513                .await
1514                .expect("Fidl get attr")
1515                .expect("get attr")
1516                .1
1517                .id
1518                .unwrap();
1519
1520            // Write some data to the file, which will only go to the cache for now.
1521            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1522
1523            // Initialized to the default size.
1524            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1525            let volume = fixture.volume().volume().clone();
1526
1527            let data_has_persisted = || async {
1528                // We have to reopen the object each time since this is a distinct handle from the
1529                // one managed by the FxFile.
1530                let object =
1531                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1532                        .await
1533                        .expect("open_object failed");
1534                let data = object.contents(8192).await.expect("read failed");
1535                data.len() == 1 && data[..] == [123u8]
1536            };
1537            assert!(!data_has_persisted().await);
1538
1539            let flush_config = MemoryPressureConfig {
1540                mem_normal: MemoryPressureLevelConfig {
1541                    cache_size_limit: DIRENT_CACHE_LIMIT,
1542                    ..Default::default()
1543                },
1544                mem_warning: MemoryPressureLevelConfig {
1545                    cache_size_limit: 100,
1546                    ..Default::default()
1547                },
1548                mem_critical: MemoryPressureLevelConfig {
1549                    cache_size_limit: 50,
1550                    ..Default::default()
1551                },
1552            };
1553            fixture.volume().volume().start_background_task(
1554                flush_config,
1555                fixture.volumes_directory().memory_pressure_monitor(),
1556            );
1557
1558            // Send the memory pressure update.
1559            fixture
1560                .memory_pressure_proxy()
1561                .on_level_changed(MemoryPressureLevel::Critical)
1562                .await
1563                .expect("Failed to send memory pressure level change");
1564
1565            // Critical memory should trigger a flush immediately so expect a flush very quickly.
1566            const MAX_WAIT: Duration = Duration::from_secs(2);
1567            let wait_increments = Duration::from_millis(400);
1568            let mut total_waited = Duration::ZERO;
1569
1570            while total_waited < MAX_WAIT {
1571                fasync::Timer::new(wait_increments).await;
1572                total_waited += wait_increments;
1573
1574                if data_has_persisted().await {
1575                    break;
1576                }
1577            }
1578
1579            assert!(data_has_persisted().await);
1580            assert_eq!(fixture.volume().volume().dirent_cache().limit(), 50);
1581        }
1582
1583        fixture.close().await;
1584    }
1585
1586    // This test verifies that it is safe to query a volume after it is unmounted in case of a race
1587    // during unmount/shutdown.
1588    #[fuchsia::test(threads = 2)]
1589    async fn test_query_info_unmounted_volume() {
1590        const TEST_VOLUME: &str = "test_1234";
1591        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1592
1593        let fixture = TestFixture::new().await;
1594        {
1595            let volumes_directory = fixture.volumes_directory();
1596            let volume = volumes_directory
1597                .create_and_mount_volume(TEST_VOLUME, Some(crypt.clone()), false, None)
1598                .await
1599                .unwrap();
1600
1601            assert!(volume.volume().get_volume_data().await.is_some());
1602
1603            // Unmount it but keep a reference.
1604            volumes_directory
1605                .lock()
1606                .await
1607                .unmount(volume.volume().store().store_object_id())
1608                .await
1609                .expect("unmount failed");
1610
1611            // This returns None, but doesn't crash.
1612            assert!(volume.volume().get_volume_data().await.is_none());
1613        }
1614        fixture.close().await;
1615    }
1616
1617    #[fuchsia::test]
1618    async fn test_project_limit_persistence() {
1619        const BYTES_LIMIT_1: u64 = 123456;
1620        const NODES_LIMIT_1: u64 = 4321;
1621        const BYTES_LIMIT_2: u64 = 456789;
1622        const NODES_LIMIT_2: u64 = 9876;
1623        const VOLUME_NAME: &str = "A";
1624        const FILE_NAME: &str = "B";
1625        const PROJECT_ID: u64 = 42;
1626        const PROJECT_ID2: u64 = 343;
1627        let volume_store_id;
1628        let node_id;
1629        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
1630        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1631        {
1632            let blob_resupplied_count =
1633                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1634            let volumes_directory = VolumesDirectory::new(
1635                root_volume(filesystem.clone()).await.unwrap(),
1636                Weak::new(),
1637                None,
1638                blob_resupplied_count,
1639                MemoryPressureConfig::default(),
1640            )
1641            .await
1642            .unwrap();
1643
1644            let volume_and_root = volumes_directory
1645                .create_and_mount_volume(VOLUME_NAME, None, false, None)
1646                .await
1647                .expect("create unencrypted volume failed");
1648            volume_store_id = volume_and_root.volume().store().store_object_id();
1649
1650            let (volume_dir_proxy, dir_server_end) =
1651                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1652            volumes_directory
1653                .serve_volume(&volume_and_root, dir_server_end, false)
1654                .expect("serve_volume failed");
1655
1656            let project_proxy =
1657                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1658                    .expect("Unable to connect to project id service");
1659
1660            project_proxy
1661                .set_limit(0, BYTES_LIMIT_1, NODES_LIMIT_1)
1662                .await
1663                .unwrap()
1664                .expect_err("Should not set limits for project id 0");
1665
1666            assert_eq!(
1667                project_proxy.clear(0).await.unwrap().expect_err("Should not clear project id 0"),
1668                Status::OUT_OF_RANGE.into_raw()
1669            );
1670
1671            assert_eq!(
1672                project_proxy
1673                    .info(0)
1674                    .await
1675                    .unwrap()
1676                    .expect_err("Should not get info for project id 0"),
1677                Status::OUT_OF_RANGE.into_raw()
1678            );
1679
1680            project_proxy
1681                .set_limit(PROJECT_ID, BYTES_LIMIT_1, NODES_LIMIT_1)
1682                .await
1683                .unwrap()
1684                .expect("To set limits");
1685            {
1686                let BytesAndNodes { bytes, nodes } =
1687                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1688                assert_eq!(bytes, BYTES_LIMIT_1);
1689                assert_eq!(nodes, NODES_LIMIT_1);
1690            }
1691
1692            let file_proxy = {
1693                let (root_proxy, root_server_end) =
1694                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1695                volume_dir_proxy
1696                    .open(
1697                        "root",
1698                        fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
1699                        &Default::default(),
1700                        root_server_end.into_channel(),
1701                    )
1702                    .expect("Failed to open volume root");
1703
1704                open_file_checked(
1705                    &root_proxy,
1706                    FILE_NAME,
1707                    fio::Flags::FLAG_MAYBE_CREATE
1708                        | fio::PERM_READABLE
1709                        | fio::PERM_WRITABLE
1710                        | fio::Flags::PROTOCOL_FILE,
1711                    &Default::default(),
1712                )
1713                .await
1714            };
1715
1716            let (_, immutable_attributes) =
1717                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
1718            node_id = immutable_attributes.id.unwrap();
1719
1720            project_proxy
1721                .set_for_node(node_id, 0)
1722                .await
1723                .unwrap()
1724                .expect_err("Should not set 0 project id");
1725
1726            project_proxy
1727                .set_for_node(node_id, PROJECT_ID)
1728                .await
1729                .unwrap()
1730                .expect("Setting project on node");
1731
1732            project_proxy
1733                .set_limit(PROJECT_ID, BYTES_LIMIT_2, NODES_LIMIT_2)
1734                .await
1735                .unwrap()
1736                .expect("To set limits");
1737            {
1738                let BytesAndNodes { bytes, nodes } =
1739                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1740                assert_eq!(bytes, BYTES_LIMIT_2);
1741                assert_eq!(nodes, NODES_LIMIT_2);
1742            }
1743
1744            assert_eq!(
1745                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1746                PROJECT_ID
1747            );
1748
1749            volumes_directory.terminate().await;
1750            filesystem.close().await.expect("close filesystem failed");
1751        }
1752        device = filesystem.take_device().await;
1753        device.ensure_unique();
1754        device.reopen(false);
1755        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1756        {
1757            fsck(filesystem.clone()).await.expect("Fsck");
1758            fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
1759            let blob_resupplied_count =
1760                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1761            let volumes_directory = VolumesDirectory::new(
1762                root_volume(filesystem.clone()).await.unwrap(),
1763                Weak::new(),
1764                None,
1765                blob_resupplied_count,
1766                MemoryPressureConfig::default(),
1767            )
1768            .await
1769            .unwrap();
1770            let volume_and_root = volumes_directory
1771                .mount_volume(VOLUME_NAME, None, false)
1772                .await
1773                .expect("mount unencrypted volume failed");
1774
1775            let (volume_proxy, _scope) = crate::volumes_directory::serve_startup_volume_proxy(
1776                &volumes_directory,
1777                VOLUME_NAME,
1778            );
1779
1780            let project_proxy = {
1781                let (volume_dir_proxy, dir_server_end) =
1782                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1783                volumes_directory
1784                    .serve_volume(&volume_and_root, dir_server_end, false)
1785                    .expect("serve_volume failed");
1786
1787                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1788                    .expect("Unable to connect to project id service")
1789            };
1790
1791            let usage_bytes_and_nodes = {
1792                let (
1793                    BytesAndNodes { bytes: limit_bytes, nodes: limit_nodes },
1794                    usage_bytes_and_nodes,
1795                ) = project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info");
1796                assert_eq!(limit_bytes, BYTES_LIMIT_2);
1797                assert_eq!(limit_nodes, NODES_LIMIT_2);
1798                usage_bytes_and_nodes
1799            };
1800
1801            // Should be unable to clear the project limit, due to being in use.
1802            project_proxy.clear(PROJECT_ID).await.unwrap().expect("To clear limits");
1803
1804            assert_eq!(
1805                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1806                PROJECT_ID
1807            );
1808            project_proxy
1809                .set_for_node(node_id, PROJECT_ID2)
1810                .await
1811                .unwrap()
1812                .expect("Changing project");
1813            assert_eq!(
1814                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1815                PROJECT_ID2
1816            );
1817
1818            assert_eq!(
1819                project_proxy.info(PROJECT_ID).await.unwrap().expect_err("Expect missing limits"),
1820                Status::NOT_FOUND.into_raw()
1821            );
1822            assert_eq!(
1823                project_proxy.info(PROJECT_ID2).await.unwrap().expect("Fetching project info").1,
1824                usage_bytes_and_nodes
1825            );
1826
1827            std::mem::drop(volume_proxy);
1828            volumes_directory.terminate().await;
1829            std::mem::drop(volumes_directory);
1830            filesystem.close().await.expect("close filesystem failed");
1831        }
1832        device = filesystem.take_device().await;
1833        device.ensure_unique();
1834        device.reopen(false);
1835        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1836        fsck(filesystem.clone()).await.expect("Fsck");
1837        fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
1838        let blob_resupplied_count =
1839            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1840        let volumes_directory = VolumesDirectory::new(
1841            root_volume(filesystem.clone()).await.unwrap(),
1842            Weak::new(),
1843            None,
1844            blob_resupplied_count,
1845            MemoryPressureConfig::default(),
1846        )
1847        .await
1848        .unwrap();
1849        let volume_and_root = volumes_directory
1850            .mount_volume(VOLUME_NAME, None, false)
1851            .await
1852            .expect("mount unencrypted volume failed");
1853        let (volume_dir_proxy, dir_server_end) =
1854            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1855        volumes_directory
1856            .serve_volume(&volume_and_root, dir_server_end, false)
1857            .expect("serve_volume failed");
1858        let project_proxy = connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1859            .expect("Unable to connect to project id service");
1860        assert_eq!(
1861            project_proxy.info(PROJECT_ID).await.unwrap().expect_err("Expect missing limits"),
1862            Status::NOT_FOUND.into_raw()
1863        );
1864        volumes_directory.terminate().await;
1865        std::mem::drop(volumes_directory);
1866        filesystem.close().await.expect("close filesystem failed");
1867    }
1868
1869    #[fuchsia::test]
1870    async fn test_project_limit_accounting() {
1871        const BYTES_LIMIT: u64 = 123456;
1872        const NODES_LIMIT: u64 = 4321;
1873        const VOLUME_NAME: &str = "A";
1874        const FILE_NAME: &str = "B";
1875        const PROJECT_ID: u64 = 42;
1876        let volume_store_id;
1877        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
1878        let first_object_id;
1879        let mut bytes_usage;
1880        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1881        {
1882            let blob_resupplied_count =
1883                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1884            let volumes_directory = VolumesDirectory::new(
1885                root_volume(filesystem.clone()).await.unwrap(),
1886                Weak::new(),
1887                None,
1888                blob_resupplied_count,
1889                MemoryPressureConfig::default(),
1890            )
1891            .await
1892            .unwrap();
1893
1894            let volume_and_root = volumes_directory
1895                .create_and_mount_volume(
1896                    VOLUME_NAME,
1897                    Some(Arc::new(new_insecure_crypt())),
1898                    false,
1899                    None,
1900                )
1901                .await
1902                .expect("create unencrypted volume failed");
1903            volume_store_id = volume_and_root.volume().store().store_object_id();
1904
1905            let (volume_dir_proxy, dir_server_end) =
1906                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1907            volumes_directory
1908                .serve_volume(&volume_and_root, dir_server_end, false)
1909                .expect("serve_volume failed");
1910
1911            let project_proxy =
1912                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1913                    .expect("Unable to connect to project id service");
1914
1915            project_proxy
1916                .set_limit(PROJECT_ID, BYTES_LIMIT, NODES_LIMIT)
1917                .await
1918                .unwrap()
1919                .expect("To set limits");
1920            {
1921                let BytesAndNodes { bytes, nodes } =
1922                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1923                assert_eq!(bytes, BYTES_LIMIT);
1924                assert_eq!(nodes, NODES_LIMIT);
1925            }
1926
1927            let file_proxy = {
1928                let (root_proxy, root_server_end) =
1929                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1930                volume_dir_proxy
1931                    .open(
1932                        "root",
1933                        fio::PERM_READABLE | fio::PERM_WRITABLE,
1934                        &Default::default(),
1935                        root_server_end.into_channel(),
1936                    )
1937                    .expect("Failed to open volume root");
1938
1939                open_file_checked(
1940                    &root_proxy,
1941                    FILE_NAME,
1942                    fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
1943                    &Default::default(),
1944                )
1945                .await
1946            };
1947
1948            assert_eq!(
1949                8192,
1950                file_proxy
1951                    .write(&vec![0xff as u8; 8192])
1952                    .await
1953                    .expect("FIDL call failed")
1954                    .map_err(Status::from_raw)
1955                    .expect("File write was successful")
1956            );
1957            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
1958
1959            {
1960                let BytesAndNodes { bytes, nodes } =
1961                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
1962                assert_eq!(bytes, 0);
1963                assert_eq!(nodes, 0);
1964            }
1965
1966            let (_, immutable_attributes) =
1967                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
1968            let node_id = immutable_attributes.id.unwrap();
1969
1970            first_object_id = node_id;
1971            project_proxy
1972                .set_for_node(node_id, PROJECT_ID)
1973                .await
1974                .unwrap()
1975                .expect("Setting project on node");
1976
1977            bytes_usage = {
1978                let BytesAndNodes { bytes, nodes } =
1979                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
1980                assert!(bytes > 0);
1981                assert_eq!(nodes, 1);
1982                bytes
1983            };
1984
1985            // Grow the file by a block.
1986            assert_eq!(
1987                8192,
1988                file_proxy
1989                    .write(&vec![0xff as u8; 8192])
1990                    .await
1991                    .expect("FIDL call failed")
1992                    .map_err(Status::from_raw)
1993                    .expect("File write was successful")
1994            );
1995            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
1996            bytes_usage = {
1997                let BytesAndNodes { bytes, nodes } =
1998                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
1999                assert!(bytes > bytes_usage);
2000                assert_eq!(nodes, 1);
2001                bytes
2002            };
2003
2004            volumes_directory.terminate().await;
2005            filesystem.close().await.expect("close filesystem failed");
2006        }
2007        device = filesystem.take_device().await;
2008        device.ensure_unique();
2009        device.reopen(false);
2010        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2011        {
2012            fsck(filesystem.clone()).await.expect("Fsck");
2013            fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2014                .await
2015                .expect("Fsck volume");
2016            let blob_resupplied_count =
2017                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2018            let volumes_directory = VolumesDirectory::new(
2019                root_volume(filesystem.clone()).await.unwrap(),
2020                Weak::new(),
2021                None,
2022                blob_resupplied_count,
2023                MemoryPressureConfig::default(),
2024            )
2025            .await
2026            .unwrap();
2027            let volume_and_root = volumes_directory
2028                .mount_volume(VOLUME_NAME, Some(Arc::new(new_insecure_crypt())), false)
2029                .await
2030                .expect("mount unencrypted volume failed");
2031
2032            let (root_proxy, project_proxy) = {
2033                let (volume_dir_proxy, dir_server_end) =
2034                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2035                volumes_directory
2036                    .serve_volume(&volume_and_root, dir_server_end, false)
2037                    .expect("serve_volume failed");
2038
2039                let (root_proxy, root_server_end) =
2040                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2041                volume_dir_proxy
2042                    .open(
2043                        "root",
2044                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2045                        &Default::default(),
2046                        root_server_end.into_channel(),
2047                    )
2048                    .expect("Failed to open volume root");
2049                let project_proxy = {
2050                    connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2051                        .expect("Unable to connect to project id service")
2052                };
2053                (root_proxy, project_proxy)
2054            };
2055
2056            {
2057                let BytesAndNodes { bytes, nodes } =
2058                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2059                assert_eq!(bytes, bytes_usage);
2060                assert_eq!(nodes, 1);
2061            }
2062
2063            assert_eq!(
2064                project_proxy
2065                    .get_for_node(first_object_id)
2066                    .await
2067                    .unwrap()
2068                    .expect("Checking project"),
2069                PROJECT_ID
2070            );
2071            root_proxy
2072                .unlink(FILE_NAME, &fio::UnlinkOptions::default())
2073                .await
2074                .expect("FIDL call failed")
2075                .expect("unlink failed");
2076            filesystem.graveyard().flush().await;
2077
2078            {
2079                let BytesAndNodes { bytes, nodes } =
2080                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2081                assert_eq!(bytes, 0);
2082                assert_eq!(nodes, 0);
2083            }
2084
2085            let file_proxy = open_file_checked(
2086                &root_proxy,
2087                FILE_NAME,
2088                fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2089                &Default::default(),
2090            )
2091            .await;
2092
2093            let (_, immutable_attributes) =
2094                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2095            let node_id = immutable_attributes.id.unwrap();
2096
2097            project_proxy
2098                .set_for_node(node_id, PROJECT_ID)
2099                .await
2100                .unwrap()
2101                .expect("Applying project");
2102
2103            bytes_usage = {
2104                let BytesAndNodes { bytes, nodes } =
2105                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2106                // Empty file should have less space than the non-empty file from above.
2107                assert!(bytes < bytes_usage);
2108                assert_eq!(nodes, 1);
2109                bytes
2110            };
2111
2112            assert_eq!(
2113                8192,
2114                file_proxy
2115                    .write(&vec![0xff as u8; 8192])
2116                    .await
2117                    .expect("FIDL call failed")
2118                    .map_err(Status::from_raw)
2119                    .expect("File write was successful")
2120            );
2121            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2122            bytes_usage = {
2123                let BytesAndNodes { bytes, nodes } =
2124                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2125                assert!(bytes > bytes_usage);
2126                assert_eq!(nodes, 1);
2127                bytes
2128            };
2129
2130            // Trim to zero. Bytes should decrease.
2131            file_proxy.resize(0).await.expect("FIDL call failed").expect("Resize file");
2132            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2133            {
2134                let BytesAndNodes { bytes, nodes } =
2135                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2136                assert!(bytes < bytes_usage);
2137                assert_eq!(nodes, 1);
2138            };
2139
2140            // Dropping node from project. Usage should go to zero.
2141            project_proxy
2142                .clear_for_node(node_id)
2143                .await
2144                .expect("FIDL call failed")
2145                .expect("Clear failed.");
2146            {
2147                let BytesAndNodes { bytes, nodes } =
2148                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2149                assert_eq!(bytes, 0);
2150                assert_eq!(nodes, 0);
2151            };
2152
2153            volumes_directory.terminate().await;
2154            filesystem.close().await.expect("close filesystem failed");
2155        }
2156        device = filesystem.take_device().await;
2157        device.ensure_unique();
2158        device.reopen(false);
2159        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2160        fsck(filesystem.clone()).await.expect("Fsck");
2161        fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2162            .await
2163            .expect("Fsck volume");
2164        filesystem.close().await.expect("close filesystem failed");
2165    }
2166
2167    #[fuchsia::test]
2168    async fn test_project_node_inheritance() {
2169        const BYTES_LIMIT: u64 = 123456;
2170        const NODES_LIMIT: u64 = 4321;
2171        const VOLUME_NAME: &str = "A";
2172        const DIR_NAME: &str = "B";
2173        const SUBDIR_NAME: &str = "C";
2174        const FILE_NAME: &str = "D";
2175        const PROJECT_ID: u64 = 42;
2176        let volume_store_id;
2177        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
2178        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2179        {
2180            let blob_resupplied_count =
2181                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2182            let volumes_directory = VolumesDirectory::new(
2183                root_volume(filesystem.clone()).await.unwrap(),
2184                Weak::new(),
2185                None,
2186                blob_resupplied_count,
2187                MemoryPressureConfig::default(),
2188            )
2189            .await
2190            .unwrap();
2191
2192            let volume_and_root = volumes_directory
2193                .create_and_mount_volume(
2194                    VOLUME_NAME,
2195                    Some(Arc::new(new_insecure_crypt())),
2196                    false,
2197                    None,
2198                )
2199                .await
2200                .expect("create unencrypted volume failed");
2201            volume_store_id = volume_and_root.volume().store().store_object_id();
2202
2203            let (volume_dir_proxy, dir_server_end) =
2204                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2205            volumes_directory
2206                .serve_volume(&volume_and_root, dir_server_end, false)
2207                .expect("serve_volume failed");
2208
2209            let project_proxy =
2210                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2211                    .expect("Unable to connect to project id service");
2212
2213            project_proxy
2214                .set_limit(PROJECT_ID, BYTES_LIMIT, NODES_LIMIT)
2215                .await
2216                .unwrap()
2217                .expect("To set limits");
2218
2219            let dir_proxy = {
2220                let (root_proxy, root_server_end) =
2221                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2222                volume_dir_proxy
2223                    .open(
2224                        "root",
2225                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2226                        &Default::default(),
2227                        root_server_end.into_channel(),
2228                    )
2229                    .expect("Failed to open volume root");
2230
2231                open_dir_checked(
2232                    &root_proxy,
2233                    DIR_NAME,
2234                    fio::Flags::FLAG_MAYBE_CREATE
2235                        | fio::PERM_READABLE
2236                        | fio::PERM_WRITABLE
2237                        | fio::Flags::PROTOCOL_DIRECTORY,
2238                    Default::default(),
2239                )
2240                .await
2241            };
2242            {
2243                let (_, immutable_attributes) =
2244                    dir_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2245                let node_id = immutable_attributes.id.unwrap();
2246
2247                project_proxy
2248                    .set_for_node(node_id, PROJECT_ID)
2249                    .await
2250                    .unwrap()
2251                    .expect("Setting project on node");
2252            }
2253
2254            let subdir_proxy = open_dir_checked(
2255                &dir_proxy,
2256                SUBDIR_NAME,
2257                fio::Flags::FLAG_MAYBE_CREATE
2258                    | fio::PERM_READABLE
2259                    | fio::PERM_WRITABLE
2260                    | fio::Flags::PROTOCOL_DIRECTORY,
2261                Default::default(),
2262            )
2263            .await;
2264            {
2265                let (_, immutable_attributes) = subdir_proxy
2266                    .get_attributes(fio::NodeAttributesQuery::ID)
2267                    .await
2268                    .unwrap()
2269                    .unwrap();
2270                let node_id = immutable_attributes.id.unwrap();
2271
2272                assert_eq!(
2273                    project_proxy
2274                        .get_for_node(node_id)
2275                        .await
2276                        .unwrap()
2277                        .expect("Setting project on node"),
2278                    PROJECT_ID
2279                );
2280            }
2281
2282            let file_proxy = open_file_checked(
2283                &subdir_proxy,
2284                FILE_NAME,
2285                fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2286                &Default::default(),
2287            )
2288            .await;
2289            {
2290                let (_, immutable_attributes) =
2291                    file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2292                let node_id = immutable_attributes.id.unwrap();
2293
2294                assert_eq!(
2295                    project_proxy
2296                        .get_for_node(node_id)
2297                        .await
2298                        .unwrap()
2299                        .expect("Setting project on node"),
2300                    PROJECT_ID
2301                );
2302            }
2303
2304            // An unnamed temporary file is created slightly differently to a regular file object.
2305            // Just in case, check that it inherits project ID as well.
2306            let tmpfile_proxy = open_file_checked(
2307                &subdir_proxy,
2308                ".",
2309                fio::Flags::PROTOCOL_FILE
2310                    | fio::Flags::FLAG_CREATE_AS_UNNAMED_TEMPORARY
2311                    | fio::PERM_READABLE,
2312                &fio::Options::default(),
2313            )
2314            .await;
2315            {
2316                let (_, immutable_attributes) = tmpfile_proxy
2317                    .get_attributes(fio::NodeAttributesQuery::ID)
2318                    .await
2319                    .unwrap()
2320                    .unwrap();
2321                let node_id: u64 = immutable_attributes.id.unwrap();
2322                assert_eq!(
2323                    project_proxy
2324                        .get_for_node(node_id)
2325                        .await
2326                        .unwrap()
2327                        .expect("Setting project on node"),
2328                    PROJECT_ID
2329                );
2330            }
2331
2332            let BytesAndNodes { nodes, .. } =
2333                project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2334            assert_eq!(nodes, 3);
2335            volumes_directory.terminate().await;
2336            filesystem.close().await.expect("close filesystem failed");
2337        }
2338        device = filesystem.take_device().await;
2339        device.ensure_unique();
2340        device.reopen(false);
2341        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2342        fsck(filesystem.clone()).await.expect("Fsck");
2343        fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2344            .await
2345            .expect("Fsck volume");
2346        filesystem.close().await.expect("close filesystem failed");
2347    }
2348
2349    #[fuchsia::test]
2350    async fn test_project_listing() {
2351        const VOLUME_NAME: &str = "A";
2352        const FILE_NAME: &str = "B";
2353        const NON_ZERO_PROJECT_ID: u64 = 3;
2354        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
2355        let volume_store_id;
2356        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2357        {
2358            let blob_resupplied_count =
2359                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2360            let volumes_directory = VolumesDirectory::new(
2361                root_volume(filesystem.clone()).await.unwrap(),
2362                Weak::new(),
2363                None,
2364                blob_resupplied_count,
2365                MemoryPressureConfig::default(),
2366            )
2367            .await
2368            .unwrap();
2369            let volume_and_root = volumes_directory
2370                .create_and_mount_volume(VOLUME_NAME, None, false, None)
2371                .await
2372                .expect("create unencrypted volume failed");
2373            volume_store_id = volume_and_root.volume().store().store_object_id();
2374
2375            let (volume_dir_proxy, dir_server_end) =
2376                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2377            volumes_directory
2378                .serve_volume(&volume_and_root, dir_server_end, false)
2379                .expect("serve_volume failed");
2380            let project_proxy =
2381                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2382                    .expect("Unable to connect to project id service");
2383            // This is just to ensure that the small numbers below can be used for this test.
2384            assert!(FxVolume::MAX_PROJECT_ENTRIES >= 4);
2385            // Create a bunch of proxies. 3 more than the limit to ensure pagination.
2386            let num_entries = u64::try_from(FxVolume::MAX_PROJECT_ENTRIES + 3).unwrap();
2387            for project_id in 1..=num_entries {
2388                project_proxy.set_limit(project_id, 1, 1).await.unwrap().expect("To set limits");
2389            }
2390
2391            // Add one usage entry to be interspersed with the limit entries. Verifies that the
2392            // iterator will progress passed it with no effect.
2393            let file_proxy = {
2394                let (root_proxy, root_server_end) =
2395                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2396                volume_dir_proxy
2397                    .open(
2398                        "root",
2399                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2400                        &Default::default(),
2401                        root_server_end.into_channel(),
2402                    )
2403                    .expect("Failed to open volume root");
2404
2405                open_file_checked(
2406                    &root_proxy,
2407                    FILE_NAME,
2408                    fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2409                    &Default::default(),
2410                )
2411                .await
2412            };
2413            let (_, immutable_attributes) =
2414                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2415            let node_id = immutable_attributes.id.unwrap();
2416            project_proxy
2417                .set_for_node(node_id, NON_ZERO_PROJECT_ID)
2418                .await
2419                .unwrap()
2420                .expect("Setting project on node");
2421            {
2422                let BytesAndNodes { nodes, .. } = project_proxy
2423                    .info(NON_ZERO_PROJECT_ID)
2424                    .await
2425                    .unwrap()
2426                    .expect("Fetching project info")
2427                    .1;
2428                assert_eq!(nodes, 1);
2429            }
2430
2431            // If this `unwrap()` fails, it is likely the MAX_PROJECT_ENTRIES is too large for fidl.
2432            let (mut entries, mut next_token) =
2433                project_proxy.list(None).await.unwrap().expect("To get project listing");
2434            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2435            assert!(next_token.is_some());
2436            assert!(entries.contains(&1));
2437            assert!(entries.contains(&3));
2438            assert!(!entries.contains(&num_entries));
2439            // Page two should have a small set at the end.
2440            (entries, next_token) = project_proxy
2441                .list(next_token.as_deref())
2442                .await
2443                .unwrap()
2444                .expect("To get project listing");
2445            assert_eq!(entries.len(), 3);
2446            assert!(next_token.is_none());
2447            assert!(entries.contains(&num_entries));
2448            assert!(!entries.contains(&1));
2449            assert!(!entries.contains(&3));
2450            // Delete a couple and list all again, but one has usage still.
2451            project_proxy.clear(1).await.unwrap().expect("Clear project");
2452            project_proxy.clear(3).await.unwrap().expect("Clear project");
2453            (entries, next_token) =
2454                project_proxy.list(None).await.unwrap().expect("To get project listing");
2455            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2456            assert!(next_token.is_some());
2457            assert!(!entries.contains(&num_entries));
2458            assert!(!entries.contains(&1));
2459            assert!(entries.contains(&3));
2460            (entries, next_token) = project_proxy
2461                .list(next_token.as_deref())
2462                .await
2463                .unwrap()
2464                .expect("To get project listing");
2465            assert_eq!(entries.len(), 2);
2466            assert!(next_token.is_none());
2467            assert!(entries.contains(&num_entries));
2468            // Delete two more to hit the edge case.
2469            project_proxy.clear(2).await.unwrap().expect("Clear project");
2470            project_proxy.clear(4).await.unwrap().expect("Clear project");
2471            (entries, next_token) =
2472                project_proxy.list(None).await.unwrap().expect("To get project listing");
2473            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2474            assert!(next_token.is_none());
2475            assert!(entries.contains(&num_entries));
2476            volumes_directory.terminate().await;
2477            filesystem.close().await.expect("close filesystem failed");
2478        }
2479        device = filesystem.take_device().await;
2480        device.ensure_unique();
2481        device.reopen(false);
2482        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2483        fsck(filesystem.clone()).await.expect("Fsck");
2484        fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
2485        filesystem.close().await.expect("close filesystem failed");
2486    }
2487
2488    #[fuchsia::test(threads = 10)]
2489    async fn test_profile_blob() {
2490        let mut hashes = Vec::new();
2491        let device = {
2492            let fixture = blob_testing::new_blob_fixture().await;
2493
2494            for i in 0..3u64 {
2495                let hash =
2496                    fixture.write_blob(i.to_string().as_bytes(), CompressionMode::Never).await;
2497                hashes.push(hash);
2498            }
2499            fixture.close().await
2500        };
2501        device.ensure_unique();
2502
2503        device.reopen(false);
2504        let mut device = {
2505            let fixture = blob_testing::open_blob_fixture(device).await;
2506            fixture
2507                .volume()
2508                .volume()
2509                .record_or_replay_profile(new_profile_state(true), "foo")
2510                .await
2511                .expect("Recording");
2512
2513            // Page in the zero offsets only to avoid readahead strangeness.
2514            let mut writable = [0u8];
2515            for hash in &hashes {
2516                let vmo = fixture.get_blob_vmo(*hash).await;
2517                vmo.read(&mut writable, 0).expect("Vmo read");
2518            }
2519            fixture.volume().volume().stop_profile_tasks().await;
2520            fixture.close().await
2521        };
2522
2523        // Do this multiple times to ensure that the re-recording doesn't drop anything.
2524        for i in 0..3 {
2525            device.ensure_unique();
2526            device.reopen(false);
2527            let fixture = blob_testing::open_blob_fixture(device).await;
2528            {
2529                // Ensure that nothing is paged in right now.
2530                for hash in &hashes {
2531                    let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2532                    assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
2533                }
2534
2535                fixture
2536                    .volume()
2537                    .volume()
2538                    .record_or_replay_profile(new_profile_state(true), "foo")
2539                    .await
2540                    .expect("Replaying");
2541
2542                // Move the file in flight to ensure a new version lands to be used next time.
2543                {
2544                    let store_id = fixture.volume().volume().store().store_object_id();
2545                    let dir = fixture.volume().volume().get_profile_directory().await.unwrap();
2546                    let old_file = dir.lookup("foo").await.unwrap().unwrap().0;
2547                    let mut transaction = fixture
2548                        .fs()
2549                        .clone()
2550                        .new_transaction(
2551                            lock_keys!(
2552                                LockKey::object(store_id, dir.object_id()),
2553                                LockKey::object(store_id, old_file),
2554                            ),
2555                            Options::default(),
2556                        )
2557                        .await
2558                        .unwrap();
2559                    replace_child(&mut transaction, Some((&dir, "foo")), (&dir, &i.to_string()))
2560                        .await
2561                        .expect("Replace old profile.");
2562                    transaction.commit().await.unwrap();
2563                    assert!(
2564                        dir.lookup("foo").await.unwrap().is_none(),
2565                        "Old profile should be moved"
2566                    );
2567                }
2568
2569                // Await all data being played back by checking that things have paged in.
2570                async {
2571                    for hash in &hashes {
2572                        // Fetch vmo this way as well to ensure that the open is counting the file
2573                        // as used in the current recording.
2574                        let _vmo = fixture.get_blob_vmo(*hash).await;
2575                        let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2576                        while blob.vmo().info().unwrap().populated_bytes == 0 {
2577                            fasync::Timer::new(Duration::from_millis(25)).await;
2578                        }
2579                    }
2580                }
2581                .on_timeout(std::time::Duration::from_secs(120), || {
2582                    panic!("Replay did not page in for all VMOs")
2583                })
2584                .await;
2585
2586                // Complete the recording.
2587                fixture.volume().volume().stop_profile_tasks().await;
2588            }
2589            device = fixture.close().await;
2590        }
2591    }
2592
2593    #[fuchsia::test(threads = 10)]
2594    async fn test_profile_file() {
2595        let mut hashes = Vec::new();
2596        let crypt_file_id;
2597        let device = {
2598            let fixture = TestFixture::new().await;
2599            // Include a crypt file to access during the recording. It should not be added.
2600            {
2601                let crypt_dir = open_dir_checked(
2602                    fixture.root(),
2603                    "crypt_dir",
2604                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2605                    Default::default(),
2606                )
2607                .await;
2608                let crypt: Arc<CryptBase> = fixture.crypt().unwrap();
2609                crypt
2610                    .add_wrapping_key(WRAPPING_KEY_ID, [1; 32].into())
2611                    .expect("add_wrapping_key failed");
2612                crypt_dir
2613                    .update_attributes(&fio::MutableNodeAttributes {
2614                        wrapping_key_id: Some(WRAPPING_KEY_ID),
2615                        ..Default::default()
2616                    })
2617                    .await
2618                    .expect("update_attributes wire call failed")
2619                    .expect("update_attributes failed");
2620                let crypt_file = open_file_checked(
2621                    &crypt_dir,
2622                    "crypt_file",
2623                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2624                    &Default::default(),
2625                )
2626                .await;
2627                crypt_file.write("asdf".as_bytes()).await.unwrap().expect("Writing crypt file");
2628                crypt_file_id = crypt_file
2629                    .get_attributes(fio::NodeAttributesQuery::ID)
2630                    .await
2631                    .unwrap()
2632                    .expect("Get id")
2633                    .1
2634                    .id
2635                    .expect("Reading id in response");
2636            }
2637
2638            for i in 0..3u64 {
2639                let file_proxy = open_file_checked(
2640                    fixture.root(),
2641                    &i.to_string(),
2642                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2643                    &Default::default(),
2644                )
2645                .await;
2646                file_proxy.write(i.to_string().as_bytes()).await.unwrap().expect("Writing file");
2647                let id = file_proxy
2648                    .get_attributes(fio::NodeAttributesQuery::ID)
2649                    .await
2650                    .unwrap()
2651                    .expect("Get id")
2652                    .1
2653                    .id
2654                    .expect("Reading id in response");
2655                hashes.push((i, id));
2656            }
2657            fixture.close().await
2658        };
2659        device.ensure_unique();
2660
2661        device.reopen(false);
2662        let mut device = {
2663            let fixture = TestFixture::open(
2664                device,
2665                TestFixtureOptions { format: false, ..Default::default() },
2666            )
2667            .await;
2668            fixture
2669                .volume()
2670                .volume()
2671                .record_or_replay_profile(new_profile_state(false), "foo")
2672                .await
2673                .expect("Recording");
2674
2675            {
2676                let crypt: Arc<CryptBase> = fixture.crypt().unwrap();
2677                crypt
2678                    .add_wrapping_key(WRAPPING_KEY_ID, [1; 32].into())
2679                    .expect("add wrapping key failed");
2680                let crypt_file = open_file_checked(
2681                    fixture.root(),
2682                    "crypt_dir/crypt_file",
2683                    fio::PERM_READABLE,
2684                    &Default::default(),
2685                )
2686                .await;
2687                crypt_file.read(1).await.unwrap().expect("Reading crypt file");
2688            }
2689            // Page in the zero offsets only to avoid readahead strangeness.
2690            for (i, _) in &hashes {
2691                let file_proxy = open_file_checked(
2692                    fixture.root(),
2693                    &i.to_string(),
2694                    fio::PERM_READABLE,
2695                    &Default::default(),
2696                )
2697                .await;
2698                file_proxy.read(1).await.unwrap().expect("Reading file");
2699            }
2700            fixture.volume().volume().stop_profile_tasks().await;
2701            fixture.close().await
2702        };
2703
2704        // Do this multiple times to ensure that the re-recording doesn't drop anything.
2705        for i in 0..3 {
2706            device.ensure_unique();
2707            device.reopen(false);
2708            let fixture = TestFixture::open(
2709                device,
2710                TestFixtureOptions { format: false, ..Default::default() },
2711            )
2712            .await;
2713            {
2714                // Need to get the root vmo to check committed bytes.
2715                let volume = fixture.volume().volume().clone();
2716                // Ensure that nothing is paged in right now.
2717                {
2718                    let crypt_file = volume
2719                        .get_or_load_node(crypt_file_id, ObjectDescriptor::File, None)
2720                        .await
2721                        .expect("Opening file internally")
2722                        .into_any()
2723                        .downcast::<FxFile>()
2724                        .expect("Should be file");
2725                    assert_eq!(crypt_file.vmo().info().unwrap().populated_bytes, 0);
2726                }
2727                for (_, id) in &hashes {
2728                    let file = volume
2729                        .get_or_load_node(*id, ObjectDescriptor::File, None)
2730                        .await
2731                        .expect("Opening file internally")
2732                        .into_any()
2733                        .downcast::<FxFile>()
2734                        .expect("Should be file");
2735                    assert_eq!(file.vmo().info().unwrap().populated_bytes, 0);
2736                }
2737
2738                fixture
2739                    .volume()
2740                    .volume()
2741                    .record_or_replay_profile(new_profile_state(false), "foo")
2742                    .await
2743                    .expect("Replaying");
2744
2745                // Move the file in flight to ensure a new version lands to be used next time.
2746                {
2747                    let store_id = fixture.volume().volume().store().store_object_id();
2748                    let dir = fixture.volume().volume().get_profile_directory().await.unwrap();
2749                    let old_file = dir.lookup("foo").await.unwrap().unwrap().0;
2750                    let mut transaction = fixture
2751                        .fs()
2752                        .clone()
2753                        .new_transaction(
2754                            lock_keys!(
2755                                LockKey::object(store_id, dir.object_id()),
2756                                LockKey::object(store_id, old_file),
2757                            ),
2758                            Options::default(),
2759                        )
2760                        .await
2761                        .unwrap();
2762                    replace_child(&mut transaction, Some((&dir, "foo")), (&dir, &i.to_string()))
2763                        .await
2764                        .expect("Replace old profile.");
2765                    transaction.commit().await.unwrap();
2766                    assert!(
2767                        dir.lookup("foo").await.unwrap().is_none(),
2768                        "Old profile should be moved"
2769                    );
2770                }
2771
2772                // Await all data being played back by checking that things have paged in.
2773                async {
2774                    for (_, id) in &hashes {
2775                        let file = volume
2776                            .get_or_load_node(*id, ObjectDescriptor::File, None)
2777                            .await
2778                            .expect("Opening file internally")
2779                            .into_any()
2780                            .downcast::<FxFile>()
2781                            .expect("Should be file");
2782                        while file.vmo().info().unwrap().populated_bytes == 0 {
2783                            fasync::Timer::new(Duration::from_millis(25)).await;
2784                        }
2785                    }
2786                }
2787                .on_timeout(std::time::Duration::from_secs(120), || {
2788                    panic!("Replay did not page in for all VMOs")
2789                })
2790                .await;
2791                // The crypt file access should not have been recorded or replayed.
2792                {
2793                    let crypt_file = volume
2794                        .get_or_load_node(crypt_file_id, ObjectDescriptor::File, None)
2795                        .await
2796                        .expect("Opening file internally")
2797                        .into_any()
2798                        .downcast::<FxFile>()
2799                        .expect("Should be file");
2800                    assert_eq!(crypt_file.vmo().info().unwrap().populated_bytes, 0);
2801                }
2802
2803                // Open all the files to show that they have been used.
2804                for (i, _) in &hashes {
2805                    let _file_proxy = open_file_checked(
2806                        fixture.root(),
2807                        &i.to_string(),
2808                        fio::PERM_READABLE,
2809                        &Default::default(),
2810                    )
2811                    .await;
2812                }
2813
2814                // Complete the recording.
2815                fixture.volume().volume().stop_profile_tasks().await;
2816            }
2817            device = fixture.close().await;
2818        }
2819    }
2820
2821    #[fuchsia::test(threads = 10)]
2822    async fn test_profile_update() {
2823        let mut hashes = Vec::new();
2824        let device = {
2825            let fixture = blob_testing::new_blob_fixture().await;
2826            for i in 0..2u64 {
2827                let hash =
2828                    fixture.write_blob(i.to_string().as_bytes(), CompressionMode::Never).await;
2829                hashes.push(hash);
2830            }
2831            fixture.close().await
2832        };
2833        device.ensure_unique();
2834
2835        device.reopen(false);
2836        let device = {
2837            let fixture = blob_testing::open_blob_fixture(device).await;
2838
2839            {
2840                let volume = fixture.volume().volume();
2841                volume
2842                    .record_or_replay_profile(new_profile_state(true), "foo")
2843                    .await
2844                    .expect("Recording");
2845
2846                let original_recorded = RECORDED.load(Ordering::Relaxed);
2847
2848                // Page in the zero offsets only to avoid readahead strangeness.
2849                {
2850                    let mut writable = [0u8];
2851                    let hash = &hashes[0];
2852                    let vmo = fixture.get_blob_vmo(*hash).await;
2853                    vmo.read(&mut writable, 0).expect("Vmo read");
2854                }
2855
2856                // The recording happens asynchronously, so we must wait.  This is crude, but it's
2857                // only for testing and it's simple.
2858                while RECORDED.load(Ordering::Relaxed) == original_recorded {
2859                    fasync::Timer::new(std::time::Duration::from_millis(10)).await;
2860                }
2861
2862                volume.stop_profile_tasks().await;
2863            }
2864            fixture.close().await
2865        };
2866
2867        device.ensure_unique();
2868        device.reopen(false);
2869        let fixture = blob_testing::open_blob_fixture(device).await;
2870        {
2871            // Need to get the root vmo to check committed bytes.
2872            // Ensure that nothing is paged in right now.
2873            for hash in &hashes {
2874                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2875                assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
2876            }
2877
2878            let volume = fixture.volume().volume();
2879
2880            volume
2881                .record_or_replay_profile(new_profile_state(true), "foo")
2882                .await
2883                .expect("Replaying");
2884
2885            // Await all data being played back by checking that things have paged in.
2886            async {
2887                let hash = &hashes[0];
2888                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2889                while blob.vmo().info().unwrap().populated_bytes == 0 {
2890                    fasync::Timer::new(Duration::from_millis(25)).await;
2891                }
2892            }
2893            .on_timeout(std::time::Duration::from_secs(120), || {
2894                panic!("Replay did not page in for all VMOs")
2895            })
2896            .await;
2897
2898            let original_recorded = RECORDED.load(Ordering::Relaxed);
2899
2900            // Record the new profile that will overwrite it.
2901            {
2902                let mut writable = [0u8];
2903                let hash = &hashes[1];
2904                let vmo = fixture.get_blob_vmo(*hash).await;
2905                vmo.read(&mut writable, 0).expect("Vmo read");
2906            }
2907
2908            // The recording happens asynchronously, so we must wait.  This is crude, but it's only
2909            // for testing and it's simple.
2910            while RECORDED.load(Ordering::Relaxed) == original_recorded {
2911                fasync::Timer::new(std::time::Duration::from_millis(10)).await;
2912            }
2913
2914            // Complete the recording.
2915            volume.stop_profile_tasks().await;
2916        }
2917        let device = fixture.close().await;
2918
2919        device.ensure_unique();
2920        device.reopen(false);
2921        let fixture = blob_testing::open_blob_fixture(device).await;
2922        {
2923            // Need to get the root vmo to check committed bytes.
2924            // Ensure that nothing is paged in right now.
2925            for hash in &hashes {
2926                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2927                assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
2928            }
2929
2930            fixture
2931                .volume()
2932                .volume()
2933                .record_or_replay_profile(new_profile_state(true), "foo")
2934                .await
2935                .expect("Replaying");
2936
2937            // Await all data being played back by checking that things have paged in.
2938            async {
2939                let hash = &hashes[1];
2940                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2941                while blob.vmo().info().unwrap().populated_bytes == 0 {
2942                    fasync::Timer::new(Duration::from_millis(25)).await;
2943                }
2944            }
2945            .on_timeout(std::time::Duration::from_secs(30), || {
2946                panic!("Replay did not page in for all VMOs")
2947            })
2948            .await;
2949
2950            // Complete the recording.
2951            fixture.volume().volume().stop_profile_tasks().await;
2952
2953            // Verify that first blob was not paged in as the it should be dropped from the profile.
2954            {
2955                let hash = &hashes[0];
2956                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2957                assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
2958            }
2959        }
2960        fixture.close().await;
2961    }
2962
2963    #[fuchsia::test(threads = 10)]
2964    async fn test_unencrypted_volume() {
2965        let fixture = TestFixture::new_unencrypted().await;
2966        let root = fixture.root();
2967
2968        let f = open_file_checked(
2969            &root,
2970            "foo",
2971            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
2972            &Default::default(),
2973        )
2974        .await;
2975        close_file_checked(f).await;
2976
2977        fixture.close().await;
2978    }
2979
2980    #[fuchsia::test]
2981    async fn test_read_only_unencrypted_volume() {
2982        // Make a new Fxfs filesystem with an unencrypted volume named "vol".
2983        let fs = {
2984            let device = fxfs::filesystem::mkfs_with_volume(
2985                DeviceHolder::new(FakeDevice::new(8192, 512)),
2986                "vol",
2987                None,
2988            )
2989            .await
2990            .unwrap();
2991            // Re-open the device as read-only and mount the filesystem as read-only.
2992            device.reopen(true);
2993            FxFilesystemBuilder::new().read_only(true).open(device).await.unwrap()
2994        };
2995        // Ensure we can access the volume and gracefully terminate any tasks.
2996        {
2997            let root_volume = root_volume(fs.clone()).await.unwrap();
2998            let store = root_volume.volume("vol", StoreOptions::default()).await.unwrap();
2999            let unique_id = store.store_object_id();
3000            let blob_resupplied_count =
3001                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
3002            let volume = FxVolume::new(
3003                Weak::new(),
3004                store,
3005                unique_id,
3006                "vol".to_owned(),
3007                blob_resupplied_count,
3008                MemoryPressureConfig::default(),
3009            )
3010            .unwrap();
3011            volume.terminate().await;
3012        }
3013        // Close the filesystem, and make sure we don't have any dangling references.
3014        fs.close().await.unwrap();
3015        let device = fs.take_device().await;
3016        device.ensure_unique();
3017    }
3018
3019    #[fuchsia::test]
3020    async fn test_read_only_encrypted_volume() {
3021        let crypt: Arc<CryptBase> = Arc::new(new_insecure_crypt());
3022        // Make a new Fxfs filesystem with an encrypted volume named "vol".
3023        let fs = {
3024            let device = fxfs::filesystem::mkfs_with_volume(
3025                DeviceHolder::new(FakeDevice::new(8192, 512)),
3026                "vol",
3027                Some(crypt.clone()),
3028            )
3029            .await
3030            .unwrap();
3031            // Re-open the device as read-only and mount the filesystem as read-only.
3032            device.reopen(true);
3033            FxFilesystemBuilder::new().read_only(true).open(device).await.unwrap()
3034        };
3035        // Ensure we can access the volume and gracefully terminate any tasks.
3036        {
3037            let root_volume = root_volume(fs.clone()).await.unwrap();
3038            let store = root_volume
3039                .volume("vol", StoreOptions { crypt: Some(crypt), ..StoreOptions::default() })
3040                .await
3041                .unwrap();
3042            let unique_id = store.store_object_id();
3043            let blob_resupplied_count =
3044                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
3045            let volume = FxVolume::new(
3046                Weak::new(),
3047                store,
3048                unique_id,
3049                "vol".to_owned(),
3050                blob_resupplied_count,
3051                MemoryPressureConfig::default(),
3052            )
3053            .unwrap();
3054            volume.terminate().await;
3055        }
3056        // Close the filesystem, and make sure we don't have any dangling references.
3057        fs.close().await.unwrap();
3058        let device = fs.take_device().await;
3059        device.ensure_unique();
3060    }
3061}