Skip to main content

fxfs_platform/fuchsia/
volume.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::component::map_to_raw_status;
6use crate::fuchsia::directory::FxDirectory;
7use crate::fuchsia::dirent_cache::DirentCache;
8use crate::fuchsia::file::FxFile;
9use crate::fuchsia::memory_pressure::{MemoryPressureLevel, MemoryPressureMonitor};
10use crate::fuchsia::node::{FxNode, GetResult, NodeCache};
11use crate::fuchsia::pager::Pager;
12use crate::fuchsia::profile::ProfileState;
13use crate::fuchsia::symlink::FxSymlink;
14use crate::fuchsia::volumes_directory::VolumesDirectory;
15use anyhow::{Error, bail, ensure};
16use async_trait::async_trait;
17use fidl::endpoints::ServerEnd;
18use fidl_fuchsia_fxfs::{
19    BytesAndNodes, FileBackedVolumeProviderRequest, FileBackedVolumeProviderRequestStream,
20    ProjectIdRequest, ProjectIdRequestStream, ProjectIterToken,
21};
22use fidl_fuchsia_io as fio;
23use fs_inspect::{FsInspectVolume, VolumeData};
24use fuchsia_async as fasync;
25use fuchsia_async::epoch::Epoch;
26use fuchsia_sync::Mutex;
27use futures::channel::oneshot;
28use futures::stream::{self, FusedStream, Stream};
29use futures::{FutureExt, StreamExt, TryStreamExt};
30use fxfs::errors::FxfsError;
31use fxfs::filesystem::{self, SyncOptions};
32use fxfs::future_with_guard::FutureWithGuard;
33use fxfs::log::*;
34use fxfs::object_store::directory::Directory;
35use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
36use fxfs::object_store::{HandleOptions, HandleOwner, ObjectDescriptor, ObjectStore};
37use refaults_vmo::PageRefaultCounter;
38use std::future::Future;
39use std::pin::pin;
40#[cfg(any(test, feature = "testing"))]
41use std::sync::atomic::AtomicBool;
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::{Arc, Weak};
44use std::time::Duration;
45use vfs::directory::entry::DirectoryEntry;
46use vfs::directory::simple::Simple;
47use vfs::execution_scope::ExecutionScope;
48
// LINT.IfChange
// TODO:(b/299919008) Fix this number to something reasonable, or maybe just for fxblob.
const DIRENT_CACHE_LIMIT: usize = 2000;
// LINT.ThenChange(//src/storage/stressor/src/aggressive.rs)

/// The smallest read-ahead size. Every other read-ahead size is a multiple of this one, which
/// makes it possible to chunk metadata at this granularity.
pub const BASE_READ_AHEAD_SIZE: u64 = 32 * 1024;
/// The largest read-ahead size: four times [`BASE_READ_AHEAD_SIZE`].
pub const MAX_READ_AHEAD_SIZE: u64 = 4 * BASE_READ_AHEAD_SIZE;

/// Name of the profile directory that lives inside the volume's internal directory.
const PROFILE_DIRECTORY: &str = "profiles";
60
/// Tunables that apply while the system sits at one particular memory-pressure level.
#[derive(Clone, Copy, Debug)]
pub struct MemoryPressureLevelConfig {
    /// How long to wait between flushes and between runs of the other background maintenance
    /// work (e.g. purging caches).
    pub background_task_period: Duration,

    /// Upper bound on the number of cached nodes.
    pub cache_size_limit: usize,

    /// How long to wait before the background task runs for the first time. This is longer than
    /// the regular period so that the task does not run during boot.
    pub background_task_initial_delay: Duration,

