Skip to main content

fxfs_platform/fuchsia/
volume.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::component::map_to_raw_status;
6use crate::fuchsia::directory::FxDirectory;
7use crate::fuchsia::dirent_cache::DirentCache;
8use crate::fuchsia::file::FxFile;
9use crate::fuchsia::memory_pressure::{MemoryPressureLevel, MemoryPressureMonitor};
10use crate::fuchsia::node::{FxNode, GetResult, NodeCache};
11use crate::fuchsia::pager::Pager;
12use crate::fuchsia::profile::ProfileState;
13use crate::fuchsia::symlink::FxSymlink;
14use crate::fuchsia::volumes_directory::VolumesDirectory;
15use anyhow::{Error, bail, ensure};
16use async_trait::async_trait;
17use fidl::endpoints::ServerEnd;
18use fidl_fuchsia_fxfs::{
19    BytesAndNodes, FileBackedVolumeProviderRequest, FileBackedVolumeProviderRequestStream,
20    ProjectIdRequest, ProjectIdRequestStream, ProjectIterToken,
21};
22use fs_inspect::{FsInspectVolume, VolumeData};
23use fuchsia_async::epoch::Epoch;
24use fuchsia_sync::Mutex;
25use futures::channel::oneshot;
26use futures::stream::{self, FusedStream, Stream};
27use futures::{FutureExt, StreamExt, TryStreamExt};
28use fxfs::errors::FxfsError;
29use fxfs::filesystem::{self, SyncOptions};
30use fxfs::future_with_guard::FutureWithGuard;
31use fxfs::log::*;
32use fxfs::object_store::directory::Directory;
33use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
34use fxfs::object_store::{HandleOptions, HandleOwner, ObjectDescriptor, ObjectStore};
35use refaults_vmo::PageRefaultCounter;
36use std::future::Future;
37use std::pin::pin;
38#[cfg(any(test, feature = "testing"))]
39use std::sync::atomic::AtomicBool;
40use std::sync::atomic::{AtomicU64, Ordering};
41use std::sync::{Arc, Weak};
42use std::time::Duration;
43use vfs::directory::entry::DirectoryEntry;
44use vfs::directory::simple::Simple;
45use vfs::execution_scope::ExecutionScope;
46use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
47
// LINT.IfChange
// TODO:(b/299919008) Fix this number to something reasonable, or maybe just for fxblob.
/// Maximum number of entries in each volume's dirent cache at normal memory pressure.
const DIRENT_CACHE_LIMIT: usize = 2000;
// LINT.ThenChange(//src/storage/stressor/src/aggressive.rs)

/// The smallest read-ahead size. All other read-ahead sizes will be a multiple of this read-ahead
/// size. Having this property allows for chunking metadata at this granularity.
pub const BASE_READ_AHEAD_SIZE: u64 = 32 * 1024;
/// The largest read-ahead size, used while memory pressure is at the normal level.
pub const MAX_READ_AHEAD_SIZE: u64 = BASE_READ_AHEAD_SIZE * 4;

/// Name of the directory, inside a volume's internal directory, in which profiles are stored.
const PROFILE_DIRECTORY: &str = "profiles";
59
/// Tuning knobs that apply at a single memory-pressure level.
#[derive(Clone, Copy, Debug)]
pub struct MemoryPressureLevelConfig {
    /// The period to wait between flushes, as well as perform other background maintenance tasks
    /// (e.g. purging caches).
    pub background_task_period: Duration,

    /// The limit of cached nodes.
    pub cache_size_limit: usize,

    /// The initial delay before the background task runs. The background task has a longer initial
    /// delay to avoid running the task during boot.
    pub background_task_initial_delay: Duration,

    /// The amount of read-ahead to do. The read-ahead size is reduced when under memory pressure.
    /// The kernel starts evicting pages when under memory pressure so over supplied pages are less
    /// likely to be used before being evicted.
    pub read_ahead_size: u64,
}
78
79impl Default for MemoryPressureLevelConfig {
80    fn default() -> Self {
81        Self {
82            background_task_period: Duration::from_secs(20),
83            cache_size_limit: DIRENT_CACHE_LIMIT,
84            background_task_initial_delay: Duration::from_secs(70),
85            read_ahead_size: MAX_READ_AHEAD_SIZE,
86        }
87    }
88}
89
/// A full set of memory-pressure tuning configurations, one per pressure level.
#[derive(Clone, Copy, Debug)]
pub struct MemoryPressureConfig {
    /// The configuration to use at [`MemoryPressureLevel::Normal`].
    pub mem_normal: MemoryPressureLevelConfig,

    /// The configuration to use at [`MemoryPressureLevel::Warning`].
    pub mem_warning: MemoryPressureLevelConfig,

    /// The configuration to use at [`MemoryPressureLevel::Critical`].
    pub mem_critical: MemoryPressureLevelConfig,
}
101
impl MemoryPressureConfig {
    /// Returns the configuration corresponding to the given memory-pressure level.
    pub fn for_level(&self, level: &MemoryPressureLevel) -> &MemoryPressureLevelConfig {
        match level {
            MemoryPressureLevel::Normal => &self.mem_normal,
            MemoryPressureLevel::Warning => &self.mem_warning,
            MemoryPressureLevel::Critical => &self.mem_critical,
        }
    }
}
111
112impl Default for MemoryPressureConfig {
113    fn default() -> Self {
114        // TODO(https://fxbug.dev/42061389): investigate a smarter strategy for determining flush
115        // frequency.
116        Self {
117            mem_normal: MemoryPressureLevelConfig {
118                background_task_period: Duration::from_secs(20),
119                cache_size_limit: DIRENT_CACHE_LIMIT,
120                background_task_initial_delay: Duration::from_secs(70),
121                read_ahead_size: MAX_READ_AHEAD_SIZE,
122            },
123            mem_warning: MemoryPressureLevelConfig {
124                background_task_period: Duration::from_secs(5),
125                cache_size_limit: 100,
126                background_task_initial_delay: Duration::from_secs(5),
127                read_ahead_size: BASE_READ_AHEAD_SIZE * 2,
128            },
129            mem_critical: MemoryPressureLevelConfig {
130                background_task_period: Duration::from_millis(1500),
131                cache_size_limit: 20,
132                background_task_initial_delay: Duration::from_millis(1500),
133                read_ahead_size: BASE_READ_AHEAD_SIZE,
134            },
135        }
136    }
137}
138
/// FxVolume represents an opened volume. It is also a (weak) cache for all opened Nodes within the
/// volume.
pub struct FxVolume {
    // The owning VolumesDirectory; held weakly to avoid a reference cycle.
    parent: Weak<VolumesDirectory>,

    // Weak cache of all opened nodes in this volume, keyed by object id.
    cache: NodeCache,

    // The object store backing this volume.
    store: Arc<ObjectStore>,

    // Services pager-backed VMOs for files in this volume.
    pager: Pager,

    // Executor handle used by `spawn` to run short-term volume tasks.
    executor: fasync::EHandle,

    // A tuple of the actual task and a channel to signal to terminate the task.
    background_task: Mutex<Option<(fasync::Task<()>, oneshot::Sender<()>)>>,

    // Unique identifier of the filesystem that owns this volume.
    fs_id: u64,

    // The execution scope for this volume.
    scope: ExecutionScope,

    // Caches directory entries; its size limit is adjusted with memory pressure.
    dirent_cache: DirentCache,

    // State for an in-progress profile recording/replay, if any.
    profile_state: Mutex<Option<Box<dyn ProfileState>>>,

    /// This is updated based on the memory-pressure level.
    read_ahead_size: AtomicU64,

    // Set by `try_unwrap` when unwrapping fails, so that dropping the volume yields a backtrace
    // (testing only).
    #[cfg(any(test, feature = "testing"))]
    poisoned: AtomicBool,

    // Counter of page refaults for blobs — NOTE(review): semantics inferred from the type name
    // `PageRefaultCounter`; confirm.
    blob_resupplied_count: Arc<PageRefaultCounter>,
}
169
170#[fxfs_trace::trace]
171impl FxVolume {
    /// Creates a new `FxVolume` backed by `store`.
    ///
    /// `fs_id` is the unique identifier of the owning filesystem (reported by [`Self::id`]). The
    /// dirent-cache limit and read-ahead size are seeded from the normal-pressure level of
    /// `memory_pressure_config`.
    ///
    /// # Errors
    ///
    /// Returns an error if the pager cannot be constructed.
    pub fn new(
        parent: Weak<VolumesDirectory>,
        store: Arc<ObjectStore>,
        fs_id: u64,
        blob_resupplied_count: Arc<PageRefaultCounter>,
        memory_pressure_config: MemoryPressureConfig,
    ) -> Result<Self, Error> {
        let scope = ExecutionScope::new();
        Ok(Self {
            parent,
            cache: NodeCache::new(),
            store,
            pager: Pager::new(scope.clone())?,
            executor: fasync::EHandle::local(),
            background_task: Mutex::new(None),
            fs_id,
            scope,
            dirent_cache: DirentCache::new(memory_pressure_config.mem_normal.cache_size_limit),
            profile_state: Mutex::new(None),
            read_ahead_size: AtomicU64::new(memory_pressure_config.mem_normal.read_ahead_size),
            #[cfg(any(test, feature = "testing"))]
            poisoned: AtomicBool::new(false),
            blob_resupplied_count,
        })
    }
197
    /// Returns the object store backing this volume.
    pub fn store(&self) -> &Arc<ObjectStore> {
        &self.store
    }
201
    /// Returns the node cache for this volume.
    pub fn cache(&self) -> &NodeCache {
        &self.cache
    }
205
    /// Returns the dirent cache for this volume.
    pub fn dirent_cache(&self) -> &DirentCache {
        &self.dirent_cache
    }
209
    /// Returns the pager for this volume.
    pub fn pager(&self) -> &Pager {
        &self.pager
    }
213
    /// Returns the unique identifier of the filesystem that owns this volume.
    pub fn id(&self) -> u64 {
        self.fs_id
    }
217
    /// Returns the execution scope for this volume.
    pub fn scope(&self) -> &ExecutionScope {
        &self.scope
    }
221
    /// Returns the page-refault counter for blobs in this volume.
    pub fn blob_resupplied_count(&self) -> &PageRefaultCounter {
        &self.blob_resupplied_count
    }
225
226    /// Reports the filesystem info, but if the volume has a space limit applied then the space
227    /// available and space used are reported based on the volume instead.
228    pub fn filesystem_info_for_volume(&self) -> fio::FilesystemInfo {
229        let allocator = self.store.filesystem().allocator();
230        let info =
231            if let Some(limit) = allocator.get_owner_bytes_limit(self.store.store_object_id()) {
232                filesystem::Info {
233                    used_bytes: allocator.get_owner_bytes_used(self.store.store_object_id()),
234                    total_bytes: limit,
235                }
236            } else {
237                self.store.filesystem().get_info()
238            };
239
240        info_to_filesystem_info(info, self.store.block_size(), self.store.object_count(), self.id())
241    }
242
    /// Stop profiling, recover resources from it and finalize recordings.
    pub async fn stop_profile_tasks(self: &Arc<Self>) {
        // Take the state out of the lock in one statement so the lock isn't held across awaits.
        let Some(mut state) = self.profile_state.lock().take() else { return };
        // Let any in-flight replay complete before tearing down the recorder.
        state.wait_for_replay_to_finish().await;
        // Detach the recorder from the pager so no further activity is recorded.
        self.pager.set_recorder(None);
        // Finalize the recording; the result is intentionally ignored (best effort).
        let _ = state.wait_for_recording_to_finish().await;
    }
250
    /// Opens or creates the profile directory in the volume's internal directory.
    pub async fn get_profile_directory(self: &Arc<Self>) -> Result<Directory<FxVolume>, Error> {
        let internal_dir = self
            .get_or_create_internal_dir()
            .await
            .map_err(|e| e.context("Opening internal directory"))?;
        // Have to do separate calls to create the profile dir if necessary.
        // The transaction locks the internal directory up front, so the lookup-then-create below
        // is performed under that lock.
        let mut transaction = self
            .store()
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    self.store().store_object_id(),
                    internal_dir.object_id(),
                )],
                Options::default(),
            )
            .await?;
        Ok(match internal_dir.directory().lookup(PROFILE_DIRECTORY).await? {
            // Already exists: open it; the transaction is dropped without being committed.
            Some((object_id, _, _)) => {
                Directory::open_unchecked(self.clone(), object_id, None, false)
            }
            // Doesn't exist yet: create it within the transaction.
            None => {
                let new_dir = internal_dir
                    .directory()
                    .create_child_dir(&mut transaction, PROFILE_DIRECTORY)
                    .await?;
                transaction.commit().await?;
                new_dir
            }
        })
    }
283
    /// Starts recording a profile for the volume under the name given, and if a profile exists
    /// under that same name it is replayed and will be replaced after by the new recording if it
    /// is cleanly shutdown and finalized.
    pub async fn record_or_replay_profile(
        self: &Arc<Self>,
        mut state: Box<dyn ProfileState>,
        name: &str,
    ) -> Result<(), Error> {
        // We don't meddle in FxDirectory or FxFile here because we don't want a paged object.
        // Normally we ensure that there's only one copy by using the Node cache on the volume, but
        // that would create FxFile, so in this case we just assume that only one profile operation
        // should be ongoing at a time, as that is ensured in `VolumesDirectory`.

        // If there is a recording already, prepare to replay it.
        let profile_dir = self.get_profile_directory().await?;
        let replay_handle = if let Some((id, descriptor, _)) = profile_dir.lookup(name).await? {
            // Anything other than a file under the profile name indicates on-disk inconsistency.
            ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
            Some(Box::new(
                ObjectStore::open_object(self, id, HandleOptions::default(), None).await?,
            ))
        } else {
            None
        };

        // Hold the lock for the rest of the setup so the recorder and the stored state are
        // installed together with respect to `stop_profile_tasks`, which takes the same lock.
        let mut profile_state = self.profile_state.lock();

        info!("Recording new profile '{name}' for volume object {}", self.store.store_object_id());
        // Begin recording first to ensure that we capture any activity from the replay.
        self.pager.set_recorder(Some(state.record_new(self, name)));
        if let Some(handle) = replay_handle {
            // Replaying needs an active guard; if the scope is shutting down, skip the replay.
            if let Some(guard) = self.scope().try_active_guard() {
                state.replay_profile(handle, self.clone(), guard);
                info!(
                    "Replaying existing profile '{name}' for volume object {}",
                    self.store.store_object_id()
                );
            }
        }
        *profile_state = Some(state);
        Ok(())
    }