    /// How much read-ahead to do. The read-ahead size is reduced when under memory pressure: the
    /// kernel starts evicting pages at that point, so over-supplied pages are less likely to be
    /// used before they are evicted.
    pub read_ahead_size: u64,
}
79
80impl Default for MemoryPressureLevelConfig {
81    fn default() -> Self {
82        Self {
83            background_task_period: Duration::from_secs(20),
84            cache_size_limit: DIRENT_CACHE_LIMIT,
85            background_task_initial_delay: Duration::from_secs(70),
86            read_ahead_size: MAX_READ_AHEAD_SIZE,
87        }
88    }
89}
90
91#[derive(Clone, Copy, Debug)]
92pub struct MemoryPressureConfig {
93    /// The configuration to use at [`MemoryPressureLevel::Normal`].
94    pub mem_normal: MemoryPressureLevelConfig,
95
96    /// The configuration to use at [`MemoryPressureLevel::Warning`].
97    pub mem_warning: MemoryPressureLevelConfig,
98
99    /// The configuration to use at [`MemoryPressureLevel::Critical`].
100    pub mem_critical: MemoryPressureLevelConfig,
101}
102
103impl MemoryPressureConfig {
104    pub fn for_level(&self, level: &MemoryPressureLevel) -> &MemoryPressureLevelConfig {
105        match level {
106            MemoryPressureLevel::Normal => &self.mem_normal,
107            MemoryPressureLevel::Warning => &self.mem_warning,
108            MemoryPressureLevel::Critical => &self.mem_critical,
109        }
110    }
111}
112
113impl Default for MemoryPressureConfig {
114    fn default() -> Self {
115        // TODO(https://fxbug.dev/42061389): investigate a smarter strategy for determining flush
116        // frequency.
117        Self {
118            mem_normal: MemoryPressureLevelConfig {
119                background_task_period: Duration::from_secs(20),
120                cache_size_limit: DIRENT_CACHE_LIMIT,
121                background_task_initial_delay: Duration::from_secs(70),
122                read_ahead_size: MAX_READ_AHEAD_SIZE,
123            },
124            mem_warning: MemoryPressureLevelConfig {
125                background_task_period: Duration::from_secs(5),
126                cache_size_limit: 100,
127                background_task_initial_delay: Duration::from_secs(5),
128                read_ahead_size: BASE_READ_AHEAD_SIZE * 2,
129            },
130            mem_critical: MemoryPressureLevelConfig {
131                background_task_period: Duration::from_millis(1500),
132                cache_size_limit: 20,
133                background_task_initial_delay: Duration::from_millis(1500),
134                read_ahead_size: BASE_READ_AHEAD_SIZE,
135            },
136        }
137    }
138}
139
140/// FxVolume represents an opened volume. It is also a (weak) cache for all opened Nodes within the
141/// volume.
142pub struct FxVolume {
143    parent: Weak<VolumesDirectory>,
144    cache: NodeCache,
145    store: Arc<ObjectStore>,
146    pager: Pager,
147    executor: fasync::EHandle,
148    name: String,
149
150    // A tuple of the actual task and a channel to signal to terminate the task.
151    background_task: Mutex<Option<(fasync::Task<()>, oneshot::Sender<()>)>>,
152
153    // Unique identifier of the filesystem that owns this volume.
154    fs_id: u64,
155
156    // The execution scope for this volume.
157    scope: ExecutionScope,
158
159    dirent_cache: DirentCache,
160
161    profile_state: Mutex<Option<Box<dyn ProfileState>>>,
162
163    /// This is updated based on the memory-pressure level.
164    read_ahead_size: AtomicU64,
165
166    #[cfg(any(test, feature = "testing"))]
167    poisoned: AtomicBool,
168
169    blob_resupplied_count: Arc<PageRefaultCounter>,
170
171    /// The number of dirty bytes in pager backed VMOs that belong to this volume. VolumesDirectory
172    /// holds a count for all volumes. This count is only used for tracing.
173    pager_dirty_byte_count: AtomicU64,
174}
175
176#[fxfs_trace::trace]
177impl FxVolume {
178    pub fn new(
179        parent: Weak<VolumesDirectory>,
180        store: Arc<ObjectStore>,
181        fs_id: u64,
182        name: String,
183        blob_resupplied_count: Arc<PageRefaultCounter>,
184        memory_pressure_config: MemoryPressureConfig,
185    ) -> Result<Self, Error> {
186        let scope = ExecutionScope::new();
187        Ok(Self {
188            parent,
189            cache: NodeCache::new(),
190            store,
191            name,
192            pager: Pager::new(scope.clone())?,
193            executor: fasync::EHandle::local(),
194            background_task: Mutex::new(None),
195            fs_id,
196            scope,
197            dirent_cache: DirentCache::new(memory_pressure_config.mem_normal.cache_size_limit),
198            profile_state: Mutex::new(None),
199            read_ahead_size: AtomicU64::new(memory_pressure_config.mem_normal.read_ahead_size),
200            #[cfg(any(test, feature = "testing"))]
201            poisoned: AtomicBool::new(false),
202            blob_resupplied_count,
203            pager_dirty_byte_count: AtomicU64::new(0),
204        })
205    }
206
207    pub fn store(&self) -> &Arc<ObjectStore> {
208        &self.store
209    }
210
211    pub fn cache(&self) -> &NodeCache {
212        &self.cache
213    }
214
215    pub fn dirent_cache(&self) -> &DirentCache {
216        &self.dirent_cache
217    }
218
219    pub fn pager(&self) -> &Pager {
220        &self.pager
221    }
222
223    pub fn id(&self) -> u64 {
224        self.fs_id
225    }
226
227    pub fn scope(&self) -> &ExecutionScope {
228        &self.scope
229    }
230
231    pub fn blob_resupplied_count(&self) -> &PageRefaultCounter {
232        &self.blob_resupplied_count
233    }
234
235    pub fn name(&self) -> &str {
236        &self.name
237    }
238
239    /// Reports the filesystem info, but if the volume has a space limit applied then the space
240    /// available and space used are reported based on the volume instead.
241    pub fn filesystem_info_for_volume(&self) -> fio::FilesystemInfo {
242        let allocator = self.store.filesystem().allocator();
243        let info =
244            if let Some(limit) = allocator.get_owner_bytes_limit(self.store.store_object_id()) {
245                filesystem::Info {
246                    used_bytes: allocator.get_owner_bytes_used(self.store.store_object_id()),
247                    total_bytes: limit,
248                }
249            } else {
250                self.store.filesystem().get_info()
251            };
252
253        info_to_filesystem_info(info, self.store.block_size(), self.store.object_count(), self.id())
254    }
255
256    /// Stop profiling, recover resources from it and finalize recordings.
257    pub async fn stop_profile_tasks(self: &Arc<Self>) {
258        let Some(mut state) = self.profile_state.lock().take() else { return };
259        state.wait_for_replay_to_finish().await;
260        self.pager.set_recorder(None);
261        let _ = state.wait_for_recording_to_finish().await;
262    }
263
264    /// Opens or creates the profile directory in the volume's internal directory.
265    pub async fn get_profile_directory(self: &Arc<Self>) -> Result<Directory<FxVolume>, Error> {
266        let internal_dir = self
267            .get_or_create_internal_dir()
268            .await
269            .map_err(|e| e.context("Opening internal directory"))?;
270        // Have to do separate calls to create the profile dir if necessary.
271        let mut transaction = self
272            .store()
273            .filesystem()
274            .new_transaction(
275                lock_keys![LockKey::object(
276                    self.store().store_object_id(),
277                    internal_dir.object_id(),
278                )],
279                Options::default(),
280            )
281            .await?;
282        Ok(match internal_dir.directory().lookup(PROFILE_DIRECTORY).await? {
283            Some((object_id, _, _)) => {
284                Directory::open_unchecked(self.clone(), object_id, None, false)
285            }
286            None => {
287                let new_dir = internal_dir
288                    .directory()
289                    .create_child_dir(&mut transaction, PROFILE_DIRECTORY)
290                    .await?;
291                transaction.commit().await?;
292                new_dir
293            }
294        })
295    }
296
297    /// Starts recording a profile for the volume under the name given, and if a profile exists
298    /// under that same name it is replayed and will be replaced after by the new recording if it
299    /// is cleanly shutdown and finalized.
300    pub async fn record_or_replay_profile(
301        self: &Arc<Self>,
302        mut state: Box<dyn ProfileState>,
303        name: &str,
304    ) -> Result<(), Error> {
305        // We don't meddle in FxDirectory or FxFile here because we don't want a paged object.
306        // Normally we ensure that there's only one copy by using the Node cache on the volume, but
307        // that would create FxFile, so in this case we just assume that only one profile operation
308        // should be ongoing at a time, as that is ensured in `VolumesDirectory`.
309
310        // If there is a recording already, prepare to replay it.
311        let profile_dir = self.get_profile_directory().await?;
312        let replay_handle = if let Some((id, descriptor, _)) = profile_dir.lookup(name).await? {
313            ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
314            Some(Box::new(
315                ObjectStore::open_object(self, id, HandleOptions::default(), None).await?,
316            ))
317        } else {
318            None
319        };
320
321        let mut profile_state = self.profile_state.lock();
322
323        info!("Recording new profile '{name}' for volume object {}", self.store.store_object_id());
324        // Begin recording first to ensure that we capture any activity from the replay.
325        self.pager.set_recorder(Some(state.record_new(self, name)));
326        if let Some(handle) = replay_handle {
327            if let Some(guard) = self.scope().try_active_guard() {
328                state.replay_profile(handle, self.clone(), guard);
329                info!(
330                    "Replaying existing profile '{name}' for volume object {}",
331                    self.store.store_object_id()
332                );
333            }
334        }
335        *profile_state = Some(state);
336        Ok(())
337    }
338
339    async fn get_or_create_internal_dir(self: &Arc<Self>) -> Result<Arc<FxDirectory>, Error> {
340        let internal_data_id = self.store().get_or_create_internal_directory_id().await?;
341        let internal_dir = self
342            .get_or_load_node(internal_data_id, ObjectDescriptor::Directory, None)
343            .await?
344            .into_any()
345            .downcast::<FxDirectory>()
346            .unwrap();
347        Ok(internal_dir)
348    }
349
350    pub async fn terminate(&self) {
351        let task = std::mem::replace(&mut *self.background_task.lock(), None);
352        if let Some((task, terminate)) = task {
353            let _ = terminate.send(());
354            task.await;
355        }
356
357        // `NodeCache::terminate` will break any strong reference cycles contained within nodes
358        // (pager registration). The only remaining nodes should be those with open FIDL
359        // connections or vmo references in the process of handling the VMO_ZERO_CHILDREN signal.
360        // `ExecutionScope::shutdown` + `ExecutionScope::wait` will close the open FIDL connections
361        // and synchonrize the signal handling which should result in all nodes flushing and then
362        // dropping. Any async tasks required to flush a node should take an active guard on the
363        // `ExecutionScope` which will prevent `ExecutionScope::wait` from completing until all
364        // nodes are flushed.
365        self.scope.shutdown();
366        self.cache.terminate();
367        self.scope.wait().await;
368
369        // Make sure there are no deferred operations still pending for this volume.
370        Epoch::global().barrier().await;
371
372        // The dirent_cache must be cleared *after* shutting down the scope because there can be
373        // tasks that insert entries into the cache.
374        self.dirent_cache.clear();
375
376        if self.store.filesystem().options().read_only {
377            // If the filesystem is read only, we don't need to flush/sync anything.
378            if self.store.is_unlocked() {
379                self.store.lock_read_only();
380            }
381            return;
382        }
383
384        self.flush_all_files(true).await;
385        self.store.filesystem().graveyard().flush().await;
386        if self.store.crypt().is_some() {
387            if let Err(e) = self.store.lock().await {
388                // The store will be left in a safe state and there won't be data-loss unless
389                // there's an issue flushing the journal later.
390                warn!(error:? = e; "Locking store error");
391            }
392        }
393        let sync_status = self
394            .store
395            .filesystem()
396            .sync(SyncOptions { flush_device: true, ..Default::default() })
397            .await;
398        if let Err(e) = sync_status {
399            error!(error:? = e; "Failed to sync filesystem; data may be lost");
400        }
401    }
402
403    /// Attempts to get a node from the node cache. If the node wasn't present in the cache, loads
404    /// the object from the object store, installing the returned node into the cache and returns
405    /// the newly created FxNode backed by the loaded object.  |parent| is only set on the node if
406    /// the node was not present in the cache.  Otherwise, it is ignored.
407    pub async fn get_or_load_node(
408        self: &Arc<Self>,
409        object_id: u64,
410        object_descriptor: ObjectDescriptor,
411        parent: Option<Arc<FxDirectory>>,
412    ) -> Result<Arc<dyn FxNode>, Error> {
413        match self.cache.get_or_reserve(object_id).await {
414            GetResult::Node(node) => Ok(node),
415            GetResult::Placeholder(placeholder) => {
416                let node = match object_descriptor {
417                    ObjectDescriptor::File => FxFile::new(
418                        ObjectStore::open_object(self, object_id, HandleOptions::default(), None)
419                            .await?,
420                    ) as Arc<dyn FxNode>,
421                    ObjectDescriptor::Directory => {
422                        // Can't use open_unchecked because we don't know if the dir is casefolded
423                        // or encrypted.
424                        Arc::new(FxDirectory::new(parent, Directory::open(self, object_id).await?))
425                            as Arc<dyn FxNode>
426                    }
427                    ObjectDescriptor::Symlink => Arc::new(FxSymlink::new(self.clone(), object_id)),
428                    _ => bail!(FxfsError::Inconsistent),
429                };
430                placeholder.commit(&node);
431                Ok(node)
432            }
433        }
434    }
435
436    /// Marks the given directory deleted.
437    pub fn mark_directory_deleted(&self, object_id: u64) {
438        if let Some(node) = self.cache.get(object_id) {
439            // It's possible that node is a placeholder, in which case we don't need to wait for it
440            // to be resolved because it should be blocked behind the locks that are held by the
441            // caller, and once they're dropped, it'll be found to be deleted via the tree.
442            if let Ok(dir) = node.into_any().downcast::<FxDirectory>() {
443                dir.set_deleted();
444            }
445        }
446    }
447
448    /// Removes resources associated with |object_id| (which ought to be a file), if there are no
449    /// open connections to that file.
450    ///
451    /// This must be called *after committing* a transaction which deletes the last reference to
452    /// |object_id|, since before that point, new connections could be established.
453    pub(super) async fn maybe_purge_file(&self, object_id: u64) -> Result<(), Error> {
454        if let Some(node) = self.cache.get(object_id) {
455            node.mark_to_be_purged();
456            return Ok(());
457        }
458        // If this fails, the graveyard should clean it up on next mount.
459        self.store
460            .tombstone_object(
461                object_id,
462                Options { borrow_metadata_space: true, ..Default::default() },
463            )
464            .await?;
465        Ok(())
466    }
467
468    /// Starts the background work task.  This task will periodically:
469    ///   - scan all files and flush them to disk, and
470    ///   - purge unused cached data.
471    /// The task will hold a strong reference to the FxVolume while it is running, so the task must
472    /// be closed later with Self::terminate, or the FxVolume will never be dropped.
473    pub fn start_background_task(
474        self: &Arc<Self>,
475        config: MemoryPressureConfig,
476        mem_monitor: Option<&MemoryPressureMonitor>,
477    ) {
478        let mut background_task = self.background_task.lock();
479        if background_task.is_none() {
480            let (tx, rx) = oneshot::channel();
481
482            let task = if let Some(mem_monitor) = mem_monitor {
483                fasync::Task::spawn(self.clone().background_task(
484                    config,
485                    mem_monitor.get_level_stream(),
486                    rx,
487                ))
488            } else {
489                // With no memory pressure monitoring, just stub the stream out as always pending.
490                fasync::Task::spawn(self.clone().background_task(config, stream::pending(), rx))
491            };
492
493            *background_task = Some((task, tx));
494        }
495    }
496
497    #[trace]
498    async fn background_task(
499        self: Arc<Self>,
500        config: MemoryPressureConfig,
501        mut level_stream: impl Stream<Item = MemoryPressureLevel> + FusedStream + Unpin,
502        terminate: oneshot::Receiver<()>,
503    ) {
504        debug!(store_id = self.store.store_object_id(); "FxVolume::background_task start");
505        let mut terminate = terminate.fuse();
506        // Default to the normal period until updates come from the `level_stream`.
507        let mut level = MemoryPressureLevel::Normal;
508        let mut timer =
509            pin!(fasync::Timer::new(config.for_level(&level).background_task_initial_delay));
510
511        loop {
512            let mut should_terminate = false;
513            let mut should_flush = false;
514            let mut low_mem = false;
515            let mut should_purge_layer_files = false;
516            let mut should_update_cache_limit = false;
517
518            futures::select_biased! {
519                _ = terminate => should_terminate = true,
520                new_level = level_stream.next() => {
521                    // Because `level_stream` will never terminate, this is safe to unwrap.
522                    let new_level = new_level.unwrap();
523                    // At critical levels, it's okay to undertake expensive work immediately
524                    // to reclaim memory.
525                    low_mem = matches!(new_level, MemoryPressureLevel::Critical);
526                    should_purge_layer_files = true;
527                    if new_level != level {
528                        level = new_level;
529                        should_update_cache_limit = true;
530                        let level_config = config.for_level(&level);
531                        self.read_ahead_size.store(level_config.read_ahead_size, Ordering::Relaxed);
532                        timer.as_mut().reset(fasync::MonotonicInstant::after(
533                            level_config.background_task_period.into())
534                        );
535                        debug!(
536                            "Background task period changed to {:?} due to new memory pressure \
537                            level ({:?}).",
538                            config.for_level(&level).background_task_period, level
539                        );
540                    }
541                }
542                _ = timer => {
543                    timer.as_mut().reset(fasync::MonotonicInstant::after(
544                        config.for_level(&level).background_task_period.into())
545                    );
546                    should_flush = true;
547                    // Only purge layer file caches once we have elevated memory pressure.
548                    should_purge_layer_files = !matches!(level, MemoryPressureLevel::Normal);
549                }
550            };
551            if should_terminate {
552                break;
553            }
554            // Maybe close extra files *before* iterating them for flush/low mem.
555            if should_update_cache_limit {
556                self.dirent_cache.set_limit(config.for_level(&level).cache_size_limit);
557            }
558            if should_flush {
559                self.flush_all_files(false).await;
560                self.dirent_cache.recycle_stale_files();
561            } else if low_mem {
562                // This is a softer version of flushing files, so don't bother if we're flushing.
563                self.minimize_memory().await;
564            }
565            if should_purge_layer_files {
566                for layer in self.store.tree().immutable_layer_set().layers {
567                    layer.purge_cached_data();
568                }
569            }
570        }
571        debug!(store_id = self.store.store_object_id(); "FxVolume::background_task end");
572    }
573
574    /// Reports that a certain number of bytes will be dirtied in a pager-backed VMO.
575    ///
576    /// Note that this function may await flush tasks.
577    pub fn report_pager_dirty(
578        self: Arc<Self>,
579        byte_count: u64,
580        mark_dirty: impl FnOnce() + Send + 'static,
581    ) {
582        let this = self.clone();
583        let callback = move || {
584            mark_dirty();
585            let prev = this.pager_dirty_byte_count.fetch_add(byte_count, Ordering::Relaxed);
586            fxfs_trace::counter!("dirty-bytes", 0, this.name => prev.saturating_add(byte_count));
587        };
588        if let Some(parent) = self.parent.upgrade() {
589            parent.report_pager_dirty(byte_count, self, callback);
590        } else {
591            callback();
592        }
593    }
594
595    /// Reports that a certain number of bytes were cleaned in a pager-backed VMO.
596    pub fn report_pager_clean(&self, byte_count: u64) {
597        if let Some(parent) = self.parent.upgrade() {
598            parent.report_pager_clean(byte_count);
599        }
600        let prev = self.pager_dirty_byte_count.fetch_sub(byte_count, Ordering::Relaxed);
601        fxfs_trace::counter!("dirty-bytes", 0, self.name => prev.saturating_sub(byte_count));
602    }
603
604    #[trace]
605    pub async fn flush_all_files(&self, last_chance: bool) {
606        let mut flushed = 0;
607        for file in self.cache.files() {
608            if let Some(node) = file.into_opened_node() {
609                if let Err(e) = FxFile::flush(&node, last_chance).await {
610                    warn!(
611                        store_id = self.store.store_object_id(),
612                        oid = node.object_id(),
613                        error:? = e;
614                        "Failed to flush",
615                    )
616                }
617                if last_chance {
618                    let file = node.clone();
619                    std::mem::drop(node);
620                    file.force_clean();
621                }
622            }
623            flushed += 1;
624        }
625        debug!(store_id = self.store.store_object_id(), file_count = flushed; "FxVolume flushed");
626    }
627
628    /// Flushes only files with dirty pages.
629    ///
630    /// `PagedObjectHandle` tracks the number dirty pages locally (except for overwrite files) which
631    /// makes determining whether flushing a file will reduce the number of dirty pages efficient.
632    /// `flush_all_files` checks if any metadata needs to be flushed which involves a syscall making
633    /// it significantly slower when there are lots of open files without dirty pages.
634    #[trace]
635    pub async fn minimize_memory(&self) {
636        for file in self.cache.files() {
637            if let Some(node) = file.into_opened_node() {
638                if let Err(e) = node.handle().minimize_memory().await {
639                    warn!(
640                        store_id = self.store.store_object_id(),
641                        oid = node.object_id(),
642                        error:? = e;
643                        "Failed to flush",
644                    )
645                }
646            }
647        }
648    }
649
650    /// Spawns a short term task for the volume that includes a guard that will prevent termination.
651    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) {
652        if let Some(guard) = self.scope.try_active_guard() {
653            self.executor.spawn_detached(FutureWithGuard::new(guard, task));
654        }
655    }
656
657    /// Returns the current read-ahead size that should be used based on the current memory-pressure
658    /// level.
659    pub fn read_ahead_size(&self) -> u64 {
660        self.read_ahead_size.load(Ordering::Relaxed)
661    }
662
/// Tries to take the volume out of its `Arc`.  On failure the volume is left poisoned so that
/// you get a backtrace when it is eventually dropped, and details about the places that might
/// be holding a reference are logged.
#[cfg(any(test, feature = "testing"))]
pub fn try_unwrap(self: Arc<Self>) -> Option<FxVolume> {
    self.poisoned.store(true, Ordering::Relaxed);
    let this = match Arc::try_unwrap(self) {
        Ok(volume) => {
            volume.poisoned.store(false, Ordering::Relaxed);
            return Some(volume);
        }
        Err(still_shared) => still_shared,
    };
    // Log details about all the places where there might be a reference cycle.
    info!(
        "background_task: {}, profile_state: {}, dirent_cache count: {}, \
         pager strong file refs={}, no tasks={}",
        this.background_task.lock().is_some(),
        this.profile_state.lock().is_some(),
        this.dirent_cache.len(),
        crate::pager::STRONG_FILE_REFS.load(Ordering::Relaxed),
        {
            // Poll the scope's wait future once; it is ready only if nothing is outstanding.
            let mut no_tasks = pin!(this.scope.wait());
            no_tasks
                .poll_unpin(&mut std::task::Context::from_waker(
                    std::task::Waker::noop(),
                ))
                .is_ready()
        },
    );
    None
}
695
696    pub async fn handle_file_backed_volume_provider_requests(
697        this: Weak<Self>,
698        scope: ExecutionScope,
699        mut requests: FileBackedVolumeProviderRequestStream,
700    ) -> Result<(), Error> {
701        while let Some(request) = requests.try_next().await? {
702            match request {
703                FileBackedVolumeProviderRequest::Open {
704                    parent_directory_token,
705                    name,
706                    server_end,
707                    control_handle: _,
708                } => {
709                    // Try and get an active guard before upgrading.
710                    let Some(_guard) = scope.try_active_guard() else {
711                        bail!("Volume shutting down")
712                    };
713                    let Some(this) = this.upgrade() else { bail!("FxVolume dropped") };
714                    match this
715                        .scope
716                        .token_registry()
717                        // NB: For now, we only expect these calls in a regular (non-blob) volume.
718                        // Hard-code the type for simplicity; attempts to call on a blob volume will
719                        // get an error.
720                        .get_owner(parent_directory_token)
721                        .and_then(|dir| {
722                            dir.ok_or(zx::Status::BAD_HANDLE).and_then(|dir| {
723                                dir.into_any()
724                                    .downcast::<FxDirectory>()
725                                    .map_err(|_| zx::Status::BAD_HANDLE)
726                            })
727                        }) {
728                        Ok(dir) => {
729                            dir.open_block_file(&name, server_end).await;
730                        }
731                        Err(status) => {
732                            let _ = server_end.close_with_epitaph(status).unwrap_or_else(|e| {
733                                error!(error:? = e; "open failed to send epitaph");
734                            });
735                        }
736                    }
737                }
738            }
739        }
740        Ok(())
741    }
742
743    pub async fn handle_project_id_requests(
744        this: Weak<Self>,
745        scope: ExecutionScope,
746        mut requests: ProjectIdRequestStream,
747    ) -> Result<(), Error> {
748        while let Some(request) = requests.try_next().await? {
749            // Try and get an active guard before upgrading.
750            let Some(_guard) = scope.try_active_guard() else { bail!("Volume shutting down") };
751            let Some(this) = this.upgrade() else { bail!("FxVolume dropped") };
752            let store_id = this.store.store_object_id();
753
754            match request {
755                ProjectIdRequest::SetLimit { responder, project_id, bytes, nodes } => responder
756                    .send(
757                    this.store().set_project_limit(project_id, bytes, nodes).await.map_err(
758                        |error| {
759                            error!(error:?, store_id, project_id; "Failed to set project limit");
760                            map_to_raw_status(error)
761                        },
762                    ),
763                )?,
764                ProjectIdRequest::Clear { responder, project_id } => responder.send(
765                    this.store().clear_project_limit(project_id).await.map_err(|error| {
766                        error!(error:?, store_id, project_id; "Failed to clear project limit");
767                        map_to_raw_status(error)
768                    }),
769                )?,
770                ProjectIdRequest::SetForNode { responder, node_id, project_id } => {
771                    responder
772                        .send(this.store().set_project_for_node(node_id, project_id).await.map_err(
773                        |error| {
774                            error!(error:?, store_id, node_id, project_id; "Failed to apply node.");
775                            map_to_raw_status(error)
776                        },
777                    ))?
778                }
779                ProjectIdRequest::GetForNode { responder, node_id } => responder.send(
780                    this.store().get_project_for_node(node_id).await.map_err(|error| {
781                        error!(error:?, store_id, node_id; "Failed to get node.");
782                        map_to_raw_status(error)
783                    }),
784                )?,
785                ProjectIdRequest::ClearForNode { responder, node_id } => responder.send(
786                    this.store().clear_project_for_node(node_id).await.map_err(|error| {
787                        error!(error:?, store_id, node_id; "Failed to clear for node.");
788                        map_to_raw_status(error)
789                    }),
790                )?,
791                ProjectIdRequest::List { responder, token } => {
792                    responder.send(match this.list_projects(&token).await {
793                        Ok((ref entries, ref next_token)) => Ok((entries, next_token.as_ref())),
794                        Err(error) => {
795                            error!(error:?, store_id, token:?; "Failed to list projects.");
796                            Err(map_to_raw_status(error))
797                        }
798                    })?
799                }
800                ProjectIdRequest::Info { responder, project_id } => {
801                    responder.send(match this.project_info(project_id).await {
802                        Ok((ref limit, ref usage)) => Ok((limit, usage)),
803                        Err(error) => {
804                            error!(error:?, store_id, project_id; "Failed to get project info.");
805                            Err(map_to_raw_status(error))
806                        }
807                    })?
808                }
809            }
810        }
811        Ok(())
812    }
813
// Maximum entries to fit based on 64KiB message size minus 16 bytes of header, 16 bytes
// of vector header, 16 bytes for the optional token header, and 8 bytes of token value.
// https://fuchsia.dev/fuchsia-src/development/languages/fidl/guides/max-out-pagination
//
// NOTE(review): the overheads listed above add up to 56 bytes, which would leave room for
// 8185 eight-byte (u64) entries in 65536 bytes; the constant is one lower. Confirm whether
// the remaining 8 bytes are accounted for by something not mentioned here.
const MAX_PROJECT_ENTRIES: usize = 8184;
818
819    // Calls out to the inner volume to list available projects, removing and re-adding the fidl
820    // wrapper types for the pagination token.
821    async fn list_projects(
822        &self,
823        last_token: &Option<Box<ProjectIterToken>>,
824    ) -> Result<(Vec<u64>, Option<ProjectIterToken>), Error> {
825        let (entries, token) = self
826            .store()
827            .list_projects(
828                match last_token {
829                    None => 0,
830                    Some(v) => v.value,
831                },
832                Self::MAX_PROJECT_ENTRIES,
833            )
834            .await?;
835        Ok((entries, token.map(|value| ProjectIterToken { value })))
836    }
837
838    async fn project_info(&self, project_id: u64) -> Result<(BytesAndNodes, BytesAndNodes), Error> {
839        let (limit, usage) = self.store().project_info(project_id).await?;
840        // At least one of them needs to be around to return anything.
841        ensure!(limit.is_some() || usage.is_some(), FxfsError::NotFound);
842        Ok((
843            limit.map_or_else(
844                || BytesAndNodes { bytes: u64::MAX, nodes: u64::MAX },
845                |v| BytesAndNodes { bytes: v.0, nodes: v.1 },
846            ),
847            usage.map_or_else(
848                || BytesAndNodes { bytes: 0, nodes: 0 },
849                |v| BytesAndNodes { bytes: v.0, nodes: v.1 },
850            ),
851        ))
852    }
853}
854
// Drop-time check that is only compiled for tests and for the "testing" feature: dropping an
// `FxVolume` whose `poisoned` flag is set panics.
// NOTE(review): the code that sets `poisoned` is not visible in this part of the file —
// confirm what the flag stands for before relying on this description.
#[cfg(any(test, feature = "testing"))]
impl Drop for FxVolume {
    fn drop(&mut self) {
        // `get_mut` reads the flag through the exclusive `&mut self` borrow.
        assert!(!*self.poisoned.get_mut());
    }
}
861
// Marks `FxVolume` as a `HandleOwner`; no trait items are overridden here.
impl HandleOwner for FxVolume {}
863
// Lets an `FxVolume` be used wherever a borrowed `ObjectStore` is accepted.
impl AsRef<ObjectStore> for FxVolume {
    // Returns a reference to the volume's `store` field.
    fn as_ref(&self) -> &ObjectStore {
        &self.store
    }
}
869
870#[async_trait]
871impl FsInspectVolume for FxVolume {
872    async fn get_volume_data(&self) -> Option<VolumeData> {
873        // Don't try to return data if the volume is shutting down.
874        let _guard = self.scope.try_active_guard()?;
875
876        let object_count = self.store().object_count();
877        let (used_bytes, bytes_limit) =
878            self.store.filesystem().allocator().owner_allocation_info(self.store.store_object_id());
879        let encrypted = self.store().crypt().is_some();
880        let port_koid = fasync::EHandle::local().port().as_handle_ref().koid().unwrap().raw_koid();
881        Some(VolumeData { bytes_limit, used_bytes, used_nodes: object_count, encrypted, port_koid })
882    }
883}
884
/// Interface a volume's root directory node must provide, on top of being an `FxNode` and a
/// `DirectoryEntry`.
pub trait RootDir: FxNode + DirectoryEntry {
    /// Converts this root into a type-erased `DirectoryEntry`.
    fn as_directory_entry(self: Arc<Self>) -> Arc<dyn DirectoryEntry>;