325
326    async fn get_or_create_internal_dir(self: &Arc<Self>) -> Result<Arc<FxDirectory>, Error> {
327        let internal_data_id = self.store().get_or_create_internal_directory_id().await?;
328        let internal_dir = self
329            .get_or_load_node(internal_data_id, ObjectDescriptor::Directory, None)
330            .await?
331            .into_any()
332            .downcast::<FxDirectory>()
333            .unwrap();
334        Ok(internal_dir)
335    }
336
337    pub async fn terminate(&self) {
338        let task = std::mem::replace(&mut *self.background_task.lock(), None);
339        if let Some((task, terminate)) = task {
340            let _ = terminate.send(());
341            task.await;
342        }
343
344        // `NodeCache::terminate` will break any strong reference cycles contained within nodes
345        // (pager registration). The only remaining nodes should be those with open FIDL
346        // connections or vmo references in the process of handling the VMO_ZERO_CHILDREN signal.
347        // `ExecutionScope::shutdown` + `ExecutionScope::wait` will close the open FIDL connections
348        // and synchonrize the signal handling which should result in all nodes flushing and then
349        // dropping. Any async tasks required to flush a node should take an active guard on the
350        // `ExecutionScope` which will prevent `ExecutionScope::wait` from completing until all
351        // nodes are flushed.
352        self.scope.shutdown();
353        self.cache.terminate();
354        self.scope.wait().await;
355
356        // Make sure there are no deferred operations still pending for this volume.
357        Epoch::global().barrier().await;
358
359        // The dirent_cache must be cleared *after* shutting down the scope because there can be
360        // tasks that insert entries into the cache.
361        self.dirent_cache.clear();
362
363        if self.store.filesystem().options().read_only {
364            // If the filesystem is read only, we don't need to flush/sync anything.
365            if self.store.is_unlocked() {
366                self.store.lock_read_only();
367            }
368            return;
369        }
370
371        self.flush_all_files(true).await;
372        self.store.filesystem().graveyard().flush().await;
373        if self.store.crypt().is_some() {
374            if let Err(e) = self.store.lock().await {
375                // The store will be left in a safe state and there won't be data-loss unless
376                // there's an issue flushing the journal later.
377                warn!(error:? = e; "Locking store error");
378            }
379        }
380        let sync_status = self
381            .store
382            .filesystem()
383            .sync(SyncOptions { flush_device: true, ..Default::default() })
384            .await;
385        if let Err(e) = sync_status {
386            error!(error:? = e; "Failed to sync filesystem; data may be lost");
387        }
388    }
389
    /// Attempts to get a node from the node cache. If the node wasn't present in the cache, loads
    /// the object from the object store, installing the returned node into the cache and returns
    /// the newly created FxNode backed by the loaded object.  |parent| is only set on the node if
    /// the node was not present in the cache.  Otherwise, it is ignored.
    pub async fn get_or_load_node(
        self: &Arc<Self>,
        object_id: u64,
        object_descriptor: ObjectDescriptor,
        parent: Option<Arc<FxDirectory>>,
    ) -> Result<Arc<dyn FxNode>, Error> {
        match self.cache.get_or_reserve(object_id).await {
            // Cache hit: return the existing node.
            GetResult::Node(node) => Ok(node),
            // Cache miss: we hold a placeholder reservation; load the object then publish it.
            GetResult::Placeholder(placeholder) => {
                let node = match object_descriptor {
                    ObjectDescriptor::File => FxFile::new(
                        ObjectStore::open_object(self, object_id, HandleOptions::default(), None)
                            .await?,
                    ) as Arc<dyn FxNode>,
                    ObjectDescriptor::Directory => {
                        // Can't use open_unchecked because we don't know if the dir is casefolded
                        // or encrypted.
                        Arc::new(FxDirectory::new(parent, Directory::open(self, object_id).await?))
                            as Arc<dyn FxNode>
                    }
                    ObjectDescriptor::Symlink => Arc::new(FxSymlink::new(self.clone(), object_id)),
                    // Any other descriptor type here indicates on-disk inconsistency.
                    _ => bail!(FxfsError::Inconsistent),
                };
                placeholder.commit(&node);
                Ok(node)
            }
        }
    }
422
423    /// Marks the given directory deleted.
424    pub fn mark_directory_deleted(&self, object_id: u64) {
425        if let Some(node) = self.cache.get(object_id) {
426            // It's possible that node is a placeholder, in which case we don't need to wait for it
427            // to be resolved because it should be blocked behind the locks that are held by the
428            // caller, and once they're dropped, it'll be found to be deleted via the tree.
429            if let Ok(dir) = node.into_any().downcast::<FxDirectory>() {
430                dir.set_deleted();
431            }
432        }
433    }
434
435    /// Removes resources associated with |object_id| (which ought to be a file), if there are no
436    /// open connections to that file.
437    ///
438    /// This must be called *after committing* a transaction which deletes the last reference to
439    /// |object_id|, since before that point, new connections could be established.
440    pub(super) async fn maybe_purge_file(&self, object_id: u64) -> Result<(), Error> {
441        if let Some(node) = self.cache.get(object_id) {
442            node.mark_to_be_purged();
443            return Ok(());
444        }
445        // If this fails, the graveyard should clean it up on next mount.
446        self.store
447            .tombstone_object(
448                object_id,
449                Options { borrow_metadata_space: true, ..Default::default() },
450            )
451            .await?;
452        Ok(())
453    }
454
    /// Starts the background work task.  This task will periodically:
    ///   - scan all files and flush them to disk, and
    ///   - purge unused cached data.
    /// The task will hold a strong reference to the FxVolume while it is running, so the task must
    /// be closed later with Self::terminate, or the FxVolume will never be dropped.
    pub fn start_background_task(
        self: &Arc<Self>,
        config: MemoryPressureConfig,
        mem_monitor: Option<&MemoryPressureMonitor>,
    ) {
        let mut background_task = self.background_task.lock();
        // Idempotent: if a task is already running, leave it alone.
        if background_task.is_none() {
            // The sender is kept so `terminate` can signal the task to exit.
            let (tx, rx) = oneshot::channel();

            let task = if let Some(mem_monitor) = mem_monitor {
                fasync::Task::spawn(self.clone().background_task(
                    config,
                    mem_monitor.get_level_stream(),
                    rx,
                ))
            } else {
                // With no memory pressure monitoring, just stub the stream out as always pending.
                fasync::Task::spawn(self.clone().background_task(config, stream::pending(), rx))
            };

            *background_task = Some((task, tx));
        }
    }
483
    #[trace]
    /// The background maintenance loop: periodically flushes files and purges caches, adapting its
    /// period, dirent-cache limit and read-ahead size to memory-pressure updates arriving on
    /// `level_stream`. Runs until `terminate` fires.
    async fn background_task(
        self: Arc<Self>,
        config: MemoryPressureConfig,
        mut level_stream: impl Stream<Item = MemoryPressureLevel> + FusedStream + Unpin,
        terminate: oneshot::Receiver<()>,
    ) {
        debug!(store_id = self.store.store_object_id(); "FxVolume::background_task start");
        let mut terminate = terminate.fuse();
        // Default to the normal period until updates come from the `level_stream`.
        let mut level = MemoryPressureLevel::Normal;
        let mut timer =
            pin!(fasync::Timer::new(config.for_level(&level).background_task_initial_delay));

        loop {
            let mut should_terminate = false;
            let mut should_flush = false;
            let mut low_mem = false;
            let mut should_purge_layer_files = false;
            let mut should_update_cache_limit = false;

            // Biased select: termination wins over level changes, which win over the timer.
            futures::select_biased! {
                _ = terminate => should_terminate = true,
                new_level = level_stream.next() => {
                    // Because `level_stream` will never terminate, this is safe to unwrap.
                    let new_level = new_level.unwrap();
                    // At critical levels, it's okay to undertake expensive work immediately
                    // to reclaim memory.
                    low_mem = matches!(new_level, MemoryPressureLevel::Critical);
                    should_purge_layer_files = true;
                    if new_level != level {
                        level = new_level;
                        should_update_cache_limit = true;
                        let level_config = config.for_level(&level);
                        self.read_ahead_size.store(level_config.read_ahead_size, Ordering::Relaxed);
                        // Restart the timer with the new level's period.
                        timer.as_mut().reset(fasync::MonotonicInstant::after(
                            level_config.background_task_period.into())
                        );
                        debug!(
                            "Background task period changed to {:?} due to new memory pressure \
                            level ({:?}).",
                            config.for_level(&level).background_task_period, level
                        );
                    }
                }
                _ = timer => {
                    // Re-arm the timer for the next period before doing the work.
                    timer.as_mut().reset(fasync::MonotonicInstant::after(
                        config.for_level(&level).background_task_period.into())
                    );
                    should_flush = true;
                    // Only purge layer file caches once we have elevated memory pressure.
                    should_purge_layer_files = !matches!(level, MemoryPressureLevel::Normal);
                }
            };
            if should_terminate {
                break;
            }
            // Maybe close extra files *before* iterating them for flush/low mem.
            if should_update_cache_limit {
                self.dirent_cache.set_limit(config.for_level(&level).cache_size_limit);
            }
            if should_flush {
                self.flush_all_files(false).await;
                self.dirent_cache.recycle_stale_files();
            } else if low_mem {
                // This is a softer version of flushing files, so don't bother if we're flushing.
                self.minimize_memory().await;
            }
            if should_purge_layer_files {
                for layer in self.store.tree().immutable_layer_set().layers {
                    layer.purge_cached_data();
                }
            }
        }
        debug!(store_id = self.store.store_object_id(); "FxVolume::background_task end");
    }
560
561    /// Reports that a certain number of bytes will be dirtied in a pager-backed VMO.
562    ///
563    /// Note that this function may await flush tasks.
564    pub fn report_pager_dirty(
565        self: Arc<Self>,
566        byte_count: u64,
567        mark_dirty: impl FnOnce() + Send + 'static,
568    ) {
569        if let Some(parent) = self.parent.upgrade() {
570            parent.report_pager_dirty(byte_count, self, mark_dirty);
571        } else {
572            mark_dirty();
573        }
574    }
575
576    /// Reports that a certain number of bytes were cleaned in a pager-backed VMO.
577    pub fn report_pager_clean(&self, byte_count: u64) {
578        if let Some(parent) = self.parent.upgrade() {
579            parent.report_pager_clean(byte_count);
580        }
581    }
582
    #[trace]
    /// Flushes all cached files. When `last_chance` is true (used during `terminate`), files are
    /// also forced clean after flushing since no further flush opportunity remains.
    pub async fn flush_all_files(&self, last_chance: bool) {
        let mut flushed = 0;
        for file in self.cache.files() {
            if let Some(node) = file.into_opened_node() {
                if let Err(e) = FxFile::flush(&node, last_chance).await {
                    warn!(
                        store_id = self.store.store_object_id(),
                        oid = node.object_id(),
                        error:? = e;
                        "Failed to flush",
                    )
                }
                if last_chance {
                    // Hold a separate clone so the opened-node guard `node` can be dropped before
                    // forcing the file clean.
                    let file = node.clone();
                    std::mem::drop(node);
                    file.force_clean();
                }
            }
            // NOTE(review): `flushed` counts every file visited, not only those that were opened
            // and flushed — presumably intended as a total file count for the log line below;
            // confirm.
            flushed += 1;
        }
        debug!(store_id = self.store.store_object_id(), file_count = flushed; "FxVolume flushed");
    }
606
607    /// Flushes only files with dirty pages.
608    ///
609    /// `PagedObjectHandle` tracks the number dirty pages locally (except for overwrite files) which
610    /// makes determining whether flushing a file will reduce the number of dirty pages efficient.
611    /// `flush_all_files` checks if any metadata needs to be flushed which involves a syscall making
612    /// it significantly slower when there are lots of open files without dirty pages.
613    #[trace]
614    pub async fn minimize_memory(&self) {
615        for file in self.cache.files() {
616            if let Some(node) = file.into_opened_node() {
617                if let Err(e) = node.handle().minimize_memory().await {
618                    warn!(
619                        store_id = self.store.store_object_id(),
620                        oid = node.object_id(),
621                        error:? = e;
622                        "Failed to flush",
623                    )
624                }
625            }
626        }
627    }
628
    /// Spawns a short term task for the volume that includes a guard that will prevent termination.
    ///
    /// NOTE: if no active guard can be taken (the scope is shutting down), the task is silently
    /// dropped without running.
    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) {
        if let Some(guard) = self.scope.try_active_guard() {
            self.executor.spawn_detached(FutureWithGuard::new(guard, task));
        }
    }
635
    /// Returns the current read-ahead size that should be used based on the current memory-pressure
    /// level.
    ///
    /// The value is written by the background task whenever the memory-pressure level changes.
    pub fn read_ahead_size(&self) -> u64 {
        self.read_ahead_size.load(Ordering::Relaxed)
    }
641
    /// Tries to unwrap this volume.  If it fails, it will poison the volume so that when it is
    /// dropped, you get a backtrace.
    #[cfg(any(test, feature = "testing"))]
    pub fn try_unwrap(self: Arc<Self>) -> Option<FxVolume> {
        // Poison pre-emptively; on success we immediately un-poison below.
        self.poisoned.store(true, Ordering::Relaxed);
        match Arc::try_unwrap(self) {
            Ok(volume) => {
                volume.poisoned.store(false, Ordering::Relaxed);
                Some(volume)
            }
            Err(this) => {
                // Log details about all the places where there might be a reference cycle.
                info!(
                    "background_task: {}, profile_state: {}, dirent_cache count: {}, \
                     pager strong file refs={}, no tasks={}",
                    this.background_task.lock().is_some(),
                    this.profile_state.lock().is_some(),
                    this.dirent_cache.len(),
                    crate::pager::STRONG_FILE_REFS.load(Ordering::Relaxed),
                    {
                        // Poll `scope.wait()` exactly once with a no-op waker to check, without
                        // blocking, whether all scope tasks have finished.
                        let mut no_tasks = pin!(this.scope.wait());
                        no_tasks
                            .poll_unpin(&mut std::task::Context::from_waker(
                                std::task::Waker::noop(),
                            ))
                            .is_ready()
                    },
                );
                None
            }
        }
    }
674
    /// Serves the `FileBackedVolumeProvider` FIDL protocol until the request stream ends, the
    /// volume's scope shuts down, or the volume is dropped.
    pub async fn handle_file_backed_volume_provider_requests(
        this: Weak<Self>,
        scope: ExecutionScope,
        mut requests: FileBackedVolumeProviderRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = requests.try_next().await? {
            match request {
                FileBackedVolumeProviderRequest::Open {
                    parent_directory_token,
                    name,
                    server_end,
                    control_handle: _,
                } => {
                    // Try and get an active guard before upgrading.
                    let Some(_guard) = scope.try_active_guard() else {
                        bail!("Volume shutting down")
                    };
                    let Some(this) = this.upgrade() else { bail!("FxVolume dropped") };
                    // Resolve the client-supplied token to the FxDirectory it represents.
                    match this
                        .scope
                        .token_registry()
                        // NB: For now, we only expect these calls in a regular (non-blob) volume.
                        // Hard-code the type for simplicity; attempts to call on a blob volume will
                        // get an error.
                        .get_owner(parent_directory_token)
                        .and_then(|dir| {
                            dir.ok_or(zx::Status::BAD_HANDLE).and_then(|dir| {
                                dir.into_any()
                                    .downcast::<FxDirectory>()
                                    .map_err(|_| zx::Status::BAD_HANDLE)
                            })
                        }) {
                        Ok(dir) => {
                            dir.open_block_file(&name, server_end).await;
                        }
                        Err(status) => {
                            // Couldn't resolve the token; report the failure to the client via an
                            // epitaph on the channel.
                            let _ = server_end.close_with_epitaph(status).unwrap_or_else(|e| {
                                error!(error:? = e; "open failed to send epitaph");
                            });
                        }
                    }
                }
            }
        }
        Ok(())
    }
721
    /// Serves a `fuchsia.fxfs.ProjectId` request stream against this volume until the stream
    /// terminates, the volume's execution scope begins shutting down, or the volume itself is
    /// dropped.
    ///
    /// The volume is held as a `Weak` reference so that an open connection does not keep the
    /// volume alive; each request re-upgrades it and also takes an active guard on `scope` so
    /// shutdown cannot race with an in-flight request.  Errors from the store layer are logged
    /// here and forwarded to the client as raw zx status codes.
    pub async fn handle_project_id_requests(
        this: Weak<Self>,
        scope: ExecutionScope,
        mut requests: ProjectIdRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = requests.try_next().await? {
            // Try and get an active guard before upgrading.
            let Some(_guard) = scope.try_active_guard() else { bail!("Volume shutting down") };
            let Some(this) = this.upgrade() else { bail!("FxVolume dropped") };
            // Captured once per request purely for log context below.
            let store_id = this.store.store_object_id();

            match request {
                ProjectIdRequest::SetLimit { responder, project_id, bytes, nodes } => responder
                    .send(
                    this.store().set_project_limit(project_id, bytes, nodes).await.map_err(
                        |error| {
                            error!(error:?, store_id, project_id; "Failed to set project limit");
                            map_to_raw_status(error)
                        },
                    ),
                )?,
                ProjectIdRequest::Clear { responder, project_id } => responder.send(
                    this.store().clear_project_limit(project_id).await.map_err(|error| {
                        error!(error:?, store_id, project_id; "Failed to clear project limit");
                        map_to_raw_status(error)
                    }),
                )?,
                ProjectIdRequest::SetForNode { responder, node_id, project_id } => {
                    responder
                        .send(this.store().set_project_for_node(node_id, project_id).await.map_err(
                        |error| {
                            error!(error:?, store_id, node_id, project_id; "Failed to apply node.");
                            map_to_raw_status(error)
                        },
                    ))?
                }
                ProjectIdRequest::GetForNode { responder, node_id } => responder.send(
                    this.store().get_project_for_node(node_id).await.map_err(|error| {
                        error!(error:?, store_id, node_id; "Failed to get node.");
                        map_to_raw_status(error)
                    }),
                )?,
                ProjectIdRequest::ClearForNode { responder, node_id } => responder.send(
                    this.store().clear_project_for_node(node_id).await.map_err(|error| {
                        error!(error:?, store_id, node_id; "Failed to clear for node.");
                        map_to_raw_status(error)
                    }),
                )?,
                // List is paginated; `list_projects` converts to and from the FIDL token type.
                ProjectIdRequest::List { responder, token } => {
                    responder.send(match this.list_projects(&token).await {
                        Ok((ref entries, ref next_token)) => Ok((entries, next_token.as_ref())),
                        Err(error) => {
                            error!(error:?, store_id, token:?; "Failed to list projects.");
                            Err(map_to_raw_status(error))
                        }
                    })?
                }
                ProjectIdRequest::Info { responder, project_id } => {
                    responder.send(match this.project_info(project_id).await {
                        Ok((ref limit, ref usage)) => Ok((limit, usage)),
                        Err(error) => {
                            error!(error:?, store_id, project_id; "Failed to get project info.");
                            Err(map_to_raw_status(error))
                        }
                    })?
                }
            }
        }
        Ok(())
    }
792
    // Maximum number of project entries that fit in a single paginated `List` response.  Based on
    // the 64KiB maximum message size minus 16 bytes of header, 16 bytes of vector header, 16
    // bytes for the optional token header, and 8 bytes of token value.
    // https://fuchsia.dev/fuchsia-src/development/languages/fidl/guides/max-out-pagination
    const MAX_PROJECT_ENTRIES: usize = 8184;
797
798    // Calls out to the inner volume to list available projects, removing and re-adding the fidl
799    // wrapper types for the pagination token.
800    async fn list_projects(
801        &self,
802        last_token: &Option<Box<ProjectIterToken>>,
803    ) -> Result<(Vec<u64>, Option<ProjectIterToken>), Error> {
804        let (entries, token) = self
805            .store()
806            .list_projects(
807                match last_token {
808                    None => 0,
809                    Some(v) => v.value,
810                },
811                Self::MAX_PROJECT_ENTRIES,
812            )
813            .await?;
814        Ok((entries, token.map(|value| ProjectIterToken { value })))
815    }
816
817    async fn project_info(&self, project_id: u64) -> Result<(BytesAndNodes, BytesAndNodes), Error> {
818        let (limit, usage) = self.store().project_info(project_id).await?;
819        // At least one of them needs to be around to return anything.
820        ensure!(limit.is_some() || usage.is_some(), FxfsError::NotFound);
821        Ok((
822            limit.map_or_else(
823                || BytesAndNodes { bytes: u64::MAX, nodes: u64::MAX },
824                |v| BytesAndNodes { bytes: v.0, nodes: v.1 },
825            ),
826            usage.map_or_else(
827                || BytesAndNodes { bytes: 0, nodes: 0 },
828                |v| BytesAndNodes { bytes: v.0, nodes: v.1 },
829            ),
830        ))
831    }
832}
833
#[cfg(any(test, feature = "testing"))]
impl Drop for FxVolume {
    fn drop(&mut self) {
        // Test-only invariant check: the `poisoned` flag must still be false when the volume is
        // dropped.  `get_mut` needs no atomic ordering since `drop` has exclusive access.
        assert!(!*self.poisoned.get_mut());
    }
}
840
// Marker implementation allowing `FxVolume` to act as the owner of object handles.
impl HandleOwner for FxVolume {}
842
impl AsRef<ObjectStore> for FxVolume {
    // Exposes the volume's underlying object store by reference.
    fn as_ref(&self) -> &ObjectStore {
        &self.store
    }
}
848
849#[async_trait]
850impl FsInspectVolume for FxVolume {
851    async fn get_volume_data(&self) -> VolumeData {
852        let object_count = self.store().object_count();
853        let (used_bytes, bytes_limit) =
854            self.store.filesystem().allocator().owner_allocation_info(self.store.store_object_id());
855        let encrypted = self.store().crypt().is_some();
856        let port_koid = fasync::EHandle::local().port().as_handle_ref().koid().unwrap().raw_koid();
857        VolumeData { bytes_limit, used_bytes, used_nodes: object_count, encrypted, port_koid }
858    }
859}
860
/// Implemented by the node that serves as a volume's root directory, abstracting over the
/// concrete root directory type.
pub trait RootDir: FxNode + DirectoryEntry {
    /// Upcasts to a `DirectoryEntry` trait object.
    fn as_directory_entry(self: Arc<Self>) -> Arc<dyn DirectoryEntry>;

    /// Serves this root directory on `server_end` with the given flags.
    fn serve(self: Arc<Self>, flags: fio::Flags, server_end: ServerEnd<fio::DirectoryMarker>);

    /// Upcasts to an `FxNode` trait object.
    fn as_node(self: Arc<Self>) -> Arc<dyn FxNode>;

    /// Hook allowing a root directory implementation to register extra services in the volume's
    /// outgoing service directory.  The default implementation registers nothing.
    fn register_additional_volume_services(
        self: Arc<Self>,
        _svc_dir: &Simple,
    ) -> Result<(), Error> {
        Ok(())
    }
}
875
/// Bundles an `FxVolume` together with its root directory node and the scaffolding used to
/// serve it.
#[derive(Clone)]
pub struct FxVolumeAndRoot {
    // The volume itself.
    volume: Arc<FxVolume>,

    // The volume's root directory node.
    root: Arc<dyn RootDir>,

    // This is used for service connections and anything that isn't the actual volume.
    admin_scope: ExecutionScope,