    /// Serves this directory on `server_end` with the given `flags`.
    // NOTE(review): described from the signature only; implementations are not visible here.
    fn serve(self: Arc<Self>, flags: fio::Flags, server_end: ServerEnd<fio::DirectoryMarker>);

    /// Converts this root into a type-erased `FxNode`.
    fn as_node(self: Arc<Self>) -> Arc<dyn FxNode>;

    /// Hook for root types that want to add entries to the volume's service directory.
    ///
    /// The default implementation adds nothing and returns `Ok(())`.
    fn register_additional_volume_services(
        self: Arc<Self>,
        _svc_dir: &Simple,
    ) -> Result<(), Error> {
        Ok(())
    }
}
899
/// Bundles an `FxVolume` with its root directory node, plus the scope and outgoing directory
/// used alongside them. Cloning is cheap for the `Arc` fields, which are shared.
#[derive(Clone)]
pub struct FxVolumeAndRoot {
    // The volume itself.
    volume: Arc<FxVolume>,
    // The volume's root directory, type-erased behind `RootDir`.
    root: Arc<dyn RootDir>,

    // This is used for service connections and anything that isn't the actual volume.
    admin_scope: ExecutionScope,

    // The outgoing directory that the volume might be served on.
    outgoing_dir: Arc<Simple>,
}
911
912impl FxVolumeAndRoot {
913    pub async fn new<T: From<Directory<FxVolume>> + RootDir>(
914        parent: Weak<VolumesDirectory>,
915        store: Arc<ObjectStore>,
916        unique_id: u64,
917        volume_name: String,
918        blob_resupplied_count: Arc<PageRefaultCounter>,
919        memory_pressure_config: MemoryPressureConfig,
920    ) -> Result<Self, Error> {
921        let volume = Arc::new(FxVolume::new(
922            parent,
923            store,
924            unique_id,
925            volume_name,
926            blob_resupplied_count.clone(),
927            memory_pressure_config,
928        )?);
929        let root_object_id = volume.store().root_directory_object_id();
930        let root_dir = Directory::open(&volume, root_object_id).await?;
931        let root = Arc::<T>::new(root_dir.into()) as Arc<dyn RootDir>;
932        volume
933            .cache
934            .get_or_reserve(root_object_id)
935            .await
936            .placeholder()
937            .unwrap()
938            .commit(&root.clone().as_node());
939        Ok(Self {
940            volume,
941            root,
942            admin_scope: ExecutionScope::new(),
943            outgoing_dir: vfs::directory::immutable::simple(),
944        })
945    }
946
947    pub fn volume(&self) -> &Arc<FxVolume> {
948        &self.volume
949    }
950
951    pub fn root(&self) -> &Arc<dyn RootDir> {
952        &self.root
953    }
954
955    pub fn admin_scope(&self) -> &ExecutionScope {
956        &self.admin_scope
957    }
958
959    pub fn outgoing_dir(&self) -> &Arc<Simple> {
960        &self.outgoing_dir
961    }
962
963    // The same as root but downcasted to FxDirectory.
964    pub fn root_dir(&self) -> Arc<FxDirectory> {
965        self.root().clone().into_any().downcast::<FxDirectory>().expect("Invalid type for root")
966    }
967
968    pub fn into_volume(self) -> Arc<FxVolume> {
969        self.volume
970    }
971}
972
// Value reported as `total_nodes` in `FilesystemInfo`.
//
// The correct number here is arguably u64::MAX - 1 (because node 0 is reserved). There's a bug
// where inspect test cases fail if we try to use that, possibly because of a signed/unsigned
// bug. See https://fxbug.dev/42168242.  Until that's fixed, we'll have to use i64::MAX.
const TOTAL_NODES: u64 = i64::MAX as u64;
977
// An array used to initialize the FilesystemInfo |name| field. This just spells "fxfs"
// (ASCII 0x66 'f', 0x78 'x', 0x66 'f', 0x73 's') 0-padded to 32 bytes.
const FXFS_INFO_NAME_FIDL: [i8; 32] = [
    0x66, 0x78, 0x66, 0x73, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    0, 0, 0, 0,
];
984
985fn info_to_filesystem_info(
986    info: filesystem::Info,
987    block_size: u64,
988    object_count: u64,
989    fs_id: u64,
990) -> fio::FilesystemInfo {
991    fio::FilesystemInfo {
992        total_bytes: info.total_bytes,
993        used_bytes: info.used_bytes,
994        total_nodes: TOTAL_NODES,
995        used_nodes: object_count,
996        // TODO(https://fxbug.dev/42175592): Support free_shared_pool_bytes.
997        free_shared_pool_bytes: 0,
998        fs_id,
999        block_size: block_size as u32,
1000        max_filename_size: fio::MAX_NAME_LENGTH as u32,
1001        fs_type: fidl_fuchsia_fs::VfsType::Fxfs.into_primitive(),
1002        padding: 0,
1003        name: FXFS_INFO_NAME_FIDL,
1004    }
1005}
1006
1007#[cfg(test)]
1008mod tests {
1009    use super::DIRENT_CACHE_LIMIT;
1010    use crate::fuchsia::file::FxFile;
1011    use crate::fuchsia::fxblob::testing::{self as blob_testing, BlobFixture};
1012    use crate::fuchsia::memory_pressure::MemoryPressureLevel;
1013    use crate::fuchsia::pager::PagerBacked;
1014    use crate::fuchsia::profile::{RECORDED, new_profile_state};
1015    use crate::fuchsia::testing::{
1016        TestFixture, TestFixtureOptions, close_dir_checked, close_file_checked, open_dir,
1017        open_dir_checked, open_file, open_file_checked,
1018    };
1019    use crate::fuchsia::volume::{
1020        BASE_READ_AHEAD_SIZE, FxVolume, MemoryPressureConfig, MemoryPressureLevelConfig,
1021    };
1022    use crate::fuchsia::volumes_directory::VolumesDirectory;
1023    use crate::volume::MAX_READ_AHEAD_SIZE;
1024    use delivery_blob::CompressionMode;
1025    use fidl_fuchsia_fxfs::{BytesAndNodes, ProjectIdMarker};
1026    use fidl_fuchsia_io as fio;
1027    use fs_inspect::FsInspectVolume;
1028    use fuchsia_async as fasync;
1029    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
1030    use fuchsia_fs::file;
1031    use fxfs::filesystem::{FxFilesystem, FxFilesystemBuilder};
1032    use fxfs::fsck::{fsck, fsck_volume};
1033    use fxfs::object_store::directory::replace_child;
1034    use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
1035    use fxfs::object_store::volume::root_volume;
1036    use fxfs::object_store::{HandleOptions, ObjectDescriptor, ObjectStore, StoreOptions};
1037    use fxfs_crypt_common::CryptBase;
1038    use fxfs_crypto::{Crypt, WrappingKeyId};
1039    use fxfs_insecure_crypto::new_insecure_crypt;
1040    use refaults_vmo::PageRefaultCounter;
1041    use std::sync::atomic::Ordering;
1042    use std::sync::{Arc, Weak};
1043    use std::time::Duration;
1044    use storage_device::DeviceHolder;
1045    use storage_device::fake_device::FakeDevice;
1046    use zx::Status;
1047
// Wrapping key id shared by the tests: the value 123 encoded as 16 little-endian bytes.
const WRAPPING_KEY_ID: WrappingKeyId = u128::to_le_bytes(123);
1049
1050    #[fuchsia::test(threads = 10)]
1051    async fn test_rename_different_dirs() {
1052        use zx::Event;
1053
1054        let fixture = TestFixture::new().await;
1055        let root = fixture.root();
1056
1057        let src = open_dir_checked(
1058            &root,
1059            "foo",
1060            fio::Flags::FLAG_MAYBE_CREATE
1061                | fio::PERM_READABLE
1062                | fio::PERM_WRITABLE
1063                | fio::Flags::PROTOCOL_DIRECTORY,
1064            Default::default(),
1065        )
1066        .await;
1067
1068        let dst = open_dir_checked(
1069            &root,
1070            "bar",
1071            fio::Flags::FLAG_MAYBE_CREATE
1072                | fio::PERM_READABLE
1073                | fio::PERM_WRITABLE
1074                | fio::Flags::PROTOCOL_DIRECTORY,
1075            Default::default(),
1076        )
1077        .await;
1078
1079        let f = open_file_checked(
1080            &root,
1081            "foo/a",
1082            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1083            &Default::default(),
1084        )
1085        .await;
1086        close_file_checked(f).await;
1087
1088        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1089        Status::ok(status).expect("get_token failed");
1090        src.rename("a", Event::from(dst_token.unwrap()), "b")
1091            .await
1092            .expect("FIDL call failed")
1093            .expect("rename failed");
1094
1095        assert_eq!(
1096            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1097                .await
1098                .expect_err("Open succeeded")
1099                .root_cause()
1100                .downcast_ref::<Status>()
1101                .expect("No status"),
1102            &Status::NOT_FOUND,
1103        );
1104        let f =
1105            open_file_checked(&root, "bar/b", fio::Flags::PROTOCOL_FILE, &Default::default()).await;
1106        close_file_checked(f).await;
1107
1108        close_dir_checked(dst).await;
1109        close_dir_checked(src).await;
1110        fixture.close().await;
1111    }
1112
1113    #[fuchsia::test(threads = 10)]
1114    async fn test_rename_same_dir() {
1115        use zx::Event;
1116        let fixture = TestFixture::new().await;
1117        let root = fixture.root();
1118
1119        let src = open_dir_checked(
1120            &root,
1121            "foo",
1122            fio::Flags::FLAG_MAYBE_CREATE
1123                | fio::PERM_READABLE
1124                | fio::PERM_WRITABLE
1125                | fio::Flags::PROTOCOL_DIRECTORY,
1126            Default::default(),
1127        )
1128        .await;
1129
1130        let f = open_file_checked(
1131            &root,
1132            "foo/a",
1133            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1134            &Default::default(),
1135        )
1136        .await;
1137        close_file_checked(f).await;
1138
1139        let (status, src_token) = src.get_token().await.expect("FIDL call failed");
1140        Status::ok(status).expect("get_token failed");
1141        src.rename("a", Event::from(src_token.unwrap()), "b")
1142            .await
1143            .expect("FIDL call failed")
1144            .expect("rename failed");
1145
1146        assert_eq!(
1147            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1148                .await
1149                .expect_err("Open succeeded")
1150                .root_cause()
1151                .downcast_ref::<Status>()
1152                .expect("No status"),
1153            &Status::NOT_FOUND,
1154        );
1155        let f =
1156            open_file_checked(&root, "foo/b", fio::Flags::PROTOCOL_FILE, &Default::default()).await;
1157        close_file_checked(f).await;
1158
1159        close_dir_checked(src).await;
1160        fixture.close().await;
1161    }
1162
1163    #[fuchsia::test(threads = 10)]
1164    async fn test_rename_overwrites_file() {
1165        use zx::Event;
1166        let fixture = TestFixture::new().await;
1167        let root = fixture.root();
1168
1169        let src = open_dir_checked(
1170            &root,
1171            "foo",
1172            fio::Flags::FLAG_MAYBE_CREATE
1173                | fio::PERM_READABLE
1174                | fio::PERM_WRITABLE
1175                | fio::Flags::PROTOCOL_DIRECTORY,
1176            Default::default(),
1177        )
1178        .await;
1179
1180        let dst = open_dir_checked(
1181            &root,
1182            "bar",
1183            fio::Flags::FLAG_MAYBE_CREATE
1184                | fio::PERM_READABLE
1185                | fio::PERM_WRITABLE
1186                | fio::Flags::PROTOCOL_DIRECTORY,
1187            Default::default(),
1188        )
1189        .await;
1190
1191        // The src file is non-empty.
1192        let src_file = open_file_checked(
1193            &root,
1194            "foo/a",
1195            fio::Flags::FLAG_MAYBE_CREATE
1196                | fio::PERM_READABLE
1197                | fio::PERM_WRITABLE
1198                | fio::Flags::PROTOCOL_FILE,
1199            &Default::default(),
1200        )
1201        .await;
1202        let buf = vec![0xaa as u8; 8192];
1203        file::write(&src_file, buf.as_slice()).await.expect("Failed to write to file");
1204        close_file_checked(src_file).await;
1205
1206        // The dst file is empty (so we can distinguish it).
1207        let f = open_file_checked(
1208            &root,
1209            "bar/b",
1210            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1211            &Default::default(),
1212        )
1213        .await;
1214        close_file_checked(f).await;
1215
1216        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1217        Status::ok(status).expect("get_token failed");
1218        src.rename("a", Event::from(dst_token.unwrap()), "b")
1219            .await
1220            .expect("FIDL call failed")
1221            .expect("rename failed");
1222
1223        assert_eq!(
1224            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1225                .await
1226                .expect_err("Open succeeded")
1227                .root_cause()
1228                .downcast_ref::<Status>()
1229                .expect("No status"),
1230            &Status::NOT_FOUND,
1231        );
1232        let file = open_file_checked(
1233            &root,
1234            "bar/b",
1235            fio::PERM_READABLE | fio::Flags::PROTOCOL_FILE,
1236            &Default::default(),
1237        )
1238        .await;
1239        let buf = file::read(&file).await.expect("read file failed");
1240        assert_eq!(buf, vec![0xaa as u8; 8192]);
1241        close_file_checked(file).await;
1242
1243        close_dir_checked(dst).await;
1244        close_dir_checked(src).await;
1245        fixture.close().await;
1246    }
1247
1248    #[fuchsia::test(threads = 10)]
1249    async fn test_rename_overwrites_dir() {
1250        use zx::Event;
1251        let fixture = TestFixture::new().await;
1252        let root = fixture.root();
1253
1254        let src = open_dir_checked(
1255            &root,
1256            "foo",
1257            fio::Flags::FLAG_MAYBE_CREATE
1258                | fio::PERM_READABLE
1259                | fio::PERM_WRITABLE
1260                | fio::Flags::PROTOCOL_DIRECTORY,
1261            Default::default(),
1262        )
1263        .await;
1264
1265        let dst = open_dir_checked(
1266            &root,
1267            "bar",
1268            fio::Flags::FLAG_MAYBE_CREATE
1269                | fio::PERM_READABLE
1270                | fio::PERM_WRITABLE
1271                | fio::Flags::PROTOCOL_DIRECTORY,
1272            Default::default(),
1273        )
1274        .await;
1275
1276        // The src dir is non-empty.
1277        open_dir_checked(
1278            &root,
1279            "foo/a",
1280            fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
1281            Default::default(),
1282        )
1283        .await;
1284        open_file_checked(
1285            &root,
1286            "foo/a/file",
1287            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1288            &Default::default(),
1289        )
1290        .await;
1291        open_dir_checked(
1292            &root,
1293            "bar/b",
1294            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_DIRECTORY,
1295            Default::default(),
1296        )
1297        .await;
1298
1299        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1300        Status::ok(status).expect("get_token failed");
1301        src.rename("a", Event::from(dst_token.unwrap()), "b")
1302            .await
1303            .expect("FIDL call failed")
1304            .expect("rename failed");
1305
1306        assert_eq!(
1307            open_dir(&root, "foo/a", fio::Flags::PROTOCOL_DIRECTORY, &Default::default())
1308                .await
1309                .expect_err("Open succeeded")
1310                .root_cause()
1311                .downcast_ref::<Status>()
1312                .expect("No status"),
1313            &Status::NOT_FOUND,
1314        );
1315        let f =
1316            open_file_checked(&root, "bar/b/file", fio::Flags::PROTOCOL_FILE, &Default::default())
1317                .await;
1318        close_file_checked(f).await;
1319
1320        close_dir_checked(dst).await;
1321        close_dir_checked(src).await;
1322
1323        fixture.close().await;
1324    }
1325
1326    #[fuchsia::test]
1327    async fn test_background_flush() {
1328        let fixture = TestFixture::new().await;
1329        {
1330            let file = open_file_checked(
1331                fixture.root(),
1332                "file",
1333                fio::Flags::FLAG_MAYBE_CREATE
1334                    | fio::PERM_READABLE
1335                    | fio::PERM_WRITABLE
1336                    | fio::Flags::PROTOCOL_FILE,
1337                &Default::default(),
1338            )
1339            .await;
1340            let object_id = file
1341                .get_attributes(fio::NodeAttributesQuery::ID)
1342                .await
1343                .expect("Fidl get attr")
1344                .expect("get attr")
1345                .1
1346                .id
1347                .unwrap();
1348
1349            // Write some data to the file, which will only go to the cache for now.
1350            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1351
1352            // Initialized to the default size.
1353            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1354            let volume = fixture.volume().volume().clone();
1355
1356            let data_has_persisted = || async {
1357                // We have to reopen the object each time since this is a distinct handle from the
1358                // one managed by the FxFile.
1359                let object =
1360                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1361                        .await
1362                        .expect("open_object failed");
1363                let data = object.contents(8192).await.expect("read failed");
1364                data.len() == 1 && data[..] == [123u8]
1365            };
1366            assert!(!data_has_persisted().await);
1367
1368            fixture.volume().volume().start_background_task(
1369                MemoryPressureConfig {
1370                    mem_normal: MemoryPressureLevelConfig {
1371                        background_task_period: Duration::from_millis(100),
1372                        background_task_initial_delay: Duration::from_millis(100),
1373                        ..Default::default()
1374                    },
1375                    mem_warning: Default::default(),
1376                    mem_critical: Default::default(),
1377                },
1378                None,
1379            );
1380
1381            const MAX_WAIT: Duration = Duration::from_secs(20);
1382            let wait_increments = Duration::from_millis(400);
1383            let mut total_waited = Duration::ZERO;
1384
1385            while total_waited < MAX_WAIT {
1386                fasync::Timer::new(wait_increments).await;
1387                total_waited += wait_increments;
1388
1389                if data_has_persisted().await {
1390                    break;
1391                }
1392            }
1393
1394            assert!(data_has_persisted().await);
1395        }
1396
1397        fixture.close().await;
1398    }
1399
1400    #[fuchsia::test(threads = 2)]
1401    async fn test_background_flush_with_warning_memory_pressure() {
1402        let fixture = TestFixture::new().await;
1403        {
1404            let file = open_file_checked(
1405                fixture.root(),
1406                "file",
1407                fio::Flags::FLAG_MAYBE_CREATE
1408                    | fio::PERM_READABLE
1409                    | fio::PERM_WRITABLE
1410                    | fio::Flags::PROTOCOL_FILE,
1411                &Default::default(),
1412            )
1413            .await;
1414            let object_id = file
1415                .get_attributes(fio::NodeAttributesQuery::ID)
1416                .await
1417                .expect("Fidl get attr")
1418                .expect("get attr")
1419                .1
1420                .id
1421                .unwrap();
1422
1423            // Write some data to the file, which will only go to the cache for now.
1424            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1425
1426            // Initialized to the default size.
1427            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1428            let volume = fixture.volume().volume().clone();
1429
1430            let data_has_persisted = || async {
1431                // We have to reopen the object each time since this is a distinct handle from the
1432                // one managed by the FxFile.
1433                let object =
1434                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1435                        .await
1436                        .expect("open_object failed");
1437                let data = object.contents(8192).await.expect("read failed");
1438                data.len() == 1 && data[..] == [123u8]
1439            };
1440            assert!(!data_has_persisted().await);
1441
1442            // Configure the flush task to only flush quickly on warning.
1443            let flush_config = MemoryPressureConfig {
1444                mem_normal: MemoryPressureLevelConfig {
1445                    background_task_period: Duration::from_secs(20),
1446                    cache_size_limit: DIRENT_CACHE_LIMIT,
1447                    ..Default::default()
1448                },
1449                mem_warning: MemoryPressureLevelConfig {
1450                    background_task_period: Duration::from_millis(100),
1451                    cache_size_limit: 100,
1452                    background_task_initial_delay: Duration::from_millis(100),
1453                    ..Default::default()
1454                },
1455                mem_critical: MemoryPressureLevelConfig {
1456                    background_task_period: Duration::from_secs(20),
1457                    cache_size_limit: 50,
1458                    ..Default::default()
1459                },
1460            };
1461            fixture.volume().volume().start_background_task(
1462                flush_config,
1463                fixture.volumes_directory().memory_pressure_monitor(),
1464            );
1465
1466            // Send the memory pressure update.
1467            fixture
1468                .memory_pressure_proxy()
1469                .on_level_changed(MemoryPressureLevel::Warning)
1470                .await
1471                .expect("Failed to send memory pressure level change");
1472
1473            // Wait a bit of time for the flush to occur (but less than the normal and critical
1474            // periods).
1475            const MAX_WAIT: Duration = Duration::from_secs(3);
1476            let wait_increments = Duration::from_millis(400);
1477            let mut total_waited = Duration::ZERO;
1478
1479            while total_waited < MAX_WAIT {
1480                fasync::Timer::new(wait_increments).await;
1481                total_waited += wait_increments;
1482
1483                if data_has_persisted().await {
1484                    break;
1485                }
1486            }
1487
1488            assert!(data_has_persisted().await);
1489            assert_eq!(fixture.volume().volume().dirent_cache().limit(), 100);
1490        }
1491
1492        fixture.close().await;
1493    }
1494
1495    #[fuchsia::test(threads = 2)]
1496    async fn test_background_flush_with_critical_memory_pressure() {
1497        let fixture = TestFixture::new().await;
1498        {
1499            let file = open_file_checked(
1500                fixture.root(),
1501                "file",
1502                fio::Flags::FLAG_MAYBE_CREATE
1503                    | fio::PERM_READABLE
1504                    | fio::PERM_WRITABLE
1505                    | fio::Flags::PROTOCOL_FILE,
1506                &Default::default(),
1507            )
1508            .await;
1509            let object_id = file
1510                .get_attributes(fio::NodeAttributesQuery::ID)
1511                .await
1512                .expect("Fidl get attr")
1513                .expect("get attr")
1514                .1
1515                .id
1516                .unwrap();
1517
1518            // Write some data to the file, which will only go to the cache for now.
1519            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1520
1521            // Initialized to the default size.
1522            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1523            let volume = fixture.volume().volume().clone();
1524
1525            let data_has_persisted = || async {
1526                // We have to reopen the object each time since this is a distinct handle from the
1527                // one managed by the FxFile.
1528                let object =
1529                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1530                        .await
1531                        .expect("open_object failed");
1532                let data = object.contents(8192).await.expect("read failed");
1533                data.len() == 1 && data[..] == [123u8]
1534            };
1535            assert!(!data_has_persisted().await);
1536
1537            let flush_config = MemoryPressureConfig {
1538                mem_normal: MemoryPressureLevelConfig {
1539                    cache_size_limit: DIRENT_CACHE_LIMIT,
1540                    ..Default::default()
1541                },
1542                mem_warning: MemoryPressureLevelConfig {
1543                    cache_size_limit: 100,
1544                    ..Default::default()
1545                },
1546                mem_critical: MemoryPressureLevelConfig {
1547                    cache_size_limit: 50,
1548                    ..Default::default()
1549                },
1550            };
1551            fixture.volume().volume().start_background_task(
1552                flush_config,
1553                fixture.volumes_directory().memory_pressure_monitor(),
1554            );
1555
1556            // Send the memory pressure update.
1557            fixture
1558                .memory_pressure_proxy()
1559                .on_level_changed(MemoryPressureLevel::Critical)
1560                .await
1561                .expect("Failed to send memory pressure level change");
1562
1563            // Critical memory should trigger a flush immediately so expect a flush very quickly.
1564            const MAX_WAIT: Duration = Duration::from_secs(2);
1565            let wait_increments = Duration::from_millis(400);
1566            let mut total_waited = Duration::ZERO;
1567
1568            while total_waited < MAX_WAIT {
1569                fasync::Timer::new(wait_increments).await;
1570                total_waited += wait_increments;
1571
1572                if data_has_persisted().await {
1573                    break;
1574                }
1575            }
1576
1577            assert!(data_has_persisted().await);
1578            assert_eq!(fixture.volume().volume().dirent_cache().limit(), 50);
1579        }
1580
1581        fixture.close().await;
1582    }
1583
1584    #[fuchsia::test(threads = 2)]
1585    async fn test_memory_pressure_signal_updates_read_ahead_size() {
1586        let fixture = TestFixture::new().await;
1587        {
1588            let flush_config = MemoryPressureConfig {
1589                mem_normal: MemoryPressureLevelConfig {
1590                    read_ahead_size: 12 * 1024,
1591                    ..Default::default()
1592                },
1593                mem_warning: MemoryPressureLevelConfig {
1594                    read_ahead_size: 8 * 1024,
1595                    ..Default::default()
1596                },
1597                mem_critical: MemoryPressureLevelConfig {
1598                    read_ahead_size: 4 * 1024,
1599                    ..Default::default()
1600                },
1601            };
1602            let volume = fixture.volume().volume().clone();
1603            volume.start_background_task(
1604                flush_config,
1605                fixture.volumes_directory().memory_pressure_monitor(),
1606            );
1607            let wait_for_read_ahead_to_change =
1608                async |level: MemoryPressureLevel, expected: u64| {
1609                    fixture
1610                        .memory_pressure_proxy()
1611                        .on_level_changed(level)
1612                        .await
1613                        .expect("Failed to send memory pressure level change");
1614                    const MAX_WAIT: Duration = Duration::from_secs(2);
1615                    let wait_increments = Duration::from_millis(400);
1616                    let mut total_waited = Duration::ZERO;
1617                    while total_waited < MAX_WAIT {
1618                        if volume.read_ahead_size() == expected {
1619                            break;
1620                        }
1621                        fasync::Timer::new(wait_increments).await;
1622                        total_waited += wait_increments;
1623                    }
1624                    assert_eq!(volume.read_ahead_size(), expected);
1625                };
1626            wait_for_read_ahead_to_change(MemoryPressureLevel::Critical, 4 * 1024).await;
1627            wait_for_read_ahead_to_change(MemoryPressureLevel::Warning, 8 * 1024).await;
1628            wait_for_read_ahead_to_change(MemoryPressureLevel::Normal, 12 * 1024).await;
1629            wait_for_read_ahead_to_change(MemoryPressureLevel::Critical, 4 * 1024).await;
1630        }
1631        fixture.close().await;
1632    }
1633
1634    // This test verifies that it is safe to query a volume after it is unmounted in case of a race
1635    // during unmount/shutdown.
1636    #[fuchsia::test(threads = 2)]
1637    async fn test_query_info_unmounted_volume() {
1638        const TEST_VOLUME: &str = "test_1234";
1639        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1640
1641        let fixture = TestFixture::new().await;
1642        {
1643            let volumes_directory = fixture.volumes_directory();
1644            let volume = volumes_directory
1645                .create_and_mount_volume(TEST_VOLUME, Some(crypt.clone()), false, None)
1646                .await
1647                .unwrap();
1648
1649            assert!(volume.volume().get_volume_data().await.is_some());
1650
1651            // Unmount it but keep a reference.
1652            volumes_directory
1653                .lock()
1654                .await
1655                .unmount(volume.volume().store().store_object_id())
1656                .await
1657                .expect("unmount failed");
1658
1659            // This returns None, but doesn't crash.
1660            assert!(volume.volume().get_volume_data().await.is_none());
1661        }
1662        fixture.close().await;
1663    }
1664
1665    #[fuchsia::test]
1666    async fn test_project_limit_persistence() {
1667        const BYTES_LIMIT_1: u64 = 123456;
1668        const NODES_LIMIT_1: u64 = 4321;
1669        const BYTES_LIMIT_2: u64 = 456789;
1670        const NODES_LIMIT_2: u64 = 9876;
1671        const VOLUME_NAME: &str = "A";
1672        const FILE_NAME: &str = "B";
1673        const PROJECT_ID: u64 = 42;
1674        const PROJECT_ID2: u64 = 343;
1675        let volume_store_id;
1676        let node_id;
1677        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
1678        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1679        {
1680            let blob_resupplied_count =
1681                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1682            let volumes_directory = VolumesDirectory::new(
1683                root_volume(filesystem.clone()).await.unwrap(),
1684                Weak::new(),
1685                None,
1686                blob_resupplied_count,
1687                MemoryPressureConfig::default(),
1688            )
1689            .await
1690            .unwrap();
1691
1692            let volume_and_root = volumes_directory
1693                .create_and_mount_volume(VOLUME_NAME, None, false, None)
1694                .await
1695                .expect("create unencrypted volume failed");
1696            volume_store_id = volume_and_root.volume().store().store_object_id();
1697
1698            let (volume_dir_proxy, dir_server_end) =
1699                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1700            volumes_directory
1701                .serve_volume(&volume_and_root, dir_server_end, false)
1702                .expect("serve_volume failed");
1703
1704            let project_proxy =
1705                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1706                    .expect("Unable to connect to project id service");
1707
1708            project_proxy
1709                .set_limit(0, BYTES_LIMIT_1, NODES_LIMIT_1)
1710                .await
1711                .unwrap()
1712                .expect_err("Should not set limits for project id 0");
1713
1714            project_proxy
1715                .set_limit(PROJECT_ID, BYTES_LIMIT_1, NODES_LIMIT_1)
1716                .await
1717                .unwrap()
1718                .expect("To set limits");
1719            {
1720                let BytesAndNodes { bytes, nodes } =
1721                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1722                assert_eq!(bytes, BYTES_LIMIT_1);
1723                assert_eq!(nodes, NODES_LIMIT_1);
1724            }
1725
1726            let file_proxy = {
1727                let (root_proxy, root_server_end) =
1728                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1729                volume_dir_proxy
1730                    .open(
1731                        "root",
1732                        fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
1733                        &Default::default(),
1734                        root_server_end.into_channel(),
1735                    )
1736                    .expect("Failed to open volume root");
1737
1738                open_file_checked(
1739                    &root_proxy,
1740                    FILE_NAME,
1741                    fio::Flags::FLAG_MAYBE_CREATE
1742                        | fio::PERM_READABLE
1743                        | fio::PERM_WRITABLE
1744                        | fio::Flags::PROTOCOL_FILE,
1745                    &Default::default(),
1746                )
1747                .await
1748            };
1749
1750            let (_, immutable_attributes) =
1751                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
1752            node_id = immutable_attributes.id.unwrap();
1753
1754            project_proxy
1755                .set_for_node(node_id, 0)
1756                .await
1757                .unwrap()
1758                .expect_err("Should not set 0 project id");
1759
1760            project_proxy
1761                .set_for_node(node_id, PROJECT_ID)
1762                .await
1763                .unwrap()
1764                .expect("Setting project on node");
1765
1766            project_proxy
1767                .set_limit(PROJECT_ID, BYTES_LIMIT_2, NODES_LIMIT_2)
1768                .await
1769                .unwrap()
1770                .expect("To set limits");
1771            {
1772                let BytesAndNodes { bytes, nodes } =
1773                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1774                assert_eq!(bytes, BYTES_LIMIT_2);
1775                assert_eq!(nodes, NODES_LIMIT_2);
1776            }
1777
1778            assert_eq!(
1779                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1780                PROJECT_ID
1781            );
1782
1783            volumes_directory.terminate().await;
1784            filesystem.close().await.expect("close filesystem failed");
1785        }
1786        device = filesystem.take_device().await;
1787        device.ensure_unique();
1788        device.reopen(false);
1789        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1790        {
1791            fsck(filesystem.clone()).await.expect("Fsck");
1792            fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
1793            let blob_resupplied_count =
1794                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1795            let volumes_directory = VolumesDirectory::new(
1796                root_volume(filesystem.clone()).await.unwrap(),
1797                Weak::new(),
1798                None,
1799                blob_resupplied_count,
1800                MemoryPressureConfig::default(),
1801            )
1802            .await
1803            .unwrap();
1804            let volume_and_root = volumes_directory
1805                .mount_volume(VOLUME_NAME, None, false)
1806                .await
1807                .expect("mount unencrypted volume failed");
1808
1809            let (volume_proxy, _scope) = crate::volumes_directory::serve_startup_volume_proxy(
1810                &volumes_directory,
1811                VOLUME_NAME,
1812            );
1813
1814            let project_proxy = {
1815                let (volume_dir_proxy, dir_server_end) =
1816                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1817                volumes_directory
1818                    .serve_volume(&volume_and_root, dir_server_end, false)
1819                    .expect("serve_volume failed");
1820
1821                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1822                    .expect("Unable to connect to project id service")
1823            };
1824
1825            let usage_bytes_and_nodes = {
1826                let (
1827                    BytesAndNodes { bytes: limit_bytes, nodes: limit_nodes },
1828                    usage_bytes_and_nodes,
1829                ) = project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info");
1830                assert_eq!(limit_bytes, BYTES_LIMIT_2);
1831                assert_eq!(limit_nodes, NODES_LIMIT_2);
1832                usage_bytes_and_nodes
1833            };
1834
1835            // Should be unable to clear the project limit, due to being in use.
1836            project_proxy.clear(PROJECT_ID).await.unwrap().expect("To clear limits");
1837
1838            assert_eq!(
1839                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1840                PROJECT_ID
1841            );
1842            project_proxy
1843                .set_for_node(node_id, PROJECT_ID2)
1844                .await
1845                .unwrap()
1846                .expect("Changing project");
1847            assert_eq!(
1848                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1849                PROJECT_ID2
1850            );
1851
1852            assert_eq!(
1853                project_proxy.info(PROJECT_ID).await.unwrap().expect_err("Expect missing limits"),
1854                Status::NOT_FOUND.into_raw()
1855            );
1856            assert_eq!(
1857                project_proxy.info(PROJECT_ID2).await.unwrap().expect("Fetching project info").1,
1858                usage_bytes_and_nodes
1859            );
1860
1861            std::mem::drop(volume_proxy);
1862            volumes_directory.terminate().await;
1863            std::mem::drop(volumes_directory);
1864            filesystem.close().await.expect("close filesystem failed");
1865        }
1866        device = filesystem.take_device().await;
1867        device.ensure_unique();
1868        device.reopen(false);
1869        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1870        fsck(filesystem.clone()).await.expect("Fsck");
1871        fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
1872        let blob_resupplied_count =
1873            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1874        let volumes_directory = VolumesDirectory::new(
1875            root_volume(filesystem.clone()).await.unwrap(),
1876            Weak::new(),
1877            None,
1878            blob_resupplied_count,
1879            MemoryPressureConfig::default(),
1880        )
1881        .await
1882        .unwrap();
1883        let volume_and_root = volumes_directory
1884            .mount_volume(VOLUME_NAME, None, false)
1885            .await
1886            .expect("mount unencrypted volume failed");
1887        let (volume_dir_proxy, dir_server_end) =
1888            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1889        volumes_directory
1890            .serve_volume(&volume_and_root, dir_server_end, false)
1891            .expect("serve_volume failed");
1892        let project_proxy = connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1893            .expect("Unable to connect to project id service");
1894        assert_eq!(
1895            project_proxy.info(PROJECT_ID).await.unwrap().expect_err("Expect missing limits"),
1896            Status::NOT_FOUND.into_raw()
1897        );
1898        volumes_directory.terminate().await;
1899        std::mem::drop(volumes_directory);
1900        filesystem.close().await.expect("close filesystem failed");
1901    }
1902
1903    #[fuchsia::test]
1904    async fn test_project_limit_accounting() {
1905        const BYTES_LIMIT: u64 = 123456;
1906        const NODES_LIMIT: u64 = 4321;
1907        const VOLUME_NAME: &str = "A";
1908        const FILE_NAME: &str = "B";
1909        const PROJECT_ID: u64 = 42;
1910        let volume_store_id;
1911        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
1912        let first_object_id;
1913        let mut bytes_usage;
1914        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1915        {
1916            let blob_resupplied_count =
1917                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1918            let volumes_directory = VolumesDirectory::new(
1919                root_volume(filesystem.clone()).await.unwrap(),
1920                Weak::new(),
1921                None,
1922                blob_resupplied_count,
1923                MemoryPressureConfig::default(),
1924            )
1925            .await
1926            .unwrap();
1927
1928            let volume_and_root = volumes_directory
1929                .create_and_mount_volume(
1930                    VOLUME_NAME,
1931                    Some(Arc::new(new_insecure_crypt())),
1932                    false,
1933                    None,
1934                )
1935                .await
1936                .expect("create unencrypted volume failed");
1937            volume_store_id = volume_and_root.volume().store().store_object_id();
1938
1939            let (volume_dir_proxy, dir_server_end) =
1940                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1941            volumes_directory
1942                .serve_volume(&volume_and_root, dir_server_end, false)
1943                .expect("serve_volume failed");
1944
1945            let project_proxy =
1946                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1947                    .expect("Unable to connect to project id service");
1948
1949            project_proxy
1950                .set_limit(PROJECT_ID, BYTES_LIMIT, NODES_LIMIT)
1951                .await
1952                .unwrap()
1953                .expect("To set limits");
1954            {
1955                let BytesAndNodes { bytes, nodes } =
1956                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1957                assert_eq!(bytes, BYTES_LIMIT);
1958                assert_eq!(nodes, NODES_LIMIT);
1959            }
1960
1961            let file_proxy = {
1962                let (root_proxy, root_server_end) =
1963                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1964                volume_dir_proxy
1965                    .open(
1966                        "root",
1967                        fio::PERM_READABLE | fio::PERM_WRITABLE,
1968                        &Default::default(),
1969                        root_server_end.into_channel(),
1970                    )
1971                    .expect("Failed to open volume root");
1972
1973                open_file_checked(
1974                    &root_proxy,
1975                    FILE_NAME,
1976                    fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
1977                    &Default::default(),
1978                )
1979                .await
1980            };
1981
1982            assert_eq!(
1983                8192,
1984                file_proxy
1985                    .write(&vec![0xff as u8; 8192])
1986                    .await
1987                    .expect("FIDL call failed")
1988                    .map_err(Status::from_raw)
1989                    .expect("File write was successful")
1990            );
1991            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
1992
1993            {
1994                let BytesAndNodes { bytes, nodes } =
1995                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
1996                assert_eq!(bytes, 0);
1997                assert_eq!(nodes, 0);
1998            }
1999
2000            let (_, immutable_attributes) =
2001                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2002            let node_id = immutable_attributes.id.unwrap();
2003
2004            first_object_id = node_id;
2005            project_proxy
2006                .set_for_node(node_id, PROJECT_ID)
2007                .await
2008                .unwrap()
2009                .expect("Setting project on node");
2010
2011            bytes_usage = {
2012                let BytesAndNodes { bytes, nodes } =
2013                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2014                assert!(bytes > 0);
2015                assert_eq!(nodes, 1);
2016                bytes
2017            };
2018
2019            // Grow the file by a block.
2020            assert_eq!(
2021                8192,
2022                file_proxy
2023                    .write(&vec![0xff as u8; 8192])
2024                    .await
2025                    .expect("FIDL call failed")
2026                    .map_err(Status::from_raw)
2027                    .expect("File write was successful")
2028            );
2029            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2030            bytes_usage = {
2031                let BytesAndNodes { bytes, nodes } =
2032                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2033                assert!(bytes > bytes_usage);
2034                assert_eq!(nodes, 1);
2035                bytes
2036            };
2037
2038            volumes_directory.terminate().await;
2039            filesystem.close().await.expect("close filesystem failed");
2040        }
2041        device = filesystem.take_device().await;
2042        device.ensure_unique();
2043        device.reopen(false);
2044        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2045        {
2046            fsck(filesystem.clone()).await.expect("Fsck");
2047            fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2048                .await
2049                .expect("Fsck volume");
2050            let blob_resupplied_count =
2051                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2052            let volumes_directory = VolumesDirectory::new(
2053                root_volume(filesystem.clone()).await.unwrap(),
2054                Weak::new(),
2055                None,
2056                blob_resupplied_count,
2057                MemoryPressureConfig::default(),
2058            )
2059            .await
2060            .unwrap();
2061            let volume_and_root = volumes_directory
2062                .mount_volume(VOLUME_NAME, Some(Arc::new(new_insecure_crypt())), false)
2063                .await
2064                .expect("mount unencrypted volume failed");
2065
2066            let (root_proxy, project_proxy) = {
2067                let (volume_dir_proxy, dir_server_end) =
2068                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2069                volumes_directory
2070                    .serve_volume(&volume_and_root, dir_server_end, false)
2071                    .expect("serve_volume failed");
2072
2073                let (root_proxy, root_server_end) =
2074                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2075                volume_dir_proxy
2076                    .open(
2077                        "root",
2078                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2079                        &Default::default(),
2080                        root_server_end.into_channel(),
2081                    )
2082                    .expect("Failed to open volume root");
2083                let project_proxy = {
2084                    connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2085                        .expect("Unable to connect to project id service")
2086                };
2087                (root_proxy, project_proxy)
2088            };
2089
2090            {
2091                let BytesAndNodes { bytes, nodes } =
2092                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2093                assert_eq!(bytes, bytes_usage);
2094                assert_eq!(nodes, 1);
2095            }
2096
2097            assert_eq!(
2098                project_proxy
2099                    .get_for_node(first_object_id)
2100                    .await
2101                    .unwrap()
2102                    .expect("Checking project"),
2103                PROJECT_ID
2104            );
2105            root_proxy
2106                .unlink(FILE_NAME, &fio::UnlinkOptions::default())
2107                .await
2108                .expect("FIDL call failed")
2109                .expect("unlink failed");
2110            filesystem.graveyard().flush().await;
2111
2112            {
2113                let BytesAndNodes { bytes, nodes } =
2114                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2115                assert_eq!(bytes, 0);
2116                assert_eq!(nodes, 0);
2117            }
2118
2119            let file_proxy = open_file_checked(
2120                &root_proxy,
2121                FILE_NAME,
2122                fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2123                &Default::default(),
2124            )
2125            .await;
2126
2127            let (_, immutable_attributes) =
2128                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2129            let node_id = immutable_attributes.id.unwrap();
2130
2131            project_proxy
2132                .set_for_node(node_id, PROJECT_ID)
2133                .await
2134                .unwrap()
2135                .expect("Applying project");
2136
2137            bytes_usage = {
2138                let BytesAndNodes { bytes, nodes } =
2139                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2140                // Empty file should have less space than the non-empty file from above.
2141                assert!(bytes < bytes_usage);
2142                assert_eq!(nodes, 1);
2143                bytes
2144            };
2145
2146            assert_eq!(
2147                8192,
2148                file_proxy
2149                    .write(&vec![0xff as u8; 8192])
2150                    .await
2151                    .expect("FIDL call failed")
2152                    .map_err(Status::from_raw)
2153                    .expect("File write was successful")
2154            );
2155            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2156            bytes_usage = {
2157                let BytesAndNodes { bytes, nodes } =
2158                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2159                assert!(bytes > bytes_usage);
2160                assert_eq!(nodes, 1);
2161                bytes
2162            };
2163
2164            // Trim to zero. Bytes should decrease.
2165            file_proxy.resize(0).await.expect("FIDL call failed").expect("Resize file");
2166            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2167            {
2168                let BytesAndNodes { bytes, nodes } =
2169                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2170                assert!(bytes < bytes_usage);
2171                assert_eq!(nodes, 1);
2172            };
2173
2174            // Dropping node from project. Usage should go to zero.
2175            project_proxy
2176                .clear_for_node(node_id)
2177                .await
2178                .expect("FIDL call failed")
2179                .expect("Clear failed.");
2180            {
2181                let BytesAndNodes { bytes, nodes } =
2182                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2183                assert_eq!(bytes, 0);
2184                assert_eq!(nodes, 0);
2185            };
2186
2187            volumes_directory.terminate().await;
2188            filesystem.close().await.expect("close filesystem failed");
2189        }
2190        device = filesystem.take_device().await;
2191        device.ensure_unique();
2192        device.reopen(false);
2193        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2194        fsck(filesystem.clone()).await.expect("Fsck");
2195        fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2196            .await
2197            .expect("Fsck volume");
2198        filesystem.close().await.expect("close filesystem failed");
2199    }
2200
2201    #[fuchsia::test]
2202    async fn test_project_node_inheritance() {
2203        const BYTES_LIMIT: u64 = 123456;
2204        const NODES_LIMIT: u64 = 4321;
2205        const VOLUME_NAME: &str = "A";
2206        const DIR_NAME: &str = "B";
2207        const SUBDIR_NAME: &str = "C";
2208        const FILE_NAME: &str = "D";
2209        const PROJECT_ID: u64 = 42;
2210        let volume_store_id;
2211        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
2212        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2213        {
2214            let blob_resupplied_count =
2215                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2216            let volumes_directory = VolumesDirectory::new(
2217                root_volume(filesystem.clone()).await.unwrap(),
2218                Weak::new(),
2219                None,
2220                blob_resupplied_count,
2221                MemoryPressureConfig::default(),
2222            )
2223            .await
2224            .unwrap();
2225
2226            let volume_and_root = volumes_directory
2227                .create_and_mount_volume(
2228                    VOLUME_NAME,
2229                    Some(Arc::new(new_insecure_crypt())),
2230                    false,
2231                    None,
2232                )
2233                .await
2234                .expect("create unencrypted volume failed");
2235            volume_store_id = volume_and_root.volume().store().store_object_id();
2236
2237            let (volume_dir_proxy, dir_server_end) =
2238                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2239            volumes_directory
2240                .serve_volume(&volume_and_root, dir_server_end, false)
2241                .expect("serve_volume failed");
2242
2243            let project_proxy =
2244                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2245                    .expect("Unable to connect to project id service");
2246
2247            project_proxy
2248                .set_limit(PROJECT_ID, BYTES_LIMIT, NODES_LIMIT)
2249                .await
2250                .unwrap()
2251                .expect("To set limits");
2252
2253            let dir_proxy = {
2254                let (root_proxy, root_server_end) =
2255                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2256                volume_dir_proxy
2257                    .open(
2258                        "root",
2259                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2260                        &Default::default(),
2261                        root_server_end.into_channel(),
2262                    )
2263                    .expect("Failed to open volume root");
2264
2265                open_dir_checked(
2266                    &root_proxy,
2267                    DIR_NAME,
2268                    fio::Flags::FLAG_MAYBE_CREATE
2269                        | fio::PERM_READABLE
2270                        | fio::PERM_WRITABLE
2271                        | fio::Flags::PROTOCOL_DIRECTORY,
2272                    Default::default(),
2273                )
2274                .await
2275            };
2276            {
2277                let (_, immutable_attributes) =
2278                    dir_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2279                let node_id = immutable_attributes.id.unwrap();
2280
2281                project_proxy
2282                    .set_for_node(node_id, PROJECT_ID)
2283                    .await
2284                    .unwrap()
2285                    .expect("Setting project on node");
2286            }
2287
2288            let subdir_proxy = open_dir_checked(
2289                &dir_proxy,
2290                SUBDIR_NAME,
2291                fio::Flags::FLAG_MAYBE_CREATE
2292                    | fio::PERM_READABLE
2293                    | fio::PERM_WRITABLE
2294                    | fio::Flags::PROTOCOL_DIRECTORY,
2295                Default::default(),
2296            )
2297            .await;
2298            {
2299                let (_, immutable_attributes) = subdir_proxy
2300                    .get_attributes(fio::NodeAttributesQuery::ID)
2301                    .await
2302                    .unwrap()
2303                    .unwrap();
2304                let node_id = immutable_attributes.id.unwrap();
2305
2306                assert_eq!(
2307                    project_proxy
2308                        .get_for_node(node_id)
2309                        .await
2310                        .unwrap()
2311                        .expect("Setting project on node"),
2312                    PROJECT_ID
2313                );
2314            }
2315
2316            let file_proxy = open_file_checked(
2317                &subdir_proxy,
2318                FILE_NAME,
2319                fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2320                &Default::default(),
2321            )
2322            .await;
2323            {
2324                let (_, immutable_attributes) =
2325                    file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2326                let node_id = immutable_attributes.id.unwrap();
2327
2328                assert_eq!(
2329                    project_proxy
2330                        .get_for_node(node_id)
2331                        .await
2332                        .unwrap()
2333                        .expect("Setting project on node"),
2334                    PROJECT_ID
2335                );
2336            }
2337
2338            // An unnamed temporary file is created slightly differently to a regular file object.
2339            // Just in case, check that it inherits project ID as well.
2340            let tmpfile_proxy = open_file_checked(
2341                &subdir_proxy,
2342                ".",
2343                fio::Flags::PROTOCOL_FILE
2344                    | fio::Flags::FLAG_CREATE_AS_UNNAMED_TEMPORARY
2345                    | fio::PERM_READABLE,
2346                &fio::Options::default(),
2347            )
2348            .await;
2349            {
2350                let (_, immutable_attributes) = tmpfile_proxy
2351                    .get_attributes(fio::NodeAttributesQuery::ID)
2352                    .await
2353                    .unwrap()
2354                    .unwrap();
2355                let node_id: u64 = immutable_attributes.id.unwrap();
2356                assert_eq!(
2357                    project_proxy
2358                        .get_for_node(node_id)
2359                        .await
2360                        .unwrap()
2361                        .expect("Setting project on node"),
2362                    PROJECT_ID
2363                );
2364            }
2365
2366            let BytesAndNodes { nodes, .. } =
2367                project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2368            assert_eq!(nodes, 3);
2369            volumes_directory.terminate().await;
2370            filesystem.close().await.expect("close filesystem failed");
2371        }
2372        device = filesystem.take_device().await;
2373        device.ensure_unique();
2374        device.reopen(false);
2375        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2376        fsck(filesystem.clone()).await.expect("Fsck");
2377        fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2378            .await
2379            .expect("Fsck volume");
2380        filesystem.close().await.expect("close filesystem failed");
2381    }
2382
2383    #[fuchsia::test]
2384    async fn test_project_listing() {
2385        const VOLUME_NAME: &str = "A";
2386        const FILE_NAME: &str = "B";
2387        const NON_ZERO_PROJECT_ID: u64 = 3;
2388        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
2389        let volume_store_id;
2390        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2391        {
2392            let blob_resupplied_count =
2393                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2394            let volumes_directory = VolumesDirectory::new(
2395                root_volume(filesystem.clone()).await.unwrap(),
2396                Weak::new(),
2397                None,
2398                blob_resupplied_count,
2399                MemoryPressureConfig::default(),
2400            )
2401            .await
2402            .unwrap();
2403            let volume_and_root = volumes_directory
2404                .create_and_mount_volume(VOLUME_NAME, None, false, None)
2405                .await
2406                .expect("create unencrypted volume failed");
2407            volume_store_id = volume_and_root.volume().store().store_object_id();
2408
2409            let (volume_dir_proxy, dir_server_end) =
2410                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2411            volumes_directory
2412                .serve_volume(&volume_and_root, dir_server_end, false)
2413                .expect("serve_volume failed");
2414            let project_proxy =
2415                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2416                    .expect("Unable to connect to project id service");
2417            // This is just to ensure that the small numbers below can be used for this test.
2418            assert!(FxVolume::MAX_PROJECT_ENTRIES >= 4);
2419            // Create a bunch of proxies. 3 more than the limit to ensure pagination.
2420            let num_entries = u64::try_from(FxVolume::MAX_PROJECT_ENTRIES + 3).unwrap();
2421            for project_id in 1..=num_entries {
2422                project_proxy.set_limit(project_id, 1, 1).await.unwrap().expect("To set limits");
2423            }
2424
2425            // Add one usage entry to be interspersed with the limit entries. Verifies that the
2426            // iterator will progress passed it with no effect.
2427            let file_proxy = {
2428                let (root_proxy, root_server_end) =
2429                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2430                volume_dir_proxy
2431                    .open(
2432                        "root",
2433                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2434                        &Default::default(),
2435                        root_server_end.into_channel(),
2436                    )
2437                    .expect("Failed to open volume root");
2438
2439                open_file_checked(
2440                    &root_proxy,
2441                    FILE_NAME,
2442                    fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2443                    &Default::default(),
2444                )
2445                .await
2446            };
2447            let (_, immutable_attributes) =
2448                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2449            let node_id = immutable_attributes.id.unwrap();
2450            project_proxy
2451                .set_for_node(node_id, NON_ZERO_PROJECT_ID)
2452                .await
2453                .unwrap()
2454                .expect("Setting project on node");
2455            {
2456                let BytesAndNodes { nodes, .. } = project_proxy
2457                    .info(NON_ZERO_PROJECT_ID)
2458                    .await
2459                    .unwrap()
2460                    .expect("Fetching project info")
2461                    .1;
2462                assert_eq!(nodes, 1);
2463            }
2464
2465            // If this `unwrap()` fails, it is likely the MAX_PROJECT_ENTRIES is too large for fidl.
2466            let (mut entries, mut next_token) =
2467                project_proxy.list(None).await.unwrap().expect("To get project listing");
2468            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2469            assert!(next_token.is_some());
2470            assert!(entries.contains(&1));
2471            assert!(entries.contains(&3));
2472            assert!(!entries.contains(&num_entries));
2473            // Page two should have a small set at the end.
2474            (entries, next_token) = project_proxy
2475                .list(next_token.as_deref())
2476                .await
2477                .unwrap()
2478                .expect("To get project listing");
2479            assert_eq!(entries.len(), 3);
2480            assert!(next_token.is_none());
2481            assert!(entries.contains(&num_entries));
2482            assert!(!entries.contains(&1));
2483            assert!(!entries.contains(&3));
2484            // Delete a couple and list all again, but one has usage still.
2485            project_proxy.clear(1).await.unwrap().expect("Clear project");
2486            project_proxy.clear(3).await.unwrap().expect("Clear project");
2487            (entries, next_token) =
2488                project_proxy.list(None).await.unwrap().expect("To get project listing");
2489            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2490            assert!(next_token.is_some());
2491            assert!(!entries.contains(&num_entries));
2492            assert!(!entries.contains(&1));
2493            assert!(entries.contains(&3));
2494            (entries, next_token) = project_proxy
2495                .list(next_token.as_deref())
2496                .await
2497                .unwrap()
2498                .expect("To get project listing");
2499            assert_eq!(entries.len(), 2);
2500            assert!(next_token.is_none());
2501            assert!(entries.contains(&num_entries));
2502            // Delete two more to hit the edge case.
2503            project_proxy.clear(2).await.unwrap().expect("Clear project");
2504            project_proxy.clear(4).await.unwrap().expect("Clear project");
2505            (entries, next_token) =
2506                project_proxy.list(None).await.unwrap().expect("To get project listing");
2507            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2508            assert!(next_token.is_none());
2509            assert!(entries.contains(&num_entries));
2510            volumes_directory.terminate().await;
2511            filesystem.close().await.expect("close filesystem failed");
2512        }
2513        device = filesystem.take_device().await;
2514        device.ensure_unique();
2515        device.reopen(false);
2516        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2517        fsck(filesystem.clone()).await.expect("Fsck");
2518        fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
2519        filesystem.close().await.expect("close filesystem failed");
2520    }
2521
2522    #[fuchsia::test(threads = 10)]
2523    async fn test_profile_blob() {
2524        let mut hashes = Vec::new();
2525        let device = {
2526            let fixture = blob_testing::new_blob_fixture().await;
2527
2528            for i in 0..3u64 {
2529                let hash =
2530                    fixture.write_blob(i.to_le_bytes().as_slice(), CompressionMode::Never).await;
2531                hashes.push(hash);
2532            }
2533            fixture.close().await
2534        };
2535        device.ensure_unique();
2536
2537        device.reopen(false);
2538        let mut device = {
2539            let fixture = blob_testing::open_blob_fixture(device).await;
2540            fixture
2541                .volume()
2542                .volume()
2543                .record_or_replay_profile(new_profile_state(true), "foo")
2544                .await
2545                .expect("Recording");
2546
2547            // Page in the zero offsets only to avoid readahead strangeness.
2548            let mut writable = [0u8];
2549            for hash in &hashes {
2550                let vmo = fixture.get_blob_vmo(*hash).await;
2551                vmo.read(&mut writable, 0).expect("Vmo read");
2552            }
2553            fixture.volume().volume().stop_profile_tasks().await;
2554            fixture.close().await
2555        };
2556
2557        // Do this multiple times to ensure that the re-recording doesn't drop anything.
2558        for i in 0..3 {
2559            device.ensure_unique();
2560            device.reopen(false);
2561            let fixture = blob_testing::open_blob_fixture(device).await;
2562            {
2563                // Ensure that nothing is paged in right now.
2564                for hash in &hashes {
2565                    let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2566                    assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
2567                }
2568
2569                fixture
2570                    .volume()
2571                    .volume()
2572                    .record_or_replay_profile(new_profile_state(true), "foo")
2573                    .await
2574                    .expect("Replaying");
2575
2576                // Move the file in flight to ensure a new version lands to be used next time.
2577                {
2578                    let store_id = fixture.volume().volume().store().store_object_id();
2579                    let dir = fixture.volume().volume().get_profile_directory().await.unwrap();
2580                    let old_file = dir.lookup("foo").await.unwrap().unwrap().0;
2581                    let mut transaction = fixture
2582                        .fs()
2583                        .clone()
2584                        .new_transaction(
2585                            lock_keys!(
2586                                LockKey::object(store_id, dir.object_id()),
2587                                LockKey::object(store_id, old_file),
2588                            ),
2589                            Options::default(),
2590                        )
2591                        .await
2592                        .unwrap();
2593                    replace_child(&mut transaction, Some((&dir, "foo")), (&dir, &i.to_string()))
2594                        .await
2595                        .expect("Replace old profile.");
2596                    transaction.commit().await.unwrap();
2597                    assert!(
2598                        dir.lookup("foo").await.unwrap().is_none(),
2599                        "Old profile should be moved"
2600                    );
2601                }
2602
2603                // Await all data being played back by checking that things have paged in.
2604                for hash in &hashes {
2605                    // Fetch vmo this way as well to ensure that the open is counting the file as
2606                    // used in the current recording.
2607                    let _vmo = fixture.get_blob_vmo(*hash).await;
2608                    let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2609                    while blob.vmo().info().unwrap().committed_bytes == 0 {
2610                        fasync::Timer::new(Duration::from_millis(25)).await;
2611                    }
2612                }
2613
2614                // Complete the recording.
2615                fixture.volume().volume().stop_profile_tasks().await;
2616            }
2617            device = fixture.close().await;
2618        }
2619    }
2620
2621    #[fuchsia::test(threads = 10)]
2622    async fn test_profile_file() {
2623        let mut hashes = Vec::new();
2624        let crypt_file_id;
2625        let device = {
2626            let fixture = TestFixture::new().await;
2627            // Include a crypt file to access during the recording. It should not be added.
2628            {
2629                let crypt_dir = open_dir_checked(
2630                    fixture.root(),
2631                    "crypt_dir",
2632                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2633                    Default::default(),
2634                )
2635                .await;
2636                let crypt: Arc<CryptBase> = fixture.crypt().unwrap();
2637                crypt
2638                    .add_wrapping_key(WRAPPING_KEY_ID, [1; 32].into())
2639                    .expect("add_wrapping_key failed");
2640                crypt_dir
2641                    .update_attributes(&fio::MutableNodeAttributes {
2642                        wrapping_key_id: Some(WRAPPING_KEY_ID),
2643                        ..Default::default()
2644                    })
2645                    .await
2646                    .expect("update_attributes wire call failed")
2647                    .expect("update_attributes failed");
2648                let crypt_file = open_file_checked(
2649                    &crypt_dir,
2650                    "crypt_file",
2651                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2652                    &Default::default(),
2653                )
2654                .await;
2655                crypt_file.write("asdf".as_bytes()).await.unwrap().expect("Writing crypt file");
2656                crypt_file_id = crypt_file
2657                    .get_attributes(fio::NodeAttributesQuery::ID)
2658                    .await
2659                    .unwrap()
2660                    .expect("Get id")
2661                    .1
2662                    .id
2663                    .expect("Reading id in response");
2664            }
2665
2666            for i in 0..3u64 {
2667                let file_proxy = open_file_checked(
2668                    fixture.root(),
2669                    &i.to_string(),
2670                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2671                    &Default::default(),
2672                )
2673                .await;
2674                file_proxy.write(i.to_le_bytes().as_slice()).await.unwrap().expect("Writing file");
2675                let id = file_proxy
2676                    .get_attributes(fio::NodeAttributesQuery::ID)
2677                    .await
2678                    .unwrap()
2679                    .expect("Get id")
2680                    .1
2681                    .id
2682                    .expect("Reading id in response");
2683                hashes.push((i, id));
2684            }
2685            fixture.close().await
2686        };
2687        device.ensure_unique();
2688
2689        device.reopen(false);
2690        let mut device = {
2691            let fixture = TestFixture::open(
2692                device,
2693                TestFixtureOptions { format: false, ..Default::default() },
2694            )
2695            .await;
2696            fixture
2697                .volume()
2698                .volume()
2699                .record_or_replay_profile(new_profile_state(false), "foo")
2700                .await
2701                .expect("Recording");
2702
2703            {
2704                let crypt: Arc<CryptBase> = fixture.crypt().unwrap();
2705                crypt
2706                    .add_wrapping_key(WRAPPING_KEY_ID, [1; 32].into())
2707                    .expect("add wrapping key failed");
2708                let crypt_file = open_file_checked(
2709                    fixture.root(),
2710                    "crypt_dir/crypt_file",
2711                    fio::PERM_READABLE,
2712                    &Default::default(),
2713                )
2714                .await;
2715                crypt_file.read(1).await.unwrap().expect("Reading crypt file");
2716            }
2717            // Page in the zero offsets only to avoid readahead strangeness.
2718            for (i, _) in &hashes {
2719                let file_proxy = open_file_checked(
2720                    fixture.root(),
2721                    &i.to_string(),
2722                    fio::PERM_READABLE,
2723                    &Default::default(),
2724                )
2725                .await;
2726                file_proxy.read(1).await.unwrap().expect("Reading file");
2727            }
2728            fixture.volume().volume().stop_profile_tasks().await;
2729            fixture.close().await
2730        };
2731
2732        // Do this multiple times to ensure that the re-recording doesn't drop anything.
2733        for i in 0..3 {
2734            device.ensure_unique();
2735            device.reopen(false);
2736            let fixture = TestFixture::open(
2737                device,
2738                TestFixtureOptions { format: false, ..Default::default() },
2739            )
2740            .await;
2741            {
2742                // Need to get the root vmo to check committed bytes.
2743                let volume = fixture.volume().volume().clone();
2744                // Ensure that nothing is paged in right now.
2745                {
2746                    let crypt_file = volume
2747                        .get_or_load_node(crypt_file_id, ObjectDescriptor::File, None)
2748                        .await
2749                        .expect("Opening file internally")
2750                        .into_any()
2751                        .downcast::<FxFile>()
2752                        .expect("Should be file");
2753                    assert_eq!(crypt_file.vmo().info().unwrap().committed_bytes, 0);
2754                }
2755                for (_, id) in &hashes {
2756                    let file = volume
2757                        .get_or_load_node(*id, ObjectDescriptor::File, None)
2758                        .await
2759                        .expect("Opening file internally")
2760                        .into_any()
2761                        .downcast::<FxFile>()
2762                        .expect("Should be file");
2763                    assert_eq!(file.vmo().info().unwrap().committed_bytes, 0);
2764                }
2765
2766                fixture
2767                    .volume()
2768                    .volume()
2769                    .record_or_replay_profile(new_profile_state(false), "foo")
2770                    .await
2771                    .expect("Replaying");
2772
2773                // Move the file in flight to ensure a new version lands to be used next time.
2774                {
2775                    let store_id = fixture.volume().volume().store().store_object_id();
2776                    let dir = fixture.volume().volume().get_profile_directory().await.unwrap();
2777                    let old_file = dir.lookup("foo").await.unwrap().unwrap().0;
2778                    let mut transaction = fixture
2779                        .fs()
2780                        .clone()
2781                        .new_transaction(
2782                            lock_keys!(
2783                                LockKey::object(store_id, dir.object_id()),
2784                                LockKey::object(store_id, old_file),
2785                            ),
2786                            Options::default(),
2787                        )
2788                        .await
2789                        .unwrap();
2790                    replace_child(&mut transaction, Some((&dir, "foo")), (&dir, &i.to_string()))
2791                        .await
2792                        .expect("Replace old profile.");
2793                    transaction.commit().await.unwrap();
2794                    assert!(
2795                        dir.lookup("foo").await.unwrap().is_none(),
2796                        "Old profile should be moved"
2797                    );
2798                }
2799
2800                // Await all data being played back by checking that things have paged in.
2801                for (_, id) in &hashes {
2802                    let file = volume
2803                        .get_or_load_node(*id, ObjectDescriptor::File, None)
2804                        .await
2805                        .expect("Opening file internally")
2806                        .into_any()
2807                        .downcast::<FxFile>()
2808                        .expect("Should be file");
2809                    while file.vmo().info().unwrap().committed_bytes == 0 {
2810                        fasync::Timer::new(Duration::from_millis(25)).await;
2811                    }
2812                }
2813                // The crypt file access should not have been recorded or replayed.
2814                {
2815                    let crypt_file = volume
2816                        .get_or_load_node(crypt_file_id, ObjectDescriptor::File, None)
2817                        .await
2818                        .expect("Opening file internally")
2819                        .into_any()
2820                        .downcast::<FxFile>()
2821                        .expect("Should be file");
2822                    assert_eq!(crypt_file.vmo().info().unwrap().committed_bytes, 0);
2823                }
2824
2825                // Open all the files to show that they have been used.
2826                for (i, _) in &hashes {
2827                    let _file_proxy = open_file_checked(
2828                        fixture.root(),
2829                        &i.to_string(),
2830                        fio::PERM_READABLE,
2831                        &Default::default(),
2832                    )
2833                    .await;
2834                }
2835
2836                // Complete the recording.
2837                fixture.volume().volume().stop_profile_tasks().await;
2838            }
2839            device = fixture.close().await;
2840        }
2841    }
2842
2843    #[fuchsia::test(threads = 10)]
2844    async fn test_profile_update() {
2845        let mut hashes = Vec::new();
2846        let device = {
2847            let fixture = blob_testing::new_blob_fixture().await;
2848            for i in 0..2u64 {
2849                let hash =
2850                    fixture.write_blob(i.to_le_bytes().as_slice(), CompressionMode::Never).await;
2851                hashes.push(hash);
2852            }
2853            fixture.close().await
2854        };
2855        device.ensure_unique();
2856
2857        device.reopen(false);
2858        let device = {
2859            let fixture = blob_testing::open_blob_fixture(device).await;
2860
2861            {
2862                let volume = fixture.volume().volume();
2863                volume
2864                    .record_or_replay_profile(new_profile_state(true), "foo")
2865                    .await
2866                    .expect("Recording");
2867
2868                let original_recorded = RECORDED.load(Ordering::Relaxed);
2869
2870                // Page in the zero offsets only to avoid readahead strangeness.
2871                {
2872                    let mut writable = [0u8];
2873                    let hash = &hashes[0];
2874                    let vmo = fixture.get_blob_vmo(*hash).await;
2875                    vmo.read(&mut writable, 0).expect("Vmo read");
2876                }
2877
2878                // The recording happens asynchronously, so we must wait.  This is crude, but it's
2879                // only for testing and it's simple.
2880                while RECORDED.load(Ordering::Relaxed) == original_recorded {
2881                    fasync::Timer::new(std::time::Duration::from_millis(10)).await;
2882                }
2883
2884                volume.stop_profile_tasks().await;
2885            }
2886            fixture.close().await
2887        };
2888
2889        device.ensure_unique();
2890        device.reopen(false);
2891        let fixture = blob_testing::open_blob_fixture(device).await;
2892        {
2893            // Need to get the root vmo to check committed bytes.
2894            // Ensure that nothing is paged in right now.
2895            for hash in &hashes {
2896                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2897                assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
2898            }
2899
2900            let volume = fixture.volume().volume();
2901
2902            volume
2903                .record_or_replay_profile(new_profile_state(true), "foo")
2904                .await
2905                .expect("Replaying");
2906
2907            // Await all data being played back by checking that things have paged in.
2908            {
2909                let hash = &hashes[0];
2910                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2911                while blob.vmo().info().unwrap().committed_bytes == 0 {
2912                    fasync::Timer::new(Duration::from_millis(25)).await;
2913                }
2914            }
2915
2916            let original_recorded = RECORDED.load(Ordering::Relaxed);
2917
2918            // Record the new profile that will overwrite it.
2919            {
2920                let mut writable = [0u8];
2921                let hash = &hashes[1];
2922                let vmo = fixture.get_blob_vmo(*hash).await;
2923                vmo.read(&mut writable, 0).expect("Vmo read");
2924            }
2925
2926            // The recording happens asynchronously, so we must wait.  This is crude, but it's only
2927            // for testing and it's simple.
2928            while RECORDED.load(Ordering::Relaxed) == original_recorded {
2929                fasync::Timer::new(std::time::Duration::from_millis(10)).await;
2930            }
2931
2932            // Complete the recording.
2933            volume.stop_profile_tasks().await;
2934        }
2935        let device = fixture.close().await;
2936
2937        device.ensure_unique();
2938        device.reopen(false);
2939        let fixture = blob_testing::open_blob_fixture(device).await;
2940        {
2941            // Need to get the root vmo to check committed bytes.
2942            // Ensure that nothing is paged in right now.
2943            for hash in &hashes {
2944                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2945                assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
2946            }
2947
2948            fixture
2949                .volume()
2950                .volume()
2951                .record_or_replay_profile(new_profile_state(true), "foo")
2952                .await
2953                .expect("Replaying");
2954
2955            // Await all data being played back by checking that things have paged in.
2956            {
2957                let hash = &hashes[1];
2958                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2959                while blob.vmo().info().unwrap().committed_bytes == 0 {
2960                    fasync::Timer::new(Duration::from_millis(25)).await;
2961                }
2962            }
2963
2964            // Complete the recording.
2965            fixture.volume().volume().stop_profile_tasks().await;
2966
2967            // Verify that the first blob was not paged in, as it should have been dropped from the profile.
2968            {
2969                let hash = &hashes[0];
2970                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2971                assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
2972            }
2973        }
2974        fixture.close().await;
2975    }
2976
2977    #[fuchsia::test(threads = 10)]
2978    async fn test_unencrypted_volume() {
2979        let fixture = TestFixture::new_unencrypted().await;
2980        let root = fixture.root();
2981
2982        let f = open_file_checked(
2983            &root,
2984            "foo",
2985            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
2986            &Default::default(),
2987        )
2988        .await;
2989        close_file_checked(f).await;
2990
2991        fixture.close().await;
2992    }
2993
2994    #[fuchsia::test]
2995    async fn test_read_only_unencrypted_volume() {
2996        // Make a new Fxfs filesystem with an unencrypted volume named "vol".
2997        let fs = {
2998            let device = fxfs::filesystem::mkfs_with_volume(
2999                DeviceHolder::new(FakeDevice::new(8192, 512)),
3000                "vol",
3001                None,
3002            )
3003            .await
3004            .unwrap();
3005            // Re-open the device as read-only and mount the filesystem as read-only.
3006            device.reopen(true);
3007            FxFilesystemBuilder::new().read_only(true).open(device).await.unwrap()
3008        };
3009        // Ensure we can access the volume and gracefully terminate any tasks.
3010        {
3011            let root_volume = root_volume(fs.clone()).await.unwrap();
3012            let store = root_volume.volume("vol", StoreOptions::default()).await.unwrap();
3013            let unique_id = store.store_object_id();
3014            let blob_resupplied_count =
3015                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
3016            let volume = FxVolume::new(
3017                Weak::new(),
3018                store,
3019                unique_id,
3020                "vol".to_owned(),
3021                blob_resupplied_count,
3022                MemoryPressureConfig::default(),
3023            )
3024            .unwrap();
3025            volume.terminate().await;
3026        }
3027        // Close the filesystem, and make sure we don't have any dangling references.
3028        fs.close().await.unwrap();
3029        let device = fs.take_device().await;
3030        device.ensure_unique();
3031    }
3032
3033    #[fuchsia::test]
3034    async fn test_read_only_encrypted_volume() {
3035        let crypt: Arc<CryptBase> = Arc::new(new_insecure_crypt());
3036        // Make a new Fxfs filesystem with an encrypted volume named "vol".
3037        let fs = {
3038            let device = fxfs::filesystem::mkfs_with_volume(
3039                DeviceHolder::new(FakeDevice::new(8192, 512)),
3040                "vol",
3041                Some(crypt.clone()),
3042            )
3043            .await
3044            .unwrap();
3045            // Re-open the device as read-only and mount the filesystem as read-only.
3046            device.reopen(true);
3047            FxFilesystemBuilder::new().read_only(true).open(device).await.unwrap()
3048        };
3049        // Ensure we can access the volume and gracefully terminate any tasks.
3050        {
3051            let root_volume = root_volume(fs.clone()).await.unwrap();
3052            let store = root_volume
3053                .volume("vol", StoreOptions { crypt: Some(crypt), ..StoreOptions::default() })
3054                .await
3055                .unwrap();
3056            let unique_id = store.store_object_id();
3057            let blob_resupplied_count =
3058                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
3059            let volume = FxVolume::new(
3060                Weak::new(),
3061                store,
3062                unique_id,
3063                "vol".to_owned(),
3064                blob_resupplied_count,
3065                MemoryPressureConfig::default(),
3066            )
3067            .unwrap();
3068            volume.terminate().await;
3069        }
3070        // Close the filesystem, and make sure we don't have any dangling references.
3071        fs.close().await.unwrap();
3072        let device = fs.take_device().await;
3073        device.ensure_unique();
3074    }
3075
3076    #[fuchsia::test]
3077    fn test_read_ahead_sizes() {
3078        let config = MemoryPressureConfig::default();
3079        assert!(config.mem_normal.read_ahead_size % BASE_READ_AHEAD_SIZE == 0);
3080        assert_eq!(config.mem_normal.read_ahead_size, MAX_READ_AHEAD_SIZE);
3081
3082        assert!(config.mem_warning.read_ahead_size % BASE_READ_AHEAD_SIZE == 0);
3083
3084        assert_eq!(config.mem_critical.read_ahead_size, BASE_READ_AHEAD_SIZE);
3085    }
3086}