    // The outgoing directory that the volume might be served on.
    outgoing_dir: Arc<Simple>,
}
887
888impl FxVolumeAndRoot {
889    pub async fn new<T: From<Directory<FxVolume>> + RootDir>(
890        parent: Weak<VolumesDirectory>,
891        store: Arc<ObjectStore>,
892        unique_id: u64,
893        blob_resupplied_count: Arc<PageRefaultCounter>,
894        memory_pressure_config: MemoryPressureConfig,
895    ) -> Result<Self, Error> {
896        let volume = Arc::new(FxVolume::new(
897            parent,
898            store,
899            unique_id,
900            blob_resupplied_count.clone(),
901            memory_pressure_config,
902        )?);
903        let root_object_id = volume.store().root_directory_object_id();
904        let root_dir = Directory::open(&volume, root_object_id).await?;
905        let root = Arc::<T>::new(root_dir.into()) as Arc<dyn RootDir>;
906        volume
907            .cache
908            .get_or_reserve(root_object_id)
909            .await
910            .placeholder()
911            .unwrap()
912            .commit(&root.clone().as_node());
913        Ok(Self {
914            volume,
915            root,
916            admin_scope: ExecutionScope::new(),
917            outgoing_dir: vfs::directory::immutable::simple(),
918        })
919    }
920
921    pub fn volume(&self) -> &Arc<FxVolume> {
922        &self.volume
923    }
924
925    pub fn root(&self) -> &Arc<dyn RootDir> {
926        &self.root
927    }
928
929    pub fn admin_scope(&self) -> &ExecutionScope {
930        &self.admin_scope
931    }
932
933    pub fn outgoing_dir(&self) -> &Arc<Simple> {
934        &self.outgoing_dir
935    }
936
937    // The same as root but downcasted to FxDirectory.
938    pub fn root_dir(&self) -> Arc<FxDirectory> {
939        self.root().clone().into_any().downcast::<FxDirectory>().expect("Invalid type for root")
940    }
941
942    pub fn into_volume(self) -> Arc<FxVolume> {
943        self.volume
944    }
945}
946
// Total node count reported via fuchsia.io FilesystemInfo.  The correct number here is arguably
// u64::MAX - 1 (because node 0 is reserved). There's a bug where inspect test cases fail if we
// try and use that, possibly because of a signed/unsigned bug.
// See https://fxbug.dev/42168242.  Until that's fixed, we'll have to use i64::MAX.
const TOTAL_NODES: u64 = i64::MAX as u64;
951
// An array used to initialize the FilesystemInfo |name| field. This just spells "fxfs"
// (0x66 0x78 0x66 0x73 in ASCII) 0-padded to 32 bytes.
const FXFS_INFO_NAME_FIDL: [i8; 32] = [
    0x66, 0x78, 0x66, 0x73, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    0, 0, 0, 0,
];
958
// Translates fxfs's internal `filesystem::Info` (plus per-volume figures) into the fuchsia.io
// `FilesystemInfo` wire type.
fn info_to_filesystem_info(
    info: filesystem::Info,
    block_size: u64,
    object_count: u64,
    fs_id: u64,
) -> fio::FilesystemInfo {
    fio::FilesystemInfo {
        total_bytes: info.total_bytes,
        used_bytes: info.used_bytes,
        total_nodes: TOTAL_NODES,
        used_nodes: object_count,
        // TODO(https://fxbug.dev/42175592): Support free_shared_pool_bytes.
        free_shared_pool_bytes: 0,
        fs_id,
        // NOTE(review): `as u32` would silently truncate a block size above u32::MAX — assumed
        // unreachable for real volumes, but worth confirming.
        block_size: block_size as u32,
        max_filename_size: fio::MAX_NAME_LENGTH as u32,
        fs_type: fidl_fuchsia_fs::VfsType::Fxfs.into_primitive(),
        padding: 0,
        name: FXFS_INFO_NAME_FIDL,
    }
}
980
981#[cfg(test)]
982mod tests {
983    use super::DIRENT_CACHE_LIMIT;
984    use crate::fuchsia::file::FxFile;
985    use crate::fuchsia::fxblob::testing::{self as blob_testing, BlobFixture};
986    use crate::fuchsia::memory_pressure::MemoryPressureLevel;
987    use crate::fuchsia::pager::PagerBacked;
988    use crate::fuchsia::profile::{RECORDED, new_profile_state};
989    use crate::fuchsia::testing::{
990        TestFixture, TestFixtureOptions, close_dir_checked, close_file_checked, open_dir,
991        open_dir_checked, open_file, open_file_checked,
992    };
993    use crate::fuchsia::volume::{
994        BASE_READ_AHEAD_SIZE, FxVolume, MemoryPressureConfig, MemoryPressureLevelConfig,
995    };
996    use crate::fuchsia::volumes_directory::VolumesDirectory;
997    use crate::volume::MAX_READ_AHEAD_SIZE;
998    use delivery_blob::CompressionMode;
999    use fidl_fuchsia_fxfs::{BytesAndNodes, ProjectIdMarker};
1000    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
1001    use fuchsia_fs::file;
1002    use fxfs::filesystem::{FxFilesystem, FxFilesystemBuilder};
1003    use fxfs::fsck::{fsck, fsck_volume};
1004    use fxfs::object_store::directory::replace_child;
1005    use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
1006    use fxfs::object_store::volume::root_volume;
1007    use fxfs::object_store::{HandleOptions, ObjectDescriptor, ObjectStore, StoreOptions};
1008    use fxfs_crypt_common::CryptBase;
1009    use fxfs_crypto::WrappingKeyId;
1010    use fxfs_insecure_crypto::new_insecure_crypt;
1011    use refaults_vmo::PageRefaultCounter;
1012    use std::sync::atomic::Ordering;
1013    use std::sync::{Arc, Weak};
1014    use std::time::Duration;
1015    use storage_device::DeviceHolder;
1016    use storage_device::fake_device::FakeDevice;
1017    use zx::Status;
1018    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
1019
    // An arbitrary wrapping key id (123 encoded as little-endian bytes) for use in tests.
    const WRAPPING_KEY_ID: WrappingKeyId = u128::to_le_bytes(123);
1021
1022    #[fuchsia::test(threads = 10)]
1023    async fn test_rename_different_dirs() {
1024        use zx::Event;
1025
1026        let fixture = TestFixture::new().await;
1027        let root = fixture.root();
1028
1029        let src = open_dir_checked(
1030            &root,
1031            "foo",
1032            fio::Flags::FLAG_MAYBE_CREATE
1033                | fio::PERM_READABLE
1034                | fio::PERM_WRITABLE
1035                | fio::Flags::PROTOCOL_DIRECTORY,
1036            Default::default(),
1037        )
1038        .await;
1039
1040        let dst = open_dir_checked(
1041            &root,
1042            "bar",
1043            fio::Flags::FLAG_MAYBE_CREATE
1044                | fio::PERM_READABLE
1045                | fio::PERM_WRITABLE
1046                | fio::Flags::PROTOCOL_DIRECTORY,
1047            Default::default(),
1048        )
1049        .await;
1050
1051        let f = open_file_checked(
1052            &root,
1053            "foo/a",
1054            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1055            &Default::default(),
1056        )
1057        .await;
1058        close_file_checked(f).await;
1059
1060        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1061        Status::ok(status).expect("get_token failed");
1062        src.rename("a", Event::from(dst_token.unwrap()), "b")
1063            .await
1064            .expect("FIDL call failed")
1065            .expect("rename failed");
1066
1067        assert_eq!(
1068            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1069                .await
1070                .expect_err("Open succeeded")
1071                .root_cause()
1072                .downcast_ref::<Status>()
1073                .expect("No status"),
1074            &Status::NOT_FOUND,
1075        );
1076        let f =
1077            open_file_checked(&root, "bar/b", fio::Flags::PROTOCOL_FILE, &Default::default()).await;
1078        close_file_checked(f).await;
1079
1080        close_dir_checked(dst).await;
1081        close_dir_checked(src).await;
1082        fixture.close().await;
1083    }
1084
1085    #[fuchsia::test(threads = 10)]
1086    async fn test_rename_same_dir() {
1087        use zx::Event;
1088        let fixture = TestFixture::new().await;
1089        let root = fixture.root();
1090
1091        let src = open_dir_checked(
1092            &root,
1093            "foo",
1094            fio::Flags::FLAG_MAYBE_CREATE
1095                | fio::PERM_READABLE
1096                | fio::PERM_WRITABLE
1097                | fio::Flags::PROTOCOL_DIRECTORY,
1098            Default::default(),
1099        )
1100        .await;
1101
1102        let f = open_file_checked(
1103            &root,
1104            "foo/a",
1105            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1106            &Default::default(),
1107        )
1108        .await;
1109        close_file_checked(f).await;
1110
1111        let (status, src_token) = src.get_token().await.expect("FIDL call failed");
1112        Status::ok(status).expect("get_token failed");
1113        src.rename("a", Event::from(src_token.unwrap()), "b")
1114            .await
1115            .expect("FIDL call failed")
1116            .expect("rename failed");
1117
1118        assert_eq!(
1119            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1120                .await
1121                .expect_err("Open succeeded")
1122                .root_cause()
1123                .downcast_ref::<Status>()
1124                .expect("No status"),
1125            &Status::NOT_FOUND,
1126        );
1127        let f =
1128            open_file_checked(&root, "foo/b", fio::Flags::PROTOCOL_FILE, &Default::default()).await;
1129        close_file_checked(f).await;
1130
1131        close_dir_checked(src).await;
1132        fixture.close().await;
1133    }
1134
1135    #[fuchsia::test(threads = 10)]
1136    async fn test_rename_overwrites_file() {
1137        use zx::Event;
1138        let fixture = TestFixture::new().await;
1139        let root = fixture.root();
1140
1141        let src = open_dir_checked(
1142            &root,
1143            "foo",
1144            fio::Flags::FLAG_MAYBE_CREATE
1145                | fio::PERM_READABLE
1146                | fio::PERM_WRITABLE
1147                | fio::Flags::PROTOCOL_DIRECTORY,
1148            Default::default(),
1149        )
1150        .await;
1151
1152        let dst = open_dir_checked(
1153            &root,
1154            "bar",
1155            fio::Flags::FLAG_MAYBE_CREATE
1156                | fio::PERM_READABLE
1157                | fio::PERM_WRITABLE
1158                | fio::Flags::PROTOCOL_DIRECTORY,
1159            Default::default(),
1160        )
1161        .await;
1162
1163        // The src file is non-empty.
1164        let src_file = open_file_checked(
1165            &root,
1166            "foo/a",
1167            fio::Flags::FLAG_MAYBE_CREATE
1168                | fio::PERM_READABLE
1169                | fio::PERM_WRITABLE
1170                | fio::Flags::PROTOCOL_FILE,
1171            &Default::default(),
1172        )
1173        .await;
1174        let buf = vec![0xaa as u8; 8192];
1175        file::write(&src_file, buf.as_slice()).await.expect("Failed to write to file");
1176        close_file_checked(src_file).await;
1177
1178        // The dst file is empty (so we can distinguish it).
1179        let f = open_file_checked(
1180            &root,
1181            "bar/b",
1182            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1183            &Default::default(),
1184        )
1185        .await;
1186        close_file_checked(f).await;
1187
1188        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1189        Status::ok(status).expect("get_token failed");
1190        src.rename("a", Event::from(dst_token.unwrap()), "b")
1191            .await
1192            .expect("FIDL call failed")
1193            .expect("rename failed");
1194
1195        assert_eq!(
1196            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1197                .await
1198                .expect_err("Open succeeded")
1199                .root_cause()
1200                .downcast_ref::<Status>()
1201                .expect("No status"),
1202            &Status::NOT_FOUND,
1203        );
1204        let file = open_file_checked(
1205            &root,
1206            "bar/b",
1207            fio::PERM_READABLE | fio::Flags::PROTOCOL_FILE,
1208            &Default::default(),
1209        )
1210        .await;
1211        let buf = file::read(&file).await.expect("read file failed");
1212        assert_eq!(buf, vec![0xaa as u8; 8192]);
1213        close_file_checked(file).await;
1214
1215        close_dir_checked(dst).await;
1216        close_dir_checked(src).await;
1217        fixture.close().await;
1218    }
1219
1220    #[fuchsia::test(threads = 10)]
1221    async fn test_rename_overwrites_dir() {
1222        use zx::Event;
1223        let fixture = TestFixture::new().await;
1224        let root = fixture.root();
1225
1226        let src = open_dir_checked(
1227            &root,
1228            "foo",
1229            fio::Flags::FLAG_MAYBE_CREATE
1230                | fio::PERM_READABLE
1231                | fio::PERM_WRITABLE
1232                | fio::Flags::PROTOCOL_DIRECTORY,
1233            Default::default(),
1234        )
1235        .await;
1236
1237        let dst = open_dir_checked(
1238            &root,
1239            "bar",
1240            fio::Flags::FLAG_MAYBE_CREATE
1241                | fio::PERM_READABLE
1242                | fio::PERM_WRITABLE
1243                | fio::Flags::PROTOCOL_DIRECTORY,
1244            Default::default(),
1245        )
1246        .await;
1247
1248        // The src dir is non-empty.
1249        open_dir_checked(
1250            &root,
1251            "foo/a",
1252            fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
1253            Default::default(),
1254        )
1255        .await;
1256        open_file_checked(
1257            &root,
1258            "foo/a/file",
1259            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1260            &Default::default(),
1261        )
1262        .await;
1263        open_dir_checked(
1264            &root,
1265            "bar/b",
1266            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_DIRECTORY,
1267            Default::default(),
1268        )
1269        .await;
1270
1271        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1272        Status::ok(status).expect("get_token failed");
1273        src.rename("a", Event::from(dst_token.unwrap()), "b")
1274            .await
1275            .expect("FIDL call failed")
1276            .expect("rename failed");
1277
1278        assert_eq!(
1279            open_dir(&root, "foo/a", fio::Flags::PROTOCOL_DIRECTORY, &Default::default())
1280                .await
1281                .expect_err("Open succeeded")
1282                .root_cause()
1283                .downcast_ref::<Status>()
1284                .expect("No status"),
1285            &Status::NOT_FOUND,
1286        );
1287        let f =
1288            open_file_checked(&root, "bar/b/file", fio::Flags::PROTOCOL_FILE, &Default::default())
1289                .await;
1290        close_file_checked(f).await;
1291
1292        close_dir_checked(dst).await;
1293        close_dir_checked(src).await;
1294
1295        fixture.close().await;
1296    }
1297
    // Verifies that the background task eventually persists cached writes to disk without any
    // explicit sync from the client, once configured with a short flush period.
    #[fuchsia::test]
    async fn test_background_flush() {
        let fixture = TestFixture::new().await;
        {
            let file = open_file_checked(
                fixture.root(),
                "file",
                fio::Flags::FLAG_MAYBE_CREATE
                    | fio::PERM_READABLE
                    | fio::PERM_WRITABLE
                    | fio::Flags::PROTOCOL_FILE,
                &Default::default(),
            )
            .await;
            // Grab the object ID so the on-disk object can be reopened directly below.
            let object_id = file
                .get_attributes(fio::NodeAttributesQuery::ID)
                .await
                .expect("Fidl get attr")
                .expect("get attr")
                .1
                .id
                .unwrap();

            // Write some data to the file, which will only go to the cache for now.
            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");

            // Initialized to the default size.
            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
            let volume = fixture.volume().volume().clone();

            let data_has_persisted = || async {
                // We have to reopen the object each time since this is a distinct handle from the
                // one managed by the FxFile.
                let object =
                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
                        .await
                        .expect("open_object failed");
                let data = object.contents(8192).await.expect("read failed");
                data.len() == 1 && data[..] == [123u8]
            };
            // Nothing has been flushed yet.
            assert!(!data_has_persisted().await);

            // Start the background task with a very short flush period at normal memory pressure.
            fixture.volume().volume().start_background_task(
                MemoryPressureConfig {
                    mem_normal: MemoryPressureLevelConfig {
                        background_task_period: Duration::from_millis(100),
                        background_task_initial_delay: Duration::from_millis(100),
                        ..Default::default()
                    },
                    mem_warning: Default::default(),
                    mem_critical: Default::default(),
                },
                None,
            );

            // Poll until the background flush lands, bounded by a generous timeout.
            const MAX_WAIT: Duration = Duration::from_secs(20);
            let wait_increments = Duration::from_millis(400);
            let mut total_waited = Duration::ZERO;

            while total_waited < MAX_WAIT {
                fasync::Timer::new(wait_increments).await;
                total_waited += wait_increments;

                if data_has_persisted().await {
                    break;
                }
            }

            assert!(data_has_persisted().await);
        }

        fixture.close().await;
    }
1371
    // Verifies that a Warning memory-pressure signal switches the background task onto the short
    // warning-level flush period (persisting cached data quickly) and shrinks the dirent cache
    // to the warning-level limit.
    #[fuchsia::test(threads = 2)]
    async fn test_background_flush_with_warning_memory_pressure() {
        let fixture = TestFixture::new().await;
        {
            let file = open_file_checked(
                fixture.root(),
                "file",
                fio::Flags::FLAG_MAYBE_CREATE
                    | fio::PERM_READABLE
                    | fio::PERM_WRITABLE
                    | fio::Flags::PROTOCOL_FILE,
                &Default::default(),
            )
            .await;
            // Grab the object ID so the on-disk object can be reopened directly below.
            let object_id = file
                .get_attributes(fio::NodeAttributesQuery::ID)
                .await
                .expect("Fidl get attr")
                .expect("get attr")
                .1
                .id
                .unwrap();

            // Write some data to the file, which will only go to the cache for now.
            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");

            // Initialized to the default size.
            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
            let volume = fixture.volume().volume().clone();

            let data_has_persisted = || async {
                // We have to reopen the object each time since this is a distinct handle from the
                // one managed by the FxFile.
                let object =
                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
                        .await
                        .expect("open_object failed");
                let data = object.contents(8192).await.expect("read failed");
                data.len() == 1 && data[..] == [123u8]
            };
            assert!(!data_has_persisted().await);

            // Configure the flush task to only flush quickly on warning.
            let flush_config = MemoryPressureConfig {
                mem_normal: MemoryPressureLevelConfig {
                    background_task_period: Duration::from_secs(20),
                    cache_size_limit: DIRENT_CACHE_LIMIT,
                    ..Default::default()
                },
                mem_warning: MemoryPressureLevelConfig {
                    background_task_period: Duration::from_millis(100),
                    cache_size_limit: 100,
                    background_task_initial_delay: Duration::from_millis(100),
                    ..Default::default()
                },
                mem_critical: MemoryPressureLevelConfig {
                    background_task_period: Duration::from_secs(20),
                    cache_size_limit: 50,
                    ..Default::default()
                },
            };
            fixture.volume().volume().start_background_task(
                flush_config,
                fixture.volumes_directory().memory_pressure_monitor(),
            );

            // Send the memory pressure update.
            fixture
                .memory_pressure_proxy()
                .on_level_changed(MemoryPressureLevel::Warning)
                .await
                .expect("Failed to send memory pressure level change");

            // Wait a bit of time for the flush to occur (but less than the normal and critical
            // periods).
            const MAX_WAIT: Duration = Duration::from_secs(3);
            let wait_increments = Duration::from_millis(400);
            let mut total_waited = Duration::ZERO;

            while total_waited < MAX_WAIT {
                fasync::Timer::new(wait_increments).await;
                total_waited += wait_increments;

                if data_has_persisted().await {
                    break;
                }
            }

            assert!(data_has_persisted().await);
            // The dirent cache should now use the warning-level limit from the config above.
            assert_eq!(fixture.volume().volume().dirent_cache().limit(), 100);
        }

        fixture.close().await;
    }
1466
    // Verifies that a Critical memory-pressure signal triggers a prompt flush (even with default
    // background periods) and shrinks the dirent cache to the critical-level limit.
    #[fuchsia::test(threads = 2)]
    async fn test_background_flush_with_critical_memory_pressure() {
        let fixture = TestFixture::new().await;
        {
            let file = open_file_checked(
                fixture.root(),
                "file",
                fio::Flags::FLAG_MAYBE_CREATE
                    | fio::PERM_READABLE
                    | fio::PERM_WRITABLE
                    | fio::Flags::PROTOCOL_FILE,
                &Default::default(),
            )
            .await;
            // Grab the object ID so the on-disk object can be reopened directly below.
            let object_id = file
                .get_attributes(fio::NodeAttributesQuery::ID)
                .await
                .expect("Fidl get attr")
                .expect("get attr")
                .1
                .id
                .unwrap();

            // Write some data to the file, which will only go to the cache for now.
            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");

            // Initialized to the default size.
            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
            let volume = fixture.volume().volume().clone();

            let data_has_persisted = || async {
                // We have to reopen the object each time since this is a distinct handle from the
                // one managed by the FxFile.
                let object =
                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
                        .await
                        .expect("open_object failed");
                let data = object.contents(8192).await.expect("read failed");
                data.len() == 1 && data[..] == [123u8]
            };
            assert!(!data_has_persisted().await);

            // Only the cache limits differ per level; flush periods are left at their defaults.
            let flush_config = MemoryPressureConfig {
                mem_normal: MemoryPressureLevelConfig {
                    cache_size_limit: DIRENT_CACHE_LIMIT,
                    ..Default::default()
                },
                mem_warning: MemoryPressureLevelConfig {
                    cache_size_limit: 100,
                    ..Default::default()
                },
                mem_critical: MemoryPressureLevelConfig {
                    cache_size_limit: 50,
                    ..Default::default()
                },
            };
            fixture.volume().volume().start_background_task(
                flush_config,
                fixture.volumes_directory().memory_pressure_monitor(),
            );

            // Send the memory pressure update.
            fixture
                .memory_pressure_proxy()
                .on_level_changed(MemoryPressureLevel::Critical)
                .await
                .expect("Failed to send memory pressure level change");

            // Critical memory should trigger a flush immediately so expect a flush very quickly.
            const MAX_WAIT: Duration = Duration::from_secs(2);
            let wait_increments = Duration::from_millis(400);
            let mut total_waited = Duration::ZERO;

            while total_waited < MAX_WAIT {
                fasync::Timer::new(wait_increments).await;
                total_waited += wait_increments;

                if data_has_persisted().await {
                    break;
                }
            }

            assert!(data_has_persisted().await);
            // The dirent cache should now use the critical-level limit from the config above.
            assert_eq!(fixture.volume().volume().dirent_cache().limit(), 50);
        }

        fixture.close().await;
    }
1555
1556    #[fuchsia::test(threads = 2)]
1557    async fn test_memory_pressure_signal_updates_read_ahead_size() {
1558        let fixture = TestFixture::new().await;
1559        {
1560            let flush_config = MemoryPressureConfig {
1561                mem_normal: MemoryPressureLevelConfig {
1562                    read_ahead_size: 12 * 1024,
1563                    ..Default::default()
1564                },
1565                mem_warning: MemoryPressureLevelConfig {
1566                    read_ahead_size: 8 * 1024,
1567                    ..Default::default()
1568                },
1569                mem_critical: MemoryPressureLevelConfig {
1570                    read_ahead_size: 4 * 1024,
1571                    ..Default::default()
1572                },
1573            };
1574            let volume = fixture.volume().volume().clone();
1575            volume.start_background_task(
1576                flush_config,
1577                fixture.volumes_directory().memory_pressure_monitor(),
1578            );
1579            let wait_for_read_ahead_to_change =
1580                async |level: MemoryPressureLevel, expected: u64| {
1581                    fixture
1582                        .memory_pressure_proxy()
1583                        .on_level_changed(level)
1584                        .await
1585                        .expect("Failed to send memory pressure level change");
1586                    const MAX_WAIT: Duration = Duration::from_secs(2);
1587                    let wait_increments = Duration::from_millis(400);
1588                    let mut total_waited = Duration::ZERO;
1589                    while total_waited < MAX_WAIT {
1590                        if volume.read_ahead_size() == expected {
1591                            break;
1592                        }
1593                        fasync::Timer::new(wait_increments).await;
1594                        total_waited += wait_increments;
1595                    }
1596                    assert_eq!(volume.read_ahead_size(), expected);
1597                };
1598            wait_for_read_ahead_to_change(MemoryPressureLevel::Critical, 4 * 1024).await;
1599            wait_for_read_ahead_to_change(MemoryPressureLevel::Warning, 8 * 1024).await;
1600            wait_for_read_ahead_to_change(MemoryPressureLevel::Normal, 12 * 1024).await;
1601            wait_for_read_ahead_to_change(MemoryPressureLevel::Critical, 4 * 1024).await;
1602        }
1603        fixture.close().await;
1604    }
1605
    // Verifies that project-id limits and per-node project assignments survive
    // unmount/remount cycles, and that clearing a limit while nodes are still
    // assigned removes the limit (info() -> NOT_FOUND) without touching the
    // nodes' project ids. Runs three phases, each separated by a full
    // filesystem close/reopen plus fsck.
    #[fuchsia::test]
    async fn test_project_limit_persistence() {
        const BYTES_LIMIT_1: u64 = 123456;
        const NODES_LIMIT_1: u64 = 4321;
        const BYTES_LIMIT_2: u64 = 456789;
        const NODES_LIMIT_2: u64 = 9876;
        const VOLUME_NAME: &str = "A";
        const FILE_NAME: &str = "B";
        const PROJECT_ID: u64 = 42;
        const PROJECT_ID2: u64 = 343;
        // Captured in phase 1, consumed by later phases after remount.
        let volume_store_id;
        let node_id;
        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        // Phase 1: create the volume, set limits, assign a node to the project.
        {
            let blob_resupplied_count =
                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
            let volumes_directory = VolumesDirectory::new(
                root_volume(filesystem.clone()).await.unwrap(),
                Weak::new(),
                None,
                blob_resupplied_count,
                MemoryPressureConfig::default(),
            )
            .await
            .unwrap();

            let volume_and_root = volumes_directory
                .create_and_mount_volume(VOLUME_NAME, None, false, None)
                .await
                .expect("create unencrypted volume failed");
            volume_store_id = volume_and_root.volume().store().store_object_id();

            let (volume_dir_proxy, dir_server_end) =
                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
            volumes_directory
                .serve_volume(&volume_and_root, dir_server_end, false)
                .expect("serve_volume failed");

            let project_proxy =
                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
                    .expect("Unable to connect to project id service");

            // Project id 0 is reserved; setting a limit for it must fail.
            project_proxy
                .set_limit(0, BYTES_LIMIT_1, NODES_LIMIT_1)
                .await
                .unwrap()
                .expect_err("Should not set limits for project id 0");

            project_proxy
                .set_limit(PROJECT_ID, BYTES_LIMIT_1, NODES_LIMIT_1)
                .await
                .unwrap()
                .expect("To set limits");
            {
                // info() returns (limit, usage); `.0` is the configured limit.
                let BytesAndNodes { bytes, nodes } =
                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
                assert_eq!(bytes, BYTES_LIMIT_1);
                assert_eq!(nodes, NODES_LIMIT_1);
            }

            // Create a file in the volume root to attach to the project.
            let file_proxy = {
                let (root_proxy, root_server_end) =
                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
                volume_dir_proxy
                    .open(
                        "root",
                        fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
                        &Default::default(),
                        root_server_end.into_channel(),
                    )
                    .expect("Failed to open volume root");

                open_file_checked(
                    &root_proxy,
                    FILE_NAME,
                    fio::Flags::FLAG_MAYBE_CREATE
                        | fio::PERM_READABLE
                        | fio::PERM_WRITABLE
                        | fio::Flags::PROTOCOL_FILE,
                    &Default::default(),
                )
                .await
            };

            let (_, immutable_attributes) =
                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
            node_id = immutable_attributes.id.unwrap();

            // Project id 0 is also invalid as a per-node assignment.
            project_proxy
                .set_for_node(node_id, 0)
                .await
                .unwrap()
                .expect_err("Should not set 0 project id");

            project_proxy
                .set_for_node(node_id, PROJECT_ID)
                .await
                .unwrap()
                .expect("Setting project on node");

            // Overwriting the limit for an existing project must succeed and
            // replace the previous values.
            project_proxy
                .set_limit(PROJECT_ID, BYTES_LIMIT_2, NODES_LIMIT_2)
                .await
                .unwrap()
                .expect("To set limits");
            {
                let BytesAndNodes { bytes, nodes } =
                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
                assert_eq!(bytes, BYTES_LIMIT_2);
                assert_eq!(nodes, NODES_LIMIT_2);
            }

            assert_eq!(
                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
                PROJECT_ID
            );

            volumes_directory.terminate().await;
            filesystem.close().await.expect("close filesystem failed");
        }
        // Remount from the same device contents; fsck first to make sure the
        // persisted project metadata is consistent on disk.
        device = filesystem.take_device().await;
        device.ensure_unique();
        device.reopen(false);
        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
        // Phase 2: verify persisted state, then clear the limit and reassign
        // the node to a different project.
        {
            fsck(filesystem.clone()).await.expect("Fsck");
            fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
            let blob_resupplied_count =
                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
            let volumes_directory = VolumesDirectory::new(
                root_volume(filesystem.clone()).await.unwrap(),
                Weak::new(),
                None,
                blob_resupplied_count,
                MemoryPressureConfig::default(),
            )
            .await
            .unwrap();
            let volume_and_root = volumes_directory
                .mount_volume(VOLUME_NAME, None, false)
                .await
                .expect("mount unencrypted volume failed");

            // Held (with its scope) until explicitly dropped before terminate.
            let (volume_proxy, _scope) = crate::volumes_directory::serve_startup_volume_proxy(
                &volumes_directory,
                VOLUME_NAME,
            );

            let project_proxy = {
                let (volume_dir_proxy, dir_server_end) =
                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
                volumes_directory
                    .serve_volume(&volume_and_root, dir_server_end, false)
                    .expect("serve_volume failed");

                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
                    .expect("Unable to connect to project id service")
            };

            // The limit set before the remount must have persisted; keep the
            // usage half of the tuple for comparison after the reassignment.
            let usage_bytes_and_nodes = {
                let (
                    BytesAndNodes { bytes: limit_bytes, nodes: limit_nodes },
                    usage_bytes_and_nodes,
                ) = project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info");
                assert_eq!(limit_bytes, BYTES_LIMIT_2);
                assert_eq!(limit_nodes, NODES_LIMIT_2);
                usage_bytes_and_nodes
            };

            // Clearing the limit succeeds even while a node is still assigned
            // to the project: the limit is removed (info() below returns
            // NOT_FOUND) but the node keeps its project id, as asserted next.
            project_proxy.clear(PROJECT_ID).await.unwrap().expect("To clear limits");

            assert_eq!(
                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
                PROJECT_ID
            );
            // Moving the node to PROJECT_ID2 transfers its usage accounting.
            project_proxy
                .set_for_node(node_id, PROJECT_ID2)
                .await
                .unwrap()
                .expect("Changing project");
            assert_eq!(
                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
                PROJECT_ID2
            );

            assert_eq!(
                project_proxy.info(PROJECT_ID).await.unwrap().expect_err("Expect missing limits"),
                Status::NOT_FOUND.into_raw()
            );
            // The usage observed under PROJECT_ID moved wholesale to PROJECT_ID2.
            assert_eq!(
                project_proxy.info(PROJECT_ID2).await.unwrap().expect("Fetching project info").1,
                usage_bytes_and_nodes
            );

            std::mem::drop(volume_proxy);
            volumes_directory.terminate().await;
            std::mem::drop(volumes_directory);
            filesystem.close().await.expect("close filesystem failed");
        }
        // Phase 3: remount once more and confirm the cleared limit stayed gone.
        device = filesystem.take_device().await;
        device.ensure_unique();
        device.reopen(false);
        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
        fsck(filesystem.clone()).await.expect("Fsck");
        fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();
        let volume_and_root = volumes_directory
            .mount_volume(VOLUME_NAME, None, false)
            .await
            .expect("mount unencrypted volume failed");
        let (volume_dir_proxy, dir_server_end) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        volumes_directory
            .serve_volume(&volume_and_root, dir_server_end, false)
            .expect("serve_volume failed");
        let project_proxy = connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
            .expect("Unable to connect to project id service");
        assert_eq!(
            project_proxy.info(PROJECT_ID).await.unwrap().expect_err("Expect missing limits"),
            Status::NOT_FOUND.into_raw()
        );
        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
    }
1843
1844    #[fuchsia::test]
1845    async fn test_project_limit_accounting() {
1846        const BYTES_LIMIT: u64 = 123456;
1847        const NODES_LIMIT: u64 = 4321;
1848        const VOLUME_NAME: &str = "A";
1849        const FILE_NAME: &str = "B";
1850        const PROJECT_ID: u64 = 42;
1851        let volume_store_id;
1852        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
1853        let first_object_id;
1854        let mut bytes_usage;
1855        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1856        {
1857            let blob_resupplied_count =
1858                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1859            let volumes_directory = VolumesDirectory::new(
1860                root_volume(filesystem.clone()).await.unwrap(),
1861                Weak::new(),
1862                None,
1863                blob_resupplied_count,
1864                MemoryPressureConfig::default(),
1865            )
1866            .await
1867            .unwrap();
1868
1869            let volume_and_root = volumes_directory
1870                .create_and_mount_volume(
1871                    VOLUME_NAME,
1872                    Some(Arc::new(new_insecure_crypt())),
1873                    false,
1874                    None,
1875                )
1876                .await
1877                .expect("create unencrypted volume failed");
1878            volume_store_id = volume_and_root.volume().store().store_object_id();
1879
1880            let (volume_dir_proxy, dir_server_end) =
1881                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1882            volumes_directory
1883                .serve_volume(&volume_and_root, dir_server_end, false)
1884                .expect("serve_volume failed");
1885
1886            let project_proxy =
1887                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1888                    .expect("Unable to connect to project id service");
1889
1890            project_proxy
1891                .set_limit(PROJECT_ID, BYTES_LIMIT, NODES_LIMIT)
1892                .await
1893                .unwrap()
1894                .expect("To set limits");
1895            {
1896                let BytesAndNodes { bytes, nodes } =
1897                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1898                assert_eq!(bytes, BYTES_LIMIT);
1899                assert_eq!(nodes, NODES_LIMIT);
1900            }
1901
1902            let file_proxy = {
1903                let (root_proxy, root_server_end) =
1904                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1905                volume_dir_proxy
1906                    .open(
1907                        "root",
1908                        fio::PERM_READABLE | fio::PERM_WRITABLE,
1909                        &Default::default(),
1910                        root_server_end.into_channel(),
1911                    )
1912                    .expect("Failed to open volume root");
1913
1914                open_file_checked(
1915                    &root_proxy,
1916                    FILE_NAME,
1917                    fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
1918                    &Default::default(),
1919                )
1920                .await
1921            };
1922
1923            assert_eq!(
1924                8192,
1925                file_proxy
1926                    .write(&vec![0xff as u8; 8192])
1927                    .await
1928                    .expect("FIDL call failed")
1929                    .map_err(Status::from_raw)
1930                    .expect("File write was successful")
1931            );
1932            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
1933
1934            {
1935                let BytesAndNodes { bytes, nodes } =
1936                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
1937                assert_eq!(bytes, 0);
1938                assert_eq!(nodes, 0);
1939            }
1940
1941            let (_, immutable_attributes) =
1942                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
1943            let node_id = immutable_attributes.id.unwrap();
1944
1945            first_object_id = node_id;
1946            project_proxy
1947                .set_for_node(node_id, PROJECT_ID)
1948                .await
1949                .unwrap()
1950                .expect("Setting project on node");
1951
1952            bytes_usage = {
1953                let BytesAndNodes { bytes, nodes } =
1954                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
1955                assert!(bytes > 0);
1956                assert_eq!(nodes, 1);
1957                bytes
1958            };
1959
1960            // Grow the file by a block.
1961            assert_eq!(
1962                8192,
1963                file_proxy
1964                    .write(&vec![0xff as u8; 8192])
1965                    .await
1966                    .expect("FIDL call failed")
1967                    .map_err(Status::from_raw)
1968                    .expect("File write was successful")
1969            );
1970            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
1971            bytes_usage = {
1972                let BytesAndNodes { bytes, nodes } =
1973                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
1974                assert!(bytes > bytes_usage);
1975                assert_eq!(nodes, 1);
1976                bytes
1977            };
1978
1979            volumes_directory.terminate().await;
1980            filesystem.close().await.expect("close filesystem failed");
1981        }
1982        device = filesystem.take_device().await;
1983        device.ensure_unique();
1984        device.reopen(false);
1985        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1986        {
1987            fsck(filesystem.clone()).await.expect("Fsck");
1988            fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
1989                .await
1990                .expect("Fsck volume");
1991            let blob_resupplied_count =
1992                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1993            let volumes_directory = VolumesDirectory::new(
1994                root_volume(filesystem.clone()).await.unwrap(),
1995                Weak::new(),
1996                None,
1997                blob_resupplied_count,
1998                MemoryPressureConfig::default(),
1999            )
2000            .await
2001            .unwrap();
2002            let volume_and_root = volumes_directory
2003                .mount_volume(VOLUME_NAME, Some(Arc::new(new_insecure_crypt())), false)
2004                .await
2005                .expect("mount unencrypted volume failed");
2006
2007            let (root_proxy, project_proxy) = {
2008                let (volume_dir_proxy, dir_server_end) =
2009                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2010                volumes_directory
2011                    .serve_volume(&volume_and_root, dir_server_end, false)
2012                    .expect("serve_volume failed");
2013
2014                let (root_proxy, root_server_end) =
2015                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2016                volume_dir_proxy
2017                    .open(
2018                        "root",
2019                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2020                        &Default::default(),
2021                        root_server_end.into_channel(),
2022                    )
2023                    .expect("Failed to open volume root");
2024                let project_proxy = {
2025                    connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2026                        .expect("Unable to connect to project id service")
2027                };
2028                (root_proxy, project_proxy)
2029            };
2030
2031            {
2032                let BytesAndNodes { bytes, nodes } =
2033                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2034                assert_eq!(bytes, bytes_usage);
2035                assert_eq!(nodes, 1);
2036            }
2037
2038            assert_eq!(
2039                project_proxy
2040                    .get_for_node(first_object_id)
2041                    .await
2042                    .unwrap()
2043                    .expect("Checking project"),
2044                PROJECT_ID
2045            );
2046            root_proxy
2047                .unlink(FILE_NAME, &fio::UnlinkOptions::default())
2048                .await
2049                .expect("FIDL call failed")
2050                .expect("unlink failed");
2051            filesystem.graveyard().flush().await;
2052
2053            {
2054                let BytesAndNodes { bytes, nodes } =
2055                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2056                assert_eq!(bytes, 0);
2057                assert_eq!(nodes, 0);
2058            }
2059
2060            let file_proxy = open_file_checked(
2061                &root_proxy,
2062                FILE_NAME,
2063                fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2064                &Default::default(),
2065            )
2066            .await;
2067
2068            let (_, immutable_attributes) =
2069                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2070            let node_id = immutable_attributes.id.unwrap();
2071
2072            project_proxy
2073                .set_for_node(node_id, PROJECT_ID)
2074                .await
2075                .unwrap()
2076                .expect("Applying project");
2077
2078            bytes_usage = {
2079                let BytesAndNodes { bytes, nodes } =
2080                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2081                // Empty file should have less space than the non-empty file from above.
2082                assert!(bytes < bytes_usage);
2083                assert_eq!(nodes, 1);
2084                bytes
2085            };
2086
2087            assert_eq!(
2088                8192,
2089                file_proxy
2090                    .write(&vec![0xff as u8; 8192])
2091                    .await
2092                    .expect("FIDL call failed")
2093                    .map_err(Status::from_raw)
2094                    .expect("File write was successful")
2095            );
2096            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2097            bytes_usage = {
2098                let BytesAndNodes { bytes, nodes } =
2099                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2100                assert!(bytes > bytes_usage);
2101                assert_eq!(nodes, 1);
2102                bytes
2103            };
2104
2105            // Trim to zero. Bytes should decrease.
2106            file_proxy.resize(0).await.expect("FIDL call failed").expect("Resize file");
2107            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2108            {
2109                let BytesAndNodes { bytes, nodes } =
2110                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2111                assert!(bytes < bytes_usage);
2112                assert_eq!(nodes, 1);
2113            };
2114
2115            // Dropping node from project. Usage should go to zero.
2116            project_proxy
2117                .clear_for_node(node_id)
2118                .await
2119                .expect("FIDL call failed")
2120                .expect("Clear failed.");
2121            {
2122                let BytesAndNodes { bytes, nodes } =
2123                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2124                assert_eq!(bytes, 0);
2125                assert_eq!(nodes, 0);
2126            };
2127
2128            volumes_directory.terminate().await;
2129            filesystem.close().await.expect("close filesystem failed");
2130        }
2131        device = filesystem.take_device().await;
2132        device.ensure_unique();
2133        device.reopen(false);
2134        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2135        fsck(filesystem.clone()).await.expect("Fsck");
2136        fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2137            .await
2138            .expect("Fsck volume");
2139        filesystem.close().await.expect("close filesystem failed");
2140    }
2141
2142    #[fuchsia::test]
2143    async fn test_project_node_inheritance() {
2144        const BYTES_LIMIT: u64 = 123456;
2145        const NODES_LIMIT: u64 = 4321;
2146        const VOLUME_NAME: &str = "A";
2147        const DIR_NAME: &str = "B";
2148        const SUBDIR_NAME: &str = "C";
2149        const FILE_NAME: &str = "D";
2150        const PROJECT_ID: u64 = 42;
2151        let volume_store_id;
2152        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
2153        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2154        {
2155            let blob_resupplied_count =
2156                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2157            let volumes_directory = VolumesDirectory::new(
2158                root_volume(filesystem.clone()).await.unwrap(),
2159                Weak::new(),
2160                None,
2161                blob_resupplied_count,
2162                MemoryPressureConfig::default(),
2163            )
2164            .await
2165            .unwrap();
2166
2167            let volume_and_root = volumes_directory
2168                .create_and_mount_volume(
2169                    VOLUME_NAME,
2170                    Some(Arc::new(new_insecure_crypt())),
2171                    false,
2172                    None,
2173                )
2174                .await
2175                .expect("create unencrypted volume failed");
2176            volume_store_id = volume_and_root.volume().store().store_object_id();
2177
2178            let (volume_dir_proxy, dir_server_end) =
2179                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2180            volumes_directory
2181                .serve_volume(&volume_and_root, dir_server_end, false)
2182                .expect("serve_volume failed");
2183
2184            let project_proxy =
2185                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2186                    .expect("Unable to connect to project id service");
2187
2188            project_proxy
2189                .set_limit(PROJECT_ID, BYTES_LIMIT, NODES_LIMIT)
2190                .await
2191                .unwrap()
2192                .expect("To set limits");
2193
2194            let dir_proxy = {
2195                let (root_proxy, root_server_end) =
2196                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2197                volume_dir_proxy
2198                    .open(
2199                        "root",
2200                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2201                        &Default::default(),
2202                        root_server_end.into_channel(),
2203                    )
2204                    .expect("Failed to open volume root");
2205
2206                open_dir_checked(
2207                    &root_proxy,
2208                    DIR_NAME,
2209                    fio::Flags::FLAG_MAYBE_CREATE
2210                        | fio::PERM_READABLE
2211                        | fio::PERM_WRITABLE
2212                        | fio::Flags::PROTOCOL_DIRECTORY,
2213                    Default::default(),
2214                )
2215                .await
2216            };
2217            {
2218                let (_, immutable_attributes) =
2219                    dir_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2220                let node_id = immutable_attributes.id.unwrap();
2221
2222                project_proxy
2223                    .set_for_node(node_id, PROJECT_ID)
2224                    .await
2225                    .unwrap()
2226                    .expect("Setting project on node");
2227            }
2228
2229            let subdir_proxy = open_dir_checked(
2230                &dir_proxy,
2231                SUBDIR_NAME,
2232                fio::Flags::FLAG_MAYBE_CREATE
2233                    | fio::PERM_READABLE
2234                    | fio::PERM_WRITABLE
2235                    | fio::Flags::PROTOCOL_DIRECTORY,
2236                Default::default(),
2237            )
2238            .await;
2239            {
2240                let (_, immutable_attributes) = subdir_proxy
2241                    .get_attributes(fio::NodeAttributesQuery::ID)
2242                    .await
2243                    .unwrap()
2244                    .unwrap();
2245                let node_id = immutable_attributes.id.unwrap();
2246
2247                assert_eq!(
2248                    project_proxy
2249                        .get_for_node(node_id)
2250                        .await
2251                        .unwrap()
2252                        .expect("Setting project on node"),
2253                    PROJECT_ID
2254                );
2255            }
2256
2257            let file_proxy = open_file_checked(
2258                &subdir_proxy,
2259                FILE_NAME,
2260                fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2261                &Default::default(),
2262            )
2263            .await;
2264            {
2265                let (_, immutable_attributes) =
2266                    file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2267                let node_id = immutable_attributes.id.unwrap();
2268
2269                assert_eq!(
2270                    project_proxy
2271                        .get_for_node(node_id)
2272                        .await
2273                        .unwrap()
2274                        .expect("Setting project on node"),
2275                    PROJECT_ID
2276                );
2277            }
2278
2279            // An unnamed temporary file is created slightly differently to a regular file object.
2280            // Just in case, check that it inherits project ID as well.
2281            let tmpfile_proxy = open_file_checked(
2282                &subdir_proxy,
2283                ".",
2284                fio::Flags::PROTOCOL_FILE
2285                    | fio::Flags::FLAG_CREATE_AS_UNNAMED_TEMPORARY
2286                    | fio::PERM_READABLE,
2287                &fio::Options::default(),
2288            )
2289            .await;
2290            {
2291                let (_, immutable_attributes) = tmpfile_proxy
2292                    .get_attributes(fio::NodeAttributesQuery::ID)
2293                    .await
2294                    .unwrap()
2295                    .unwrap();
2296                let node_id: u64 = immutable_attributes.id.unwrap();
2297                assert_eq!(
2298                    project_proxy
2299                        .get_for_node(node_id)
2300                        .await
2301                        .unwrap()
2302                        .expect("Setting project on node"),
2303                    PROJECT_ID
2304                );
2305            }
2306
2307            let BytesAndNodes { nodes, .. } =
2308                project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2309            assert_eq!(nodes, 3);
2310            volumes_directory.terminate().await;
2311            filesystem.close().await.expect("close filesystem failed");
2312        }
2313        device = filesystem.take_device().await;
2314        device.ensure_unique();
2315        device.reopen(false);
2316        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2317        fsck(filesystem.clone()).await.expect("Fsck");
2318        fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2319            .await
2320            .expect("Fsck volume");
2321        filesystem.close().await.expect("close filesystem failed");
2322    }
2323
    // Exercises the fuchsia.fxfs.ProjectId `List` pagination protocol: creates more
    // project limit entries than fit in a single page, then walks the listing with
    // the continuation token, including the edge cases where entries are cleared
    // between listings and where the total count lands exactly on a page boundary.
    #[fuchsia::test]
    async fn test_project_listing() {
        const VOLUME_NAME: &str = "A";
        const FILE_NAME: &str = "B";
        const NON_ZERO_PROJECT_ID: u64 = 3;
        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let volume_store_id;
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        {
            let blob_resupplied_count =
                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
            let volumes_directory = VolumesDirectory::new(
                root_volume(filesystem.clone()).await.unwrap(),
                Weak::new(),
                None,
                blob_resupplied_count,
                MemoryPressureConfig::default(),
            )
            .await
            .unwrap();
            let volume_and_root = volumes_directory
                .create_and_mount_volume(VOLUME_NAME, None, false, None)
                .await
                .expect("create unencrypted volume failed");
            // Remember the store id so the volume can be fsck'd after the remount below.
            volume_store_id = volume_and_root.volume().store().store_object_id();

            let (volume_dir_proxy, dir_server_end) =
                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
            volumes_directory
                .serve_volume(&volume_and_root, dir_server_end, false)
                .expect("serve_volume failed");
            let project_proxy =
                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
                    .expect("Unable to connect to project id service");
            // This is just to ensure that the small numbers below can be used for this test.
            assert!(FxVolume::MAX_PROJECT_ENTRIES >= 4);
            // Create a bunch of project limit entries. 3 more than the page limit to ensure
            // pagination.
            let num_entries = u64::try_from(FxVolume::MAX_PROJECT_ENTRIES + 3).unwrap();
            for project_id in 1..=num_entries {
                project_proxy.set_limit(project_id, 1, 1).await.unwrap().expect("To set limits");
            }

            // Add one usage entry to be interspersed with the limit entries. Verifies that the
            // iterator will progress past it with no effect.
            let file_proxy = {
                let (root_proxy, root_server_end) =
                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
                volume_dir_proxy
                    .open(
                        "root",
                        fio::PERM_READABLE | fio::PERM_WRITABLE,
                        &Default::default(),
                        root_server_end.into_channel(),
                    )
                    .expect("Failed to open volume root");

                open_file_checked(
                    &root_proxy,
                    FILE_NAME,
                    fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
                    &Default::default(),
                )
                .await
            };
            let (_, immutable_attributes) =
                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
            let node_id = immutable_attributes.id.unwrap();
            project_proxy
                .set_for_node(node_id, NON_ZERO_PROJECT_ID)
                .await
                .unwrap()
                .expect("Setting project on node");
            {
                let BytesAndNodes { nodes, .. } = project_proxy
                    .info(NON_ZERO_PROJECT_ID)
                    .await
                    .unwrap()
                    .expect("Fetching project info")
                    .1;
                assert_eq!(nodes, 1);
            }

            // If this `unwrap()` fails, it is likely that MAX_PROJECT_ENTRIES is too large for
            // fidl.
            let (mut entries, mut next_token) =
                project_proxy.list(None).await.unwrap().expect("To get project listing");
            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
            assert!(next_token.is_some());
            assert!(entries.contains(&1));
            assert!(entries.contains(&3));
            assert!(!entries.contains(&num_entries));
            // Page two should have a small set at the end.
            (entries, next_token) = project_proxy
                .list(next_token.as_deref())
                .await
                .unwrap()
                .expect("To get project listing");
            assert_eq!(entries.len(), 3);
            assert!(next_token.is_none());
            assert!(entries.contains(&num_entries));
            assert!(!entries.contains(&1));
            assert!(!entries.contains(&3));
            // Delete a couple and list all again, but one has usage still.
            // Project 3 was applied to the file above, so clearing its limit must still leave
            // it in the listing; project 1 has no usage and should vanish.
            project_proxy.clear(1).await.unwrap().expect("Clear project");
            project_proxy.clear(3).await.unwrap().expect("Clear project");
            (entries, next_token) =
                project_proxy.list(None).await.unwrap().expect("To get project listing");
            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
            assert!(next_token.is_some());
            assert!(!entries.contains(&num_entries));
            assert!(!entries.contains(&1));
            assert!(entries.contains(&3));
            (entries, next_token) = project_proxy
                .list(next_token.as_deref())
                .await
                .unwrap()
                .expect("To get project listing");
            assert_eq!(entries.len(), 2);
            assert!(next_token.is_none());
            assert!(entries.contains(&num_entries));
            // Delete two more to hit the edge case: exactly MAX_PROJECT_ENTRIES remain, so the
            // listing must fit in one page with no continuation token.
            project_proxy.clear(2).await.unwrap().expect("Clear project");
            project_proxy.clear(4).await.unwrap().expect("Clear project");
            (entries, next_token) =
                project_proxy.list(None).await.unwrap().expect("To get project listing");
            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
            assert!(next_token.is_none());
            assert!(entries.contains(&num_entries));
            volumes_directory.terminate().await;
            filesystem.close().await.expect("close filesystem failed");
        }
        // Remount and fsck to verify the project metadata survived and is consistent.
        device = filesystem.take_device().await;
        device.ensure_unique();
        device.reopen(false);
        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
        fsck(filesystem.clone()).await.expect("Fsck");
        fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
        filesystem.close().await.expect("close filesystem failed");
    }
2462
    // Records a blob page-in profile, then across several remounts replays it while
    // simultaneously re-recording, verifying that replay actually pages the blobs
    // back in and that re-recording does not drop entries between generations.
    #[fuchsia::test(threads = 10)]
    async fn test_profile_blob() {
        let mut hashes = Vec::new();
        let device = {
            let fixture = blob_testing::new_blob_fixture().await;

            for i in 0..3u64 {
                let hash =
                    fixture.write_blob(i.to_le_bytes().as_slice(), CompressionMode::Never).await;
                hashes.push(hash);
            }
            fixture.close().await
        };
        device.ensure_unique();

        // Remount and record the initial profile named "foo".
        device.reopen(false);
        let mut device = {
            let fixture = blob_testing::open_blob_fixture(device).await;
            fixture
                .volume()
                .volume()
                .record_or_replay_profile(new_profile_state(true), "foo")
                .await
                .expect("Recording");

            // Page in the zero offsets only to avoid readahead strangeness.
            let mut writable = [0u8];
            for hash in &hashes {
                let vmo = fixture.get_blob_vmo(*hash).await;
                vmo.read(&mut writable, 0).expect("Vmo read");
            }
            fixture.volume().volume().stop_profile_tasks().await;
            fixture.close().await
        };

        // Do this multiple times to ensure that the re-recording doesn't drop anything.
        for i in 0..3 {
            device.ensure_unique();
            device.reopen(false);
            let fixture = blob_testing::open_blob_fixture(device).await;
            {
                // Ensure that nothing is paged in right now.
                for hash in &hashes {
                    let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                    assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
                }

                fixture
                    .volume()
                    .volume()
                    .record_or_replay_profile(new_profile_state(true), "foo")
                    .await
                    .expect("Replaying");

                // Move the file in flight to ensure a new version lands to be used next time.
                {
                    let store_id = fixture.volume().volume().store().store_object_id();
                    let dir = fixture.volume().volume().get_profile_directory().await.unwrap();
                    let old_file = dir.lookup("foo").await.unwrap().unwrap().0;
                    let mut transaction = fixture
                        .fs()
                        .clone()
                        .new_transaction(
                            lock_keys!(
                                LockKey::object(store_id, dir.object_id()),
                                LockKey::object(store_id, old_file),
                            ),
                            Options::default(),
                        )
                        .await
                        .unwrap();
                    // Rename "foo" out of the way; the re-recording must still write a fresh
                    // "foo" when it completes.
                    replace_child(&mut transaction, Some((&dir, "foo")), (&dir, &i.to_string()))
                        .await
                        .expect("Replace old profile.")
                    transaction.commit().await.unwrap();
                    assert!(
                        dir.lookup("foo").await.unwrap().is_none(),
                        "Old profile should be moved"
                    );
                }

                // Await all data being played back by checking that things have paged in.
                for hash in &hashes {
                    // Fetch vmo this way as well to ensure that the open is counting the file as
                    // used in the current recording.
                    let _vmo = fixture.get_blob_vmo(*hash).await;
                    let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                    while blob.vmo().info().unwrap().committed_bytes == 0 {
                        fasync::Timer::new(Duration::from_millis(25)).await;
                    }
                }

                // Complete the recording.
                fixture.volume().volume().stop_profile_tasks().await;
            }
            device = fixture.close().await;
        }
    }
2561
    // Same record/replay cycle as `test_profile_blob` but for regular files, with the
    // added twist of a file in an encrypted (wrapping-key) directory: accesses to it
    // during recording must NOT end up in the profile, and replay must not page it in.
    #[fuchsia::test(threads = 10)]
    async fn test_profile_file() {
        let mut hashes = Vec::new();
        let crypt_file_id;
        let device = {
            let fixture = TestFixture::new().await;
            // Include a crypt file to access during the recording. It should not be added.
            {
                let crypt_dir = open_dir_checked(
                    fixture.root(),
                    "crypt_dir",
                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
                    Default::default(),
                )
                .await;
                let crypt: Arc<CryptBase> = fixture.crypt().unwrap();
                crypt
                    .add_wrapping_key(WRAPPING_KEY_ID, [1; 32].into())
                    .expect("add_wrapping_key failed");
                // Mark the directory as encrypted with the wrapping key so its contents go
                // through the crypt path.
                crypt_dir
                    .update_attributes(&fio::MutableNodeAttributes {
                        wrapping_key_id: Some(WRAPPING_KEY_ID),
                        ..Default::default()
                    })
                    .await
                    .expect("update_attributes wire call failed")
                    .expect("update_attributes failed");
                let crypt_file = open_file_checked(
                    &crypt_dir,
                    "crypt_file",
                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
                    &Default::default(),
                )
                .await;
                crypt_file.write("asdf".as_bytes()).await.unwrap().expect("Writing crypt file");
                // Remember the object id so the file can be inspected internally after remount.
                crypt_file_id = crypt_file
                    .get_attributes(fio::NodeAttributesQuery::ID)
                    .await
                    .unwrap()
                    .expect("Get id")
                    .1
                    .id
                    .expect("Reading id in response");
            }

            // Create the plain files that the profile is expected to cover.
            for i in 0..3u64 {
                let file_proxy = open_file_checked(
                    fixture.root(),
                    &i.to_string(),
                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
                    &Default::default(),
                )
                .await;
                file_proxy.write(i.to_le_bytes().as_slice()).await.unwrap().expect("Writing file");
                let id = file_proxy
                    .get_attributes(fio::NodeAttributesQuery::ID)
                    .await
                    .unwrap()
                    .expect("Get id")
                    .1
                    .id
                    .expect("Reading id in response");
                hashes.push((i, id));
            }
            fixture.close().await
        };
        device.ensure_unique();

        // Remount and record the initial profile named "foo".
        device.reopen(false);
        let mut device = {
            let fixture = TestFixture::open(
                device,
                TestFixtureOptions { format: false, ..Default::default() },
            )
            .await;
            fixture
                .volume()
                .volume()
                .record_or_replay_profile(new_profile_state(false), "foo")
                .await
                .expect("Recording");

            // Touch the crypt file while recording; this access must be excluded from the
            // profile.
            {
                let crypt: Arc<CryptBase> = fixture.crypt().unwrap();
                crypt
                    .add_wrapping_key(WRAPPING_KEY_ID, [1; 32].into())
                    .expect("add wrapping key failed");
                let crypt_file = open_file_checked(
                    fixture.root(),
                    "crypt_dir/crypt_file",
                    fio::PERM_READABLE,
                    &Default::default(),
                )
                .await;
                crypt_file.read(1).await.unwrap().expect("Reading crypt file");
            }
            // Page in the zero offsets only to avoid readahead strangeness.
            for (i, _) in &hashes {
                let file_proxy = open_file_checked(
                    fixture.root(),
                    &i.to_string(),
                    fio::PERM_READABLE,
                    &Default::default(),
                )
                .await;
                file_proxy.read(1).await.unwrap().expect("Reading file");
            }
            fixture.volume().volume().stop_profile_tasks().await;
            fixture.close().await
        };

        // Do this multiple times to ensure that the re-recording doesn't drop anything.
        for i in 0..3 {
            device.ensure_unique();
            device.reopen(false);
            let fixture = TestFixture::open(
                device,
                TestFixtureOptions { format: false, ..Default::default() },
            )
            .await;
            {
                // Need to get the root vmo to check committed bytes.
                let volume = fixture.volume().volume().clone();
                // Ensure that nothing is paged in right now.
                {
                    let crypt_file = volume
                        .get_or_load_node(crypt_file_id, ObjectDescriptor::File, None)
                        .await
                        .expect("Opening file internally")
                        .into_any()
                        .downcast::<FxFile>()
                        .expect("Should be file");
                    assert_eq!(crypt_file.vmo().info().unwrap().committed_bytes, 0);
                }
                for (_, id) in &hashes {
                    let file = volume
                        .get_or_load_node(*id, ObjectDescriptor::File, None)
                        .await
                        .expect("Opening file internally")
                        .into_any()
                        .downcast::<FxFile>()
                        .expect("Should be file");
                    assert_eq!(file.vmo().info().unwrap().committed_bytes, 0);
                }

                fixture
                    .volume()
                    .volume()
                    .record_or_replay_profile(new_profile_state(false), "foo")
                    .await
                    .expect("Replaying");

                // Move the file in flight to ensure a new version lands to be used next time.
                {
                    let store_id = fixture.volume().volume().store().store_object_id();
                    let dir = fixture.volume().volume().get_profile_directory().await.unwrap();
                    let old_file = dir.lookup("foo").await.unwrap().unwrap().0;
                    let mut transaction = fixture
                        .fs()
                        .clone()
                        .new_transaction(
                            lock_keys!(
                                LockKey::object(store_id, dir.object_id()),
                                LockKey::object(store_id, old_file),
                            ),
                            Options::default(),
                        )
                        .await
                        .unwrap();
                    replace_child(&mut transaction, Some((&dir, "foo")), (&dir, &i.to_string()))
                        .await
                        .expect("Replace old profile.");
                    transaction.commit().await.unwrap();
                    assert!(
                        dir.lookup("foo").await.unwrap().is_none(),
                        "Old profile should be moved"
                    );
                }

                // Await all data being played back by checking that things have paged in.
                for (_, id) in &hashes {
                    let file = volume
                        .get_or_load_node(*id, ObjectDescriptor::File, None)
                        .await
                        .expect("Opening file internally")
                        .into_any()
                        .downcast::<FxFile>()
                        .expect("Should be file");
                    while file.vmo().info().unwrap().committed_bytes == 0 {
                        fasync::Timer::new(Duration::from_millis(25)).await;
                    }
                }
                // The crypt file access should not have been recorded or replayed.
                {
                    let crypt_file = volume
                        .get_or_load_node(crypt_file_id, ObjectDescriptor::File, None)
                        .await
                        .expect("Opening file internally")
                        .into_any()
                        .downcast::<FxFile>()
                        .expect("Should be file");
                    assert_eq!(crypt_file.vmo().info().unwrap().committed_bytes, 0);
                }

                // Open all the files to show that they have been used.
                for (i, _) in &hashes {
                    let _file_proxy = open_file_checked(
                        fixture.root(),
                        &i.to_string(),
                        fio::PERM_READABLE,
                        &Default::default(),
                    )
                    .await;
                }

                // Complete the recording.
                fixture.volume().volume().stop_profile_tasks().await;
            }
            device = fixture.close().await;
        }
    }
2783
    // Verifies that re-recording while replaying REPLACES the profile rather than
    // merging: a profile covering blob 0 is recorded, then during its replay only
    // blob 1 is touched; on the following replay blob 1 must page in and blob 0
    // must not.
    #[fuchsia::test(threads = 10)]
    async fn test_profile_update() {
        let mut hashes = Vec::new();
        let device = {
            let fixture = blob_testing::new_blob_fixture().await;
            for i in 0..2u64 {
                let hash =
                    fixture.write_blob(i.to_le_bytes().as_slice(), CompressionMode::Never).await;
                hashes.push(hash);
            }
            fixture.close().await
        };
        device.ensure_unique();

        // Remount and record the initial profile, touching only the first blob.
        device.reopen(false);
        let device = {
            let fixture = blob_testing::open_blob_fixture(device).await;

            {
                let volume = fixture.volume().volume();
                volume
                    .record_or_replay_profile(new_profile_state(true), "foo")
                    .await
                    .expect("Recording");

                let original_recorded = RECORDED.load(Ordering::Relaxed);

                // Page in the zero offsets only to avoid readahead strangeness.
                {
                    let mut writable = [0u8];
                    let hash = &hashes[0];
                    let vmo = fixture.get_blob_vmo(*hash).await;
                    vmo.read(&mut writable, 0).expect("Vmo read");
                }

                // The recording happens asynchronously, so we must wait.  This is crude, but it's
                // only for testing and it's simple.
                while RECORDED.load(Ordering::Relaxed) == original_recorded {
                    fasync::Timer::new(std::time::Duration::from_millis(10)).await;
                }

                volume.stop_profile_tasks().await;
            }
            fixture.close().await
        };

        // Remount; replay the "blob 0" profile while recording a new one that only covers
        // blob 1.
        device.ensure_unique();
        device.reopen(false);
        let fixture = blob_testing::open_blob_fixture(device).await;
        {
            // Need to get the root vmo to check committed bytes.
            // Ensure that nothing is paged in right now.
            for hash in &hashes {
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
            }

            let volume = fixture.volume().volume();

            volume
                .record_or_replay_profile(new_profile_state(true), "foo")
                .await
                .expect("Replaying");

            // Await all data being played back by checking that things have paged in.
            {
                let hash = &hashes[0];
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                while blob.vmo().info().unwrap().committed_bytes == 0 {
                    fasync::Timer::new(Duration::from_millis(25)).await;
                }
            }

            let original_recorded = RECORDED.load(Ordering::Relaxed);

            // Record the new profile that will overwrite it.
            {
                let mut writable = [0u8];
                let hash = &hashes[1];
                let vmo = fixture.get_blob_vmo(*hash).await;
                vmo.read(&mut writable, 0).expect("Vmo read");
            }

            // The recording happens asynchronously, so we must wait.  This is crude, but it's only
            // for testing and it's simple.
            while RECORDED.load(Ordering::Relaxed) == original_recorded {
                fasync::Timer::new(std::time::Duration::from_millis(10)).await;
            }

            // Complete the recording.
            volume.stop_profile_tasks().await;
        }
        let device = fixture.close().await;

        // Final remount: replaying the updated profile should page in only the second blob.
        device.ensure_unique();
        device.reopen(false);
        let fixture = blob_testing::open_blob_fixture(device).await;
        {
            // Need to get the root vmo to check committed bytes.
            // Ensure that nothing is paged in right now.
            for hash in &hashes {
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
            }

            fixture
                .volume()
                .volume()
                .record_or_replay_profile(new_profile_state(true), "foo")
                .await
                .expect("Replaying");

            // Await all data being played back by checking that things have paged in.
            {
                let hash = &hashes[1];
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                while blob.vmo().info().unwrap().committed_bytes == 0 {
                    fasync::Timer::new(Duration::from_millis(25)).await;
                }
            }

            // Complete the recording.
            fixture.volume().volume().stop_profile_tasks().await;

            // Verify that the first blob was not paged in, as it should have been dropped from
            // the profile.
            {
                let hash = &hashes[0];
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
            }
        }
        fixture.close().await;
    }
2917
2918    #[fuchsia::test(threads = 10)]
2919    async fn test_unencrypted_volume() {
2920        let fixture = TestFixture::new_unencrypted().await;
2921        let root = fixture.root();
2922
2923        let f = open_file_checked(
2924            &root,
2925            "foo",
2926            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
2927            &Default::default(),
2928        )
2929        .await;
2930        close_file_checked(f).await;
2931
2932        fixture.close().await;
2933    }
2934
2935    #[fuchsia::test]
2936    async fn test_read_only_unencrypted_volume() {
2937        // Make a new Fxfs filesystem with an unencrypted volume named "vol".
2938        let fs = {
2939            let device = fxfs::filesystem::mkfs_with_volume(
2940                DeviceHolder::new(FakeDevice::new(8192, 512)),
2941                "vol",
2942                None,
2943            )
2944            .await
2945            .unwrap();
2946            // Re-open the device as read-only and mount the filesystem as read-only.
2947            device.reopen(true);
2948            FxFilesystemBuilder::new().read_only(true).open(device).await.unwrap()
2949        };
2950        // Ensure we can access the volume and gracefully terminate any tasks.
2951        {
2952            let root_volume = root_volume(fs.clone()).await.unwrap();
2953            let store = root_volume.volume("vol", StoreOptions::default()).await.unwrap();
2954            let unique_id = store.store_object_id();
2955            let blob_resupplied_count =
2956                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2957            let volume = FxVolume::new(
2958                Weak::new(),
2959                store,
2960                unique_id,
2961                blob_resupplied_count,
2962                MemoryPressureConfig::default(),
2963            )
2964            .unwrap();
2965            volume.terminate().await;
2966        }
2967        // Close the filesystem, and make sure we don't have any dangling references.
2968        fs.close().await.unwrap();
2969        let device = fs.take_device().await;
2970        device.ensure_unique();
2971    }
2972
2973    #[fuchsia::test]
2974    async fn test_read_only_encrypted_volume() {
2975        let crypt: Arc<CryptBase> = Arc::new(new_insecure_crypt());
2976        // Make a new Fxfs filesystem with an encrypted volume named "vol".
2977        let fs = {
2978            let device = fxfs::filesystem::mkfs_with_volume(
2979                DeviceHolder::new(FakeDevice::new(8192, 512)),
2980                "vol",
2981                Some(crypt.clone()),
2982            )
2983            .await
2984            .unwrap();
2985            // Re-open the device as read-only and mount the filesystem as read-only.
2986            device.reopen(true);
2987            FxFilesystemBuilder::new().read_only(true).open(device).await.unwrap()
2988        };
2989        // Ensure we can access the volume and gracefully terminate any tasks.
2990        {
2991            let root_volume = root_volume(fs.clone()).await.unwrap();
2992            let store = root_volume
2993                .volume("vol", StoreOptions { crypt: Some(crypt), ..StoreOptions::default() })
2994                .await
2995                .unwrap();
2996            let unique_id = store.store_object_id();
2997            let blob_resupplied_count =
2998                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2999            let volume = FxVolume::new(
3000                Weak::new(),
3001                store,
3002                unique_id,
3003                blob_resupplied_count,
3004                MemoryPressureConfig::default(),
3005            )
3006            .unwrap();
3007            volume.terminate().await;
3008        }
3009        // Close the filesystem, and make sure we don't have any dangling references.
3010        fs.close().await.unwrap();
3011        let device = fs.take_device().await;
3012        device.ensure_unique();
3013    }
3014
3015    #[fuchsia::test]
3016    fn test_read_ahead_sizes() {
3017        let config = MemoryPressureConfig::default();
3018        assert!(config.mem_normal.read_ahead_size % BASE_READ_AHEAD_SIZE == 0);
3019        assert_eq!(config.mem_normal.read_ahead_size, MAX_READ_AHEAD_SIZE);
3020
3021        assert!(config.mem_warning.read_ahead_size % BASE_READ_AHEAD_SIZE == 0);
3022
3023        assert_eq!(config.mem_critical.read_ahead_size, BASE_READ_AHEAD_SIZE);
3024    }
3025}