Skip to main content

fxfs_platform_testing/fuchsia/
volume.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::component::map_to_raw_status;
6use crate::fuchsia::directory::FxDirectory;
7use crate::fuchsia::dirent_cache::DirentCache;
8use crate::fuchsia::file::{FlushType, FxFile};
9use crate::fuchsia::memory_pressure::{MemoryPressureLevel, MemoryPressureMonitor};
10use crate::fuchsia::node::{FxNode, GetResult, NodeCache};
11use crate::fuchsia::pager::Pager;
12use crate::fuchsia::profile::ProfileState;
13use crate::fuchsia::symlink::FxSymlink;
14use crate::fuchsia::volumes_directory::VolumesDirectory;
15use anyhow::{Error, bail, ensure};
16use async_trait::async_trait;
17use fidl::endpoints::ServerEnd;
18use fidl_fuchsia_fxfs::{
19    BytesAndNodes, FileBackedVolumeProviderRequest, FileBackedVolumeProviderRequestStream,
20    ProjectIdRequest, ProjectIdRequestStream, ProjectIterToken,
21};
22use fidl_fuchsia_io as fio;
23use fs_inspect::{FsInspectVolume, VolumeData};
24use fuchsia_async as fasync;
25use fuchsia_async::epoch::Epoch;
26use fuchsia_sync::Mutex;
27use futures::channel::oneshot;
28use futures::stream::{self, FusedStream, Stream};
29use futures::{FutureExt, StreamExt, TryStreamExt};
30use fxfs::errors::FxfsError;
31use fxfs::filesystem::{self, SyncOptions};
32use fxfs::future_with_guard::FutureWithGuard;
33use fxfs::log::*;
34use fxfs::object_store::directory::Directory;
35use fxfs::object_store::project_id::ProjectIdExt;
36use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
37use fxfs::object_store::{
38    DirType, HandleOptions, HandleOwner, ObjectDescriptor, ObjectStore, ProjectId,
39};
40use refaults_vmo::PageRefaultCounter;
41use std::future::Future;
42use std::pin::pin;
43#[cfg(any(test, feature = "testing"))]
44use std::sync::atomic::AtomicBool;
45use std::sync::atomic::{AtomicU64, Ordering};
46use std::sync::{Arc, Weak};
47use std::time::Duration;
48use vfs::directory::entry::DirectoryEntry;
49use vfs::directory::simple::Simple;
50use vfs::execution_scope::ExecutionScope;
51
52// LINT.IfChange
53// TODO:(b/299919008) Fix this number to something reasonable, or maybe just for fxblob.
54const DIRENT_CACHE_LIMIT: usize = 2000;
55// LINT.ThenChange(//src/storage/stressor/src/aggressive.rs)
56
57/// The read ahead/around size to target. Increase reads to be this size within the restrictions of
58/// the format for the target object.
59pub const READ_AHEAD_SIZE: u64 = 128 * 1024;
60
61const PROFILE_DIRECTORY: &str = "profiles";
62
63#[derive(Clone, Copy, Debug)]
64pub struct MemoryPressureLevelConfig {
65    /// The period to wait between flushes, as well as perform other background maintenance tasks
66    /// (e.g. purging caches).
67    pub background_task_period: Duration,
68
69    /// The limit of cached nodes.
70    pub cache_size_limit: usize,
71
72    /// The initial delay before the background task runs. The background task has a longer initial
73    /// delay to avoid running the task during boot.
74    pub background_task_initial_delay: Duration,
75}
76
77impl Default for MemoryPressureLevelConfig {
78    fn default() -> Self {
79        Self {
80            background_task_period: Duration::from_secs(20),
81            cache_size_limit: DIRENT_CACHE_LIMIT,
82            background_task_initial_delay: Duration::from_secs(70),
83        }
84    }
85}
86
87#[derive(Clone, Copy, Debug)]
88pub struct MemoryPressureConfig {
89    /// The configuration to use at [`MemoryPressureLevel::Normal`].
90    pub mem_normal: MemoryPressureLevelConfig,
91
92    /// The configuration to use at [`MemoryPressureLevel::Warning`].
93    pub mem_warning: MemoryPressureLevelConfig,
94
95    /// The configuration to use at [`MemoryPressureLevel::Critical`].
96    pub mem_critical: MemoryPressureLevelConfig,
97}
98
99impl MemoryPressureConfig {
100    pub fn for_level(&self, level: &MemoryPressureLevel) -> &MemoryPressureLevelConfig {
101        match level {
102            MemoryPressureLevel::Normal => &self.mem_normal,
103            MemoryPressureLevel::Warning => &self.mem_warning,
104            MemoryPressureLevel::Critical => &self.mem_critical,
105        }
106    }
107}
108
109impl Default for MemoryPressureConfig {
110    fn default() -> Self {
111        // TODO(https://fxbug.dev/42061389): investigate a smarter strategy for determining flush
112        // frequency.
113        Self {
114            mem_normal: MemoryPressureLevelConfig {
115                background_task_period: Duration::from_secs(20),
116                cache_size_limit: DIRENT_CACHE_LIMIT,
117                background_task_initial_delay: Duration::from_secs(70),
118            },
119            mem_warning: MemoryPressureLevelConfig {
120                background_task_period: Duration::from_secs(5),
121                cache_size_limit: 100,
122                background_task_initial_delay: Duration::from_secs(5),
123            },
124            mem_critical: MemoryPressureLevelConfig {
125                background_task_period: Duration::from_millis(1500),
126                cache_size_limit: 20,
127                background_task_initial_delay: Duration::from_millis(1500),
128            },
129        }
130    }
131}
132
133/// FxVolume represents an opened volume. It is also a (weak) cache for all opened Nodes within the
134/// volume.
135pub struct FxVolume {
136    parent: Weak<VolumesDirectory>,
137    cache: NodeCache,
138    store: Arc<ObjectStore>,
139    pager: Pager,
140    executor: fasync::EHandle,
141    name: String,
142
143    // A tuple of the actual task and a channel to signal to terminate the task.
144    background_task: Mutex<Option<(fasync::Task<()>, oneshot::Sender<()>)>>,
145
146    // Unique identifier of the filesystem that owns this volume.
147    fs_id: u64,
148
149    // The execution scope for this volume.
150    scope: ExecutionScope,
151
152    dirent_cache: DirentCache,
153
154    profile_state: Mutex<Option<Box<dyn ProfileState>>>,
155
156    #[cfg(any(test, feature = "testing"))]
157    poisoned: AtomicBool,
158
159    blob_resupplied_count: Arc<PageRefaultCounter>,
160
161    /// The number of dirty bytes in pager backed VMOs that belong to this volume. VolumesDirectory
162    /// holds a count for all volumes. This count is only used for tracing.
163    pager_dirty_byte_count: AtomicU64,
164}
165
166#[fxfs_trace::trace]
167impl FxVolume {
168    pub fn new(
169        parent: Weak<VolumesDirectory>,
170        store: Arc<ObjectStore>,
171        fs_id: u64,
172        name: String,
173        blob_resupplied_count: Arc<PageRefaultCounter>,
174        memory_pressure_config: MemoryPressureConfig,
175    ) -> Result<Self, Error> {
176        let scope = ExecutionScope::new();
177        Ok(Self {
178            parent,
179            cache: NodeCache::new(),
180            store,
181            name,
182            pager: Pager::new(scope.clone())?,
183            executor: fasync::EHandle::local(),
184            background_task: Mutex::new(None),
185            fs_id,
186            scope,
187            dirent_cache: DirentCache::new(memory_pressure_config.mem_normal.cache_size_limit),
188            profile_state: Mutex::new(None),
189            #[cfg(any(test, feature = "testing"))]
190            poisoned: AtomicBool::new(false),
191            blob_resupplied_count,
192            pager_dirty_byte_count: AtomicU64::new(0),
193        })
194    }
195
196    pub fn store(&self) -> &Arc<ObjectStore> {
197        &self.store
198    }
199
200    pub fn cache(&self) -> &NodeCache {
201        &self.cache
202    }
203
204    pub fn dirent_cache(&self) -> &DirentCache {
205        &self.dirent_cache
206    }
207
208    pub fn pager(&self) -> &Pager {
209        &self.pager
210    }
211
212    pub fn id(&self) -> u64 {
213        self.fs_id
214    }
215
216    pub fn scope(&self) -> &ExecutionScope {
217        &self.scope
218    }
219
220    pub fn blob_resupplied_count(&self) -> &PageRefaultCounter {
221        &self.blob_resupplied_count
222    }
223
224    pub fn name(&self) -> &str {
225        &self.name
226    }
227
228    /// Reports the filesystem info, but if the volume has a space limit applied then the space
229    /// available and space used are reported based on the volume instead.
230    pub fn filesystem_info_for_volume(&self) -> fio::FilesystemInfo {
231        let allocator = self.store.filesystem().allocator();
232        let info =
233            if let Some(limit) = allocator.get_owner_bytes_limit(self.store.store_object_id()) {
234                filesystem::Info {
235                    used_bytes: allocator.get_owner_bytes_used(self.store.store_object_id()),
236                    total_bytes: limit,
237                }
238            } else {
239                self.store.filesystem().get_info()
240            };
241
242        info_to_filesystem_info(info, self.store.block_size(), self.store.object_count(), self.id())
243    }
244
245    /// Stop profiling, recover resources from it and finalize recordings.
246    pub async fn stop_profile_tasks(self: &Arc<Self>) {
247        let Some(mut state) = self.profile_state.lock().take() else { return };
248        state.wait_for_replay_to_finish().await;
249        self.pager.set_recorder(None);
250        let _ = state.wait_for_recording_to_finish().await;
251    }
252
253    /// Opens or creates the profile directory in the volume's internal directory.
254    pub async fn get_profile_directory(self: &Arc<Self>) -> Result<Directory<FxVolume>, Error> {
255        let internal_dir = self
256            .get_or_create_internal_dir()
257            .await
258            .map_err(|e| e.context("Opening internal directory"))?;
259        // Have to do separate calls to create the profile dir if necessary.
260        let mut transaction = self
261            .store()
262            .new_transaction(
263                lock_keys![LockKey::object(
264                    self.store().store_object_id(),
265                    internal_dir.object_id(),
266                )],
267                Options::default(),
268            )
269            .await?;
270        Ok(match internal_dir.directory().lookup(PROFILE_DIRECTORY).await? {
271            Some((object_id, _, _)) => {
272                Directory::open_unchecked(self.clone(), object_id, DirType::Normal)
273            }
274            None => {
275                let new_dir = internal_dir
276                    .directory()
277                    .create_child_dir(&mut transaction, PROFILE_DIRECTORY)
278                    .await?;
279                transaction.commit().await?;
280                new_dir
281            }
282        })
283    }
284
285    /// Starts recording a profile for the volume under the name given, and if a profile exists
286    /// under that same name it is replayed and will be replaced after by the new recording if it
287    /// is cleanly shutdown and finalized.
288    pub async fn record_and_replay_profile(
289        self: &Arc<Self>,
290        mut state: Box<dyn ProfileState>,
291        name: &str,
292    ) -> Result<(), Error> {
293        // We don't meddle in FxDirectory or FxFile here because we don't want a paged object.
294        // Normally we ensure that there's only one copy by using the Node cache on the volume, but
295        // that would create FxFile, so in this case we just assume that only one profile operation
296        // should be ongoing at a time, as that is ensured in `VolumesDirectory`.
297
298        // If there is a recording already, prepare to replay it.
299        let profile_dir = self.get_profile_directory().await?;
300        let replay_handle = if let Some((id, descriptor, _)) = profile_dir.lookup(name).await? {
301            ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
302            Some(Box::new(
303                ObjectStore::open_object(self, id, HandleOptions::default(), None).await?,
304            ))
305        } else {
306            None
307        };
308
309        info!("Recording new profile '{name}' for volume object {}", self.store.store_object_id());
310        // Begin recording first to ensure that we capture any activity from the replay.
311        let recording_handle =
312            crate::fuchsia::profile::FileRecordingHandle::new(name, self.clone()).await?;
313
314        let mut profile_state = self.profile_state.lock();
315        self.pager.set_recorder(Some(state.record_new(self, Box::new(recording_handle))));
316        if let Some(handle) = replay_handle {
317            if let Some(guard) = self.scope().try_active_guard() {
318                state.replay_profile(handle, self.clone(), guard);
319                info!(
320                    "Replaying existing profile '{name}' for volume object {}",
321                    self.store.store_object_id()
322                );
323            }
324        }
325        *profile_state = Some(state);
326        Ok(())
327    }
328
329    /// Replays a profile if one exists, and only records if one does not exist.
330    pub async fn replay_xor_record_profile(
331        self: &Arc<Self>,
332        mut state: Box<dyn ProfileState>,
333        name: &str,
334    ) -> Result<(), Error> {
335        let profile_dir = self.get_profile_directory().await?;
336        let replay_handle = if let Some((id, descriptor, _)) = profile_dir.lookup(name).await? {
337            ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
338            Some(Box::new(
339                ObjectStore::open_object(self, id, HandleOptions::default(), None).await?,
340            ))
341        } else {
342            None
343        };
344
345        if let Some(handle) = replay_handle {
346            let mut profile_state = self.profile_state.lock();
347            if let Some(guard) = self.scope().try_active_guard() {
348                state.replay_profile(handle, self.clone(), guard);
349                info!(
350                    "Replaying existing profile '{name}' for volume object {}",
351                    self.store.store_object_id()
352                );
353            }
354            *profile_state = Some(state);
355        } else {
356            info!(
357                "Recording new profile '{name}' for volume object {}",
358                self.store.store_object_id()
359            );
360            let recording_handle =
361                crate::fuchsia::profile::FileRecordingHandle::new(name, self.clone()).await?;
362            let mut profile_state = self.profile_state.lock();
363            self.pager.set_recorder(Some(state.record_new(self, Box::new(recording_handle))));
364            *profile_state = Some(state);
365        }
366        Ok(())
367    }
368
369    async fn get_or_create_internal_dir(self: &Arc<Self>) -> Result<Arc<FxDirectory>, Error> {
370        let internal_data_id = self.store().get_or_create_internal_directory_id().await?;
371        let internal_dir = self
372            .get_or_load_node(internal_data_id, ObjectDescriptor::Directory, None)
373            .await?
374            .into_any()
375            .downcast::<FxDirectory>()
376            .unwrap();
377        Ok(internal_dir)
378    }
379
380    pub async fn terminate(&self) {
381        let task = std::mem::replace(&mut *self.background_task.lock(), None);
382        if let Some((task, terminate)) = task {
383            let _ = terminate.send(());
384            task.await;
385        }
386
387        // `NodeCache::terminate` will break any strong reference cycles contained within nodes
388        // (pager registration). The only remaining nodes should be those with open FIDL
389        // connections or vmo references in the process of handling the VMO_ZERO_CHILDREN signal.
390        // `ExecutionScope::shutdown` + `ExecutionScope::wait` will close the open FIDL connections
391        // and synchonrize the signal handling which should result in all nodes flushing and then
392        // dropping. Any async tasks required to flush a node should take an active guard on the
393        // `ExecutionScope` which will prevent `ExecutionScope::wait` from completing until all
394        // nodes are flushed.
395        self.scope.shutdown();
396        self.cache.terminate();
397        self.scope.wait().await;
398
399        // Make sure there are no deferred operations still pending for this volume.
400        Epoch::global().barrier().await;
401
402        // The dirent_cache must be cleared *after* shutting down the scope because there can be
403        // tasks that insert entries into the cache.
404        self.dirent_cache.clear();
405
406        if self.store.filesystem().options().read_only {
407            // If the filesystem is read only, we don't need to flush/sync anything.
408            if self.store.is_unlocked() {
409                self.store.lock_read_only();
410            }
411            return;
412        }
413
414        self.flush_all_files(FlushType::LastChance).await;
415        self.store.filesystem().graveyard().flush().await;
416        if self.store.crypt().is_some() {
417            if let Err(e) = self.store.lock().await {
418                // The store will be left in a safe state and there won't be data-loss unless
419                // there's an issue flushing the journal later.
420                warn!(error:? = e; "Locking store error");
421            }
422        }
423        let sync_status = self
424            .store
425            .filesystem()
426            .sync(SyncOptions { flush_device: true, ..Default::default() })
427            .await;
428        if let Err(e) = sync_status {
429            error!(error:? = e; "Failed to sync filesystem; data may be lost");
430        }
431    }
432
433    /// Attempts to get a node from the node cache. If the node wasn't present in the cache, loads
434    /// the object from the object store, installing the returned node into the cache and returns
435    /// the newly created FxNode backed by the loaded object.  |parent| is only set on the node if
436    /// the node was not present in the cache.  Otherwise, it is ignored.
437    pub async fn get_or_load_node(
438        self: &Arc<Self>,
439        object_id: u64,
440        object_descriptor: ObjectDescriptor,
441        parent: Option<Arc<FxDirectory>>,
442    ) -> Result<Arc<dyn FxNode>, Error> {
443        match self.cache.get_or_reserve(object_id).await {
444            GetResult::Node(node) => Ok(node),
445            GetResult::Placeholder(placeholder) => {
446                let node = match object_descriptor {
447                    ObjectDescriptor::File => FxFile::new(
448                        ObjectStore::open_object(self, object_id, HandleOptions::default(), None)
449                            .await?,
450                    ) as Arc<dyn FxNode>,
451                    ObjectDescriptor::Directory => {
452                        // Can't use open_unchecked because we don't know if the dir is casefolded
453                        // or encrypted.
454                        Arc::new(FxDirectory::new(parent, Directory::open(self, object_id).await?))
455                            as Arc<dyn FxNode>
456                    }
457                    ObjectDescriptor::Symlink => Arc::new(FxSymlink::new(self.clone(), object_id)),
458                    _ => bail!(FxfsError::Inconsistent),
459                };
460                placeholder.commit(&node);
461                Ok(node)
462            }
463        }
464    }
465
466    /// Marks the given directory deleted.
467    pub fn mark_directory_deleted(&self, object_id: u64) {
468        if let Some(node) = self.cache.get(object_id) {
469            // It's possible that node is a placeholder, in which case we don't need to wait for it
470            // to be resolved because it should be blocked behind the locks that are held by the
471            // caller, and once they're dropped, it'll be found to be deleted via the tree.
472            if let Ok(dir) = node.into_any().downcast::<FxDirectory>() {
473                dir.set_deleted();
474            }
475        }
476    }
477
478    /// Removes resources associated with |object_id| (which ought to be a file), if there are no
479    /// open connections to that file.
480    ///
481    /// This must be called *after committing* a transaction which deletes the last reference to
482    /// |object_id|, since before that point, new connections could be established.
483    pub(super) async fn maybe_purge_file(&self, object_id: u64) -> Result<(), Error> {
484        if let Some(node) = self.cache.get(object_id) {
485            node.clone().mark_to_be_purged();
486            return Ok(());
487        }
488        // If this fails, the graveyard should clean it up on next mount.
489        self.store
490            .tombstone_object(
491                object_id,
492                Options { borrow_metadata_space: true, ..Default::default() },
493            )
494            .await?;
495        Ok(())
496    }
497
498    /// Starts the background work task.  This task will periodically:
499    ///   - scan all files and flush them to disk, and
500    ///   - purge unused cached data.
501    /// The task will hold a strong reference to the FxVolume while it is running, so the task must
502    /// be closed later with Self::terminate, or the FxVolume will never be dropped.
503    pub fn start_background_task(
504        self: &Arc<Self>,
505        config: MemoryPressureConfig,
506        mem_monitor: Option<&MemoryPressureMonitor>,
507    ) {
508        let mut background_task = self.background_task.lock();
509        if background_task.is_none() {
510            let (tx, rx) = oneshot::channel();
511
512            let task = if let Some(mem_monitor) = mem_monitor {
513                fasync::Task::spawn(self.clone().background_task(
514                    config,
515                    mem_monitor.get_level_stream(),
516                    rx,
517                ))
518            } else {
519                // With no memory pressure monitoring, just stub the stream out as always pending.
520                fasync::Task::spawn(self.clone().background_task(config, stream::pending(), rx))
521            };
522
523            *background_task = Some((task, tx));
524        }
525    }
526
527    #[trace]
528    async fn background_task(
529        self: Arc<Self>,
530        config: MemoryPressureConfig,
531        mut level_stream: impl Stream<Item = MemoryPressureLevel> + FusedStream + Unpin,
532        terminate: oneshot::Receiver<()>,
533    ) {
534        debug!(store_id = self.store.store_object_id(); "FxVolume::background_task start");
535        let mut terminate = terminate.fuse();
536        // Default to the normal period until updates come from the `level_stream`.
537        let mut level = MemoryPressureLevel::Normal;
538        let mut timer =
539            pin!(fasync::Timer::new(config.for_level(&level).background_task_initial_delay));
540
541        loop {
542            let mut should_terminate = false;
543            let mut should_flush = false;
544            let mut low_mem = false;
545            let mut should_purge_layer_files = false;
546            let mut should_update_cache_limit = false;
547
548            futures::select_biased! {
549                _ = terminate => should_terminate = true,
550                new_level = level_stream.next() => {
551                    // Because `level_stream` will never terminate, this is safe to unwrap.
552                    let new_level = new_level.unwrap();
553                    // At critical levels, it's okay to undertake expensive work immediately
554                    // to reclaim memory.
555                    low_mem = matches!(new_level, MemoryPressureLevel::Critical);
556                    should_purge_layer_files = true;
557                    if new_level != level {
558                        level = new_level;
559                        should_update_cache_limit = true;
560                        let level_config = config.for_level(&level);
561                        timer.as_mut().reset(fasync::MonotonicInstant::after(
562                            level_config.background_task_period.into())
563                        );
564                        debug!(
565                            "Background task period changed to {:?} due to new memory pressure \
566                            level ({:?}).",
567                            config.for_level(&level).background_task_period, level
568                        );
569                    }
570                }
571                _ = timer => {
572                    timer.as_mut().reset(fasync::MonotonicInstant::after(
573                        config.for_level(&level).background_task_period.into())
574                    );
575                    should_flush = true;
576                    // Only purge layer file caches once we have elevated memory pressure.
577                    should_purge_layer_files = !matches!(level, MemoryPressureLevel::Normal);
578                }
579            };
580            if should_terminate {
581                break;
582            }
583            // Maybe close extra files *before* iterating them for flush/low mem.
584            if should_update_cache_limit {
585                self.dirent_cache.set_limit(config.for_level(&level).cache_size_limit);
586            }
587            if should_flush {
588                self.flush_all_files(FlushType::Sync).await;
589                self.dirent_cache.recycle_stale_files();
590            } else if low_mem {
591                // This is a softer version of flushing files, so don't bother if we're flushing.
592                self.minimize_memory().await;
593            }
594            if should_purge_layer_files {
595                for layer in self.store.tree().immutable_layer_set().layers {
596                    layer.purge_cached_data();
597                }
598            }
599        }
600        debug!(store_id = self.store.store_object_id(); "FxVolume::background_task end");
601    }
602
603    /// Reports that a certain number of bytes will be dirtied in a pager-backed VMO.
604    ///
605    /// Note that this function may await flush tasks.
606    pub fn report_pager_dirty(
607        self: Arc<Self>,
608        byte_count: u64,
609        mark_dirty: impl FnOnce() + Send + 'static,
610    ) {
611        let this = self.clone();
612        let callback = move || {
613            let prev = this.pager_dirty_byte_count.fetch_add(byte_count, Ordering::Relaxed);
614            mark_dirty();
615            fxfs_trace::counter!("dirty-bytes", 0, this.name => prev.saturating_add(byte_count));
616        };
617        if let Some(parent) = self.parent.upgrade() {
618            parent.report_pager_dirty(byte_count, self, callback);
619        } else {
620            callback();
621        }
622    }
623
624    /// Reports that a certain number of bytes were cleaned in a pager-backed VMO.
625    pub fn report_pager_clean(&self, byte_count: u64) {
626        if let Some(parent) = self.parent.upgrade() {
627            parent.report_pager_clean(byte_count);
628        }
629        let prev = self.pager_dirty_byte_count.fetch_sub(byte_count, Ordering::Relaxed);
630        fxfs_trace::counter!("dirty-bytes", 0, self.name => prev.saturating_sub(byte_count));
631    }
632
633    #[trace]
634    pub async fn flush_all_files(&self, flush_type: FlushType) {
635        let mut flushed = 0;
636        for file in self.cache.files() {
637            if let Some(node) = file.into_opened_node() {
638                if let Err(e) = FxFile::flush(&node, flush_type).await {
639                    warn!(
640                        store_id = self.store.store_object_id(),
641                        oid = node.object_id(),
642                        error:? = e;
643                        "Failed to flush",
644                    )
645                }
646                if flush_type == FlushType::LastChance {
647                    let file = node.clone();
648                    std::mem::drop(node);
649                    file.force_clean();
650                }
651            }
652            flushed += 1;
653        }
654        debug!(store_id = self.store.store_object_id(), file_count = flushed; "FxVolume flushed");
655    }
656
657    /// Flushes only files with dirty pages.
658    ///
659    /// `PagedObjectHandle` tracks the number dirty pages locally (except for overwrite files) which
660    /// makes determining whether flushing a file will reduce the number of dirty pages efficient.
661    /// `flush_all_files` checks if any metadata needs to be flushed which involves a syscall making
662    /// it significantly slower when there are lots of open files without dirty pages.
663    #[trace]
664    pub async fn minimize_memory(&self) {
665        for file in self.cache.files() {
666            if let Some(node) = file.into_opened_node() {
667                if let Err(e) = node.handle().minimize_memory().await {
668                    warn!(
669                        store_id = self.store.store_object_id(),
670                        oid = node.object_id(),
671                        error:? = e;
672                        "Failed to flush",
673                    )
674                }
675            }
676        }
677    }
678
679    /// Spawns a short term task for the volume that includes a guard that will prevent termination.
680    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) {
681        if let Some(guard) = self.scope.try_active_guard() {
682            self.executor.spawn_detached(FutureWithGuard::new(guard, task));
683        }
684    }
685
686    /// Tries to unwrap this volume.  If it fails, it will poison the volume so that when it is
687    /// dropped, you get a backtrace.
688    #[cfg(any(test, feature = "testing"))]
689    pub fn try_unwrap(self: Arc<Self>) -> Option<FxVolume> {
690        self.poisoned.store(true, Ordering::Relaxed);
691        match Arc::try_unwrap(self) {
692            Ok(volume) => {
693                volume.poisoned.store(false, Ordering::Relaxed);
694                Some(volume)
695            }
696            Err(this) => {
697                // Log details about all the places where there might be a reference cycle.
698                info!(
699                    "background_task: {}, profile_state: {}, dirent_cache count: {}, \
700                     pager strong file refs={}, no tasks={}",
701                    this.background_task.lock().is_some(),
702                    this.profile_state.lock().is_some(),
703                    this.dirent_cache.len(),
704                    crate::pager::STRONG_FILE_REFS.load(Ordering::Relaxed),
705                    {
706                        let mut no_tasks = pin!(this.scope.wait());
707                        no_tasks
708                            .poll_unpin(&mut std::task::Context::from_waker(
709                                std::task::Waker::noop(),
710                            ))
711                            .is_ready()
712                    },
713                );
714                None
715            }
716        }
717    }
718
719    pub async fn handle_file_backed_volume_provider_requests(
720        this: Weak<Self>,
721        scope: ExecutionScope,
722        mut requests: FileBackedVolumeProviderRequestStream,
723    ) -> Result<(), Error> {
724        while let Some(request) = requests.try_next().await? {
725            match request {
726                FileBackedVolumeProviderRequest::Open {
727                    parent_directory_token,
728                    name,
729                    server_end,
730                    control_handle: _,
731                } => {
732                    // Try and get an active guard before upgrading.
733                    let Some(_guard) = scope.try_active_guard() else {
734                        bail!("Volume shutting down")
735                    };
736                    let Some(this) = this.upgrade() else { bail!("FxVolume dropped") };
737                    match this
738                        .scope
739                        .token_registry()
740                        // NB: For now, we only expect these calls in a regular (non-blob) volume.
741                        // Hard-code the type for simplicity; attempts to call on a blob volume will
742                        // get an error.
743                        .get_owner_and_rights(parent_directory_token)
744                        .and_then(|dir| {
745                            dir.ok_or(zx::Status::BAD_HANDLE).and_then(|(dir, rights)| {
746                                if !rights.contains(fio::Rights::MODIFY_DIRECTORY) {
747                                    return Err(zx::Status::BAD_HANDLE);
748                                }
749                                dir.into_any()
750                                    .downcast::<FxDirectory>()
751                                    .map_err(|_| zx::Status::BAD_HANDLE)
752                            })
753                        }) {
754                        Ok(dir) => {
755                            dir.open_block_file(&name, server_end).await;
756                        }
757                        Err(status) => {
758                            let _ = server_end.close_with_epitaph(status).unwrap_or_else(|e| {
759                                error!(error:? = e; "open failed to send epitaph");
760                            });
761                        }
762                    }
763                }
764            }
765        }
766        Ok(())
767    }
768
769    pub async fn handle_project_id_requests(
770        this: Weak<Self>,
771        scope: ExecutionScope,
772        mut requests: ProjectIdRequestStream,
773    ) -> Result<(), Error> {
774        while let Some(request) = requests.try_next().await? {
775            // Try and get an active guard before upgrading.
776            let Some(_guard) = scope.try_active_guard() else { bail!("Volume shutting down") };
777            let Some(this) = this.upgrade() else { bail!("FxVolume dropped") };
778            let store_id = this.store.store_object_id();
779
780            match request {
781                ProjectIdRequest::SetLimit { responder, project_id, bytes, nodes } => {
782                    let result = if let Some(project_id) = ProjectId::new(project_id) {
783                        this.store()
784                        .set_project_limit(project_id, bytes, nodes)
785                        .await
786                        .map_err(|error| {
787                            error!(error:?, store_id, project_id; "Failed to set project limit");
788                            map_to_raw_status(error)
789                        })
790                    } else {
791                        Err(zx::Status::OUT_OF_RANGE.into_raw())
792                    };
793                    responder.send(result)?
794                }
795                ProjectIdRequest::Clear { responder, project_id } => {
796                    let result = if let Some(project_id) = ProjectId::new(project_id) {
797                        this.store().clear_project_limit(project_id).await.map_err(|error| {
798                            error!(error:?, store_id, project_id; "Failed to clear project limit");
799                            map_to_raw_status(error)
800                        })
801                    } else {
802                        Err(zx::Status::OUT_OF_RANGE.into_raw())
803                    };
804                    responder.send(result)?
805                }
806                ProjectIdRequest::SetForNode { responder, node_id, project_id } => {
807                    let result = if let Some(project_id) = ProjectId::new(project_id) {
808                        this.store()
809                        .set_project_for_node(node_id, project_id)
810                        .await
811                        .map_err(|error| {
812                            error!(error:?, store_id, node_id, project_id; "Failed to apply node.");
813                            map_to_raw_status(error)
814                        })
815                    } else {
816                        Err(zx::Status::OUT_OF_RANGE.into_raw())
817                    };
818                    responder.send(result)?
819                }
820                ProjectIdRequest::GetForNode { responder, node_id } => responder.send(
821                    this.store()
822                        .get_project_for_node(node_id)
823                        .await
824                        .map(ProjectIdExt::raw)
825                        .map_err(|error| {
826                            error!(error:?, store_id, node_id; "Failed to get node.");
827                            map_to_raw_status(error)
828                        }),
829                )?,
830                ProjectIdRequest::ClearForNode { responder, node_id } => responder.send(
831                    this.store().clear_project_for_node(node_id).await.map_err(|error| {
832                        error!(error:?, store_id, node_id; "Failed to clear for node.");
833                        map_to_raw_status(error)
834                    }),
835                )?,
836                ProjectIdRequest::List { responder, token } => {
837                    responder.send(match this.list_projects(&token).await {
838                        Ok((ref entries, ref next_token)) => Ok((entries, next_token.as_ref())),
839                        Err(error) => {
840                            error!(error:?, store_id, token:?; "Failed to list projects.");
841                            Err(map_to_raw_status(error))
842                        }
843                    })?
844                }
845                ProjectIdRequest::Info { responder, project_id } => {
846                    responder.send(match this.project_info(project_id).await {
847                        Ok((ref limit, ref usage)) => Ok((limit, usage)),
848                        Err(error) => {
849                            error!(error:?, store_id, project_id; "Failed to get project info.");
850                            Err(map_to_raw_status(error))
851                        }
852                    })?
853                }
854            }
855        }
856        Ok(())
857    }
858
859    // Maximum entries to fit based on 64KiB message size minus 16 bytes of header, 16 bytes
860    // of vector header, 16 bytes for the optional token header, and 8 bytes of token value.
861    // https://fuchsia.dev/fuchsia-src/development/languages/fidl/guides/max-out-pagination
862    const MAX_PROJECT_ENTRIES: usize = 8184;
863
864    // Calls out to the inner volume to list available projects, removing and re-adding the fidl
865    // wrapper types for the pagination token.
866    async fn list_projects(
867        &self,
868        last_token: &Option<Box<ProjectIterToken>>,
869    ) -> Result<(Vec<u64>, Option<ProjectIterToken>), Error> {
870        let (entries, token) = self
871            .store()
872            .list_projects(
873                last_token.as_ref().and_then(|v| ProjectId::new(v.value)),
874                Self::MAX_PROJECT_ENTRIES,
875            )
876            .await?;
877        Ok((
878            entries.into_iter().map(ProjectId::raw).collect(),
879            token.map(|value| ProjectIterToken { value: value.raw() }),
880        ))
881    }
882
883    async fn project_info(&self, project_id: u64) -> Result<(BytesAndNodes, BytesAndNodes), Error> {
884        let project_id = ProjectId::new(project_id).ok_or(FxfsError::OutOfRange)?;
885        let (limit, usage) = self.store().project_info(project_id).await?;
886        // At least one of them needs to be around to return anything.
887        ensure!(limit.is_some() || usage.is_some(), FxfsError::NotFound);
888        Ok((
889            limit.map_or_else(
890                || BytesAndNodes { bytes: u64::MAX, nodes: u64::MAX },
891                |v| BytesAndNodes { bytes: v.0, nodes: v.1 },
892            ),
893            usage.map_or_else(
894                || BytesAndNodes { bytes: 0, nodes: 0 },
895                |v| BytesAndNodes { bytes: v.0, nodes: v.1 },
896            ),
897        ))
898    }
899}
900
901#[cfg(any(test, feature = "testing"))]
902impl Drop for FxVolume {
903    fn drop(&mut self) {
904        assert!(!*self.poisoned.get_mut());
905    }
906}
907
908impl HandleOwner for FxVolume {}
909
910impl AsRef<ObjectStore> for FxVolume {
911    fn as_ref(&self) -> &ObjectStore {
912        &self.store
913    }
914}
915
916#[async_trait]
917impl FsInspectVolume for FxVolume {
918    async fn get_volume_data(&self) -> Option<VolumeData> {
919        // Don't try to return data if the volume is shutting down.
920        let _guard = self.scope.try_active_guard()?;
921
922        let object_count = self.store().object_count();
923        let (used_bytes, bytes_limit) =
924            self.store.filesystem().allocator().owner_allocation_info(self.store.store_object_id());
925        let encrypted = self.store().crypt().is_some();
926        let port_koid = fasync::EHandle::local().port().as_handle_ref().koid().unwrap().raw_koid();
927        Some(VolumeData { bytes_limit, used_bytes, used_nodes: object_count, encrypted, port_koid })
928    }
929}
930
931pub trait RootDir: FxNode + DirectoryEntry {
932    fn as_directory_entry(self: Arc<Self>) -> Arc<dyn DirectoryEntry>;
933
934    fn serve(self: Arc<Self>, flags: fio::Flags, server_end: ServerEnd<fio::DirectoryMarker>);
935
936    fn as_node(self: Arc<Self>) -> Arc<dyn FxNode>;
937
938    fn register_additional_volume_services(
939        self: Arc<Self>,
940        _svc_dir: &Simple,
941    ) -> Result<(), Error> {
942        Ok(())
943    }
944}
945
946#[derive(Clone)]
947pub struct FxVolumeAndRoot {
948    volume: Arc<FxVolume>,
949    root: Arc<dyn RootDir>,
950
951    // This is used for service connections and anything that isn't the actual volume.
952    admin_scope: ExecutionScope,
953
954    // The outgoing directory that the volume might be served on.
955    outgoing_dir: Arc<Simple>,
956}
957
958impl FxVolumeAndRoot {
959    pub async fn new<T: From<Directory<FxVolume>> + RootDir>(
960        parent: Weak<VolumesDirectory>,
961        store: Arc<ObjectStore>,
962        unique_id: u64,
963        volume_name: String,
964        blob_resupplied_count: Arc<PageRefaultCounter>,
965        memory_pressure_config: MemoryPressureConfig,
966    ) -> Result<Self, Error> {
967        let volume = Arc::new(FxVolume::new(
968            parent,
969            store,
970            unique_id,
971            volume_name,
972            blob_resupplied_count.clone(),
973            memory_pressure_config,
974        )?);
975        let root_object_id = volume.store().root_directory_object_id();
976        let root_dir = Directory::open(&volume, root_object_id).await?;
977        let root = Arc::<T>::new(root_dir.into()) as Arc<dyn RootDir>;
978        volume
979            .cache
980            .get_or_reserve(root_object_id)
981            .await
982            .placeholder()
983            .unwrap()
984            .commit(&root.clone().as_node());
985        Ok(Self {
986            volume,
987            root,
988            admin_scope: ExecutionScope::new(),
989            outgoing_dir: vfs::directory::immutable::simple(),
990        })
991    }
992
993    pub fn volume(&self) -> &Arc<FxVolume> {
994        &self.volume
995    }
996
997    pub fn root(&self) -> &Arc<dyn RootDir> {
998        &self.root
999    }
1000
1001    pub fn admin_scope(&self) -> &ExecutionScope {
1002        &self.admin_scope
1003    }
1004
1005    pub fn outgoing_dir(&self) -> &Arc<Simple> {
1006        &self.outgoing_dir
1007    }
1008
1009    // The same as root but downcasted to FxDirectory.
1010    pub fn root_dir(&self) -> Arc<FxDirectory> {
1011        self.root().clone().into_any().downcast::<FxDirectory>().expect("Invalid type for root")
1012    }
1013
1014    pub fn into_volume(self) -> Arc<FxVolume> {
1015        self.volume
1016    }
1017}
1018
1019// The correct number here is arguably u64::MAX - 1 (because node 0 is reserved). There's a bug
1020// where inspect test cases fail if we try and use that, possibly because of a signed/unsigned bug.
1021// See https://fxbug.dev/42168242.  Until that's fixed, we'll have to use i64::MAX.
1022const TOTAL_NODES: u64 = i64::MAX as u64;
1023
1024// An array used to initialize the FilesystemInfo |name| field. This just spells "fxfs" 0-padded to
1025// 32 bytes.
1026const FXFS_INFO_NAME_FIDL: [i8; 32] = [
1027    0x66, 0x78, 0x66, 0x73, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1028    0, 0, 0, 0,
1029];
1030
1031fn info_to_filesystem_info(
1032    info: filesystem::Info,
1033    block_size: u64,
1034    object_count: u64,
1035    fs_id: u64,
1036) -> fio::FilesystemInfo {
1037    fio::FilesystemInfo {
1038        total_bytes: info.total_bytes,
1039        used_bytes: info.used_bytes,
1040        total_nodes: TOTAL_NODES,
1041        used_nodes: object_count,
1042        // TODO(https://fxbug.dev/42175592): Support free_shared_pool_bytes.
1043        free_shared_pool_bytes: 0,
1044        fs_id,
1045        block_size: block_size as u32,
1046        max_filename_size: fio::MAX_NAME_LENGTH as u32,
1047        fs_type: fidl_fuchsia_fs::VfsType::Fxfs.into_primitive(),
1048        padding: 0,
1049        name: FXFS_INFO_NAME_FIDL,
1050    }
1051}
1052
1053#[cfg(test)]
1054mod tests {
1055    use super::DIRENT_CACHE_LIMIT;
1056    use crate::fuchsia::file::FxFile;
1057    use crate::fuchsia::fxblob::testing::{self as blob_testing, BlobFixture};
1058    use crate::fuchsia::memory_pressure::MemoryPressureLevel;
1059    use crate::fuchsia::pager::PagerBacked;
1060    use crate::fuchsia::profile::{RECORDED, new_profile_state};
1061    use crate::fuchsia::testing::{
1062        TestFixture, TestFixtureOptions, close_dir_checked, close_file_checked, open_dir,
1063        open_dir_checked, open_file, open_file_checked,
1064    };
1065    use crate::fuchsia::volume::{FxVolume, MemoryPressureConfig, MemoryPressureLevelConfig};
1066    use crate::fuchsia::volumes_directory::VolumesDirectory;
1067    use delivery_blob::CompressionMode;
1068    use fidl_fuchsia_fxfs::{BytesAndNodes, ProjectIdMarker};
1069    use fidl_fuchsia_io as fio;
1070    use fs_inspect::FsInspectVolume;
1071    use fuchsia_async::{self as fasync, TimeoutExt as _};
1072    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
1073    use fuchsia_fs::file;
1074    use fxfs::filesystem::{FxFilesystem, FxFilesystemBuilder};
1075    use fxfs::fsck::{fsck, fsck_volume};
1076    use fxfs::object_store::directory::replace_child;
1077    use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
1078    use fxfs::object_store::volume::root_volume;
1079    use fxfs::object_store::{HandleOptions, ObjectDescriptor, ObjectStore, StoreOptions};
1080    use fxfs_crypt_common::CryptBase;
1081    use fxfs_crypto::{Crypt, WrappingKeyId};
1082    use fxfs_insecure_crypto::new_insecure_crypt;
1083    use refaults_vmo::PageRefaultCounter;
1084    use std::sync::atomic::Ordering;
1085    use std::sync::{Arc, Weak};
1086    use std::time::Duration;
1087    use storage_device::DeviceHolder;
1088    use storage_device::fake_device::FakeDevice;
1089    use zx::Status;
1090
1091    const WRAPPING_KEY_ID: WrappingKeyId = u128::to_le_bytes(123);
1092
1093    #[fuchsia::test(threads = 10)]
1094    async fn test_rename_different_dirs() {
1095        use zx::Event;
1096
1097        let fixture = TestFixture::new().await;
1098        let root = fixture.root();
1099
1100        let src = open_dir_checked(
1101            &root,
1102            "foo",
1103            fio::Flags::FLAG_MAYBE_CREATE
1104                | fio::PERM_READABLE
1105                | fio::PERM_WRITABLE
1106                | fio::Flags::PROTOCOL_DIRECTORY,
1107            Default::default(),
1108        )
1109        .await;
1110
1111        let dst = open_dir_checked(
1112            &root,
1113            "bar",
1114            fio::Flags::FLAG_MAYBE_CREATE
1115                | fio::PERM_READABLE
1116                | fio::PERM_WRITABLE
1117                | fio::Flags::PROTOCOL_DIRECTORY,
1118            Default::default(),
1119        )
1120        .await;
1121
1122        let f = open_file_checked(
1123            &root,
1124            "foo/a",
1125            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1126            &Default::default(),
1127        )
1128        .await;
1129        close_file_checked(f).await;
1130
1131        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1132        Status::ok(status).expect("get_token failed");
1133        src.rename("a", Event::from(dst_token.unwrap()), "b")
1134            .await
1135            .expect("FIDL call failed")
1136            .expect("rename failed");
1137
1138        assert_eq!(
1139            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1140                .await
1141                .expect_err("Open succeeded")
1142                .root_cause()
1143                .downcast_ref::<Status>()
1144                .expect("No status"),
1145            &Status::NOT_FOUND,
1146        );
1147        let f =
1148            open_file_checked(&root, "bar/b", fio::Flags::PROTOCOL_FILE, &Default::default()).await;
1149        close_file_checked(f).await;
1150
1151        close_dir_checked(dst).await;
1152        close_dir_checked(src).await;
1153        fixture.close().await;
1154    }
1155
1156    #[fuchsia::test(threads = 10)]
1157    async fn test_rename_same_dir() {
1158        use zx::Event;
1159        let fixture = TestFixture::new().await;
1160        let root = fixture.root();
1161
1162        let src = open_dir_checked(
1163            &root,
1164            "foo",
1165            fio::Flags::FLAG_MAYBE_CREATE
1166                | fio::PERM_READABLE
1167                | fio::PERM_WRITABLE
1168                | fio::Flags::PROTOCOL_DIRECTORY,
1169            Default::default(),
1170        )
1171        .await;
1172
1173        let f = open_file_checked(
1174            &root,
1175            "foo/a",
1176            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1177            &Default::default(),
1178        )
1179        .await;
1180        close_file_checked(f).await;
1181
1182        let (status, src_token) = src.get_token().await.expect("FIDL call failed");
1183        Status::ok(status).expect("get_token failed");
1184        src.rename("a", Event::from(src_token.unwrap()), "b")
1185            .await
1186            .expect("FIDL call failed")
1187            .expect("rename failed");
1188
1189        assert_eq!(
1190            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1191                .await
1192                .expect_err("Open succeeded")
1193                .root_cause()
1194                .downcast_ref::<Status>()
1195                .expect("No status"),
1196            &Status::NOT_FOUND,
1197        );
1198        let f =
1199            open_file_checked(&root, "foo/b", fio::Flags::PROTOCOL_FILE, &Default::default()).await;
1200        close_file_checked(f).await;
1201
1202        close_dir_checked(src).await;
1203        fixture.close().await;
1204    }
1205
1206    #[fuchsia::test(threads = 10)]
1207    async fn test_rename_overwrites_file() {
1208        use zx::Event;
1209        let fixture = TestFixture::new().await;
1210        let root = fixture.root();
1211
1212        let src = open_dir_checked(
1213            &root,
1214            "foo",
1215            fio::Flags::FLAG_MAYBE_CREATE
1216                | fio::PERM_READABLE
1217                | fio::PERM_WRITABLE
1218                | fio::Flags::PROTOCOL_DIRECTORY,
1219            Default::default(),
1220        )
1221        .await;
1222
1223        let dst = open_dir_checked(
1224            &root,
1225            "bar",
1226            fio::Flags::FLAG_MAYBE_CREATE
1227                | fio::PERM_READABLE
1228                | fio::PERM_WRITABLE
1229                | fio::Flags::PROTOCOL_DIRECTORY,
1230            Default::default(),
1231        )
1232        .await;
1233
1234        // The src file is non-empty.
1235        let src_file = open_file_checked(
1236            &root,
1237            "foo/a",
1238            fio::Flags::FLAG_MAYBE_CREATE
1239                | fio::PERM_READABLE
1240                | fio::PERM_WRITABLE
1241                | fio::Flags::PROTOCOL_FILE,
1242            &Default::default(),
1243        )
1244        .await;
1245        let buf = vec![0xaa as u8; 8192];
1246        file::write(&src_file, buf.as_slice()).await.expect("Failed to write to file");
1247        close_file_checked(src_file).await;
1248
1249        // The dst file is empty (so we can distinguish it).
1250        let f = open_file_checked(
1251            &root,
1252            "bar/b",
1253            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1254            &Default::default(),
1255        )
1256        .await;
1257        close_file_checked(f).await;
1258
1259        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1260        Status::ok(status).expect("get_token failed");
1261        src.rename("a", Event::from(dst_token.unwrap()), "b")
1262            .await
1263            .expect("FIDL call failed")
1264            .expect("rename failed");
1265
1266        assert_eq!(
1267            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1268                .await
1269                .expect_err("Open succeeded")
1270                .root_cause()
1271                .downcast_ref::<Status>()
1272                .expect("No status"),
1273            &Status::NOT_FOUND,
1274        );
1275        let file = open_file_checked(
1276            &root,
1277            "bar/b",
1278            fio::PERM_READABLE | fio::Flags::PROTOCOL_FILE,
1279            &Default::default(),
1280        )
1281        .await;
1282        let buf = file::read(&file).await.expect("read file failed");
1283        assert_eq!(buf, vec![0xaa as u8; 8192]);
1284        close_file_checked(file).await;
1285
1286        close_dir_checked(dst).await;
1287        close_dir_checked(src).await;
1288        fixture.close().await;
1289    }
1290
1291    #[fuchsia::test(threads = 10)]
1292    async fn test_rename_overwrites_dir() {
1293        use zx::Event;
1294        let fixture = TestFixture::new().await;
1295        let root = fixture.root();
1296
1297        let src = open_dir_checked(
1298            &root,
1299            "foo",
1300            fio::Flags::FLAG_MAYBE_CREATE
1301                | fio::PERM_READABLE
1302                | fio::PERM_WRITABLE
1303                | fio::Flags::PROTOCOL_DIRECTORY,
1304            Default::default(),
1305        )
1306        .await;
1307
1308        let dst = open_dir_checked(
1309            &root,
1310            "bar",
1311            fio::Flags::FLAG_MAYBE_CREATE
1312                | fio::PERM_READABLE
1313                | fio::PERM_WRITABLE
1314                | fio::Flags::PROTOCOL_DIRECTORY,
1315            Default::default(),
1316        )
1317        .await;
1318
1319        // The src dir is non-empty.
1320        open_dir_checked(
1321            &root,
1322            "foo/a",
1323            fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
1324            Default::default(),
1325        )
1326        .await;
1327        open_file_checked(
1328            &root,
1329            "foo/a/file",
1330            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1331            &Default::default(),
1332        )
1333        .await;
1334        open_dir_checked(
1335            &root,
1336            "bar/b",
1337            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_DIRECTORY,
1338            Default::default(),
1339        )
1340        .await;
1341
1342        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1343        Status::ok(status).expect("get_token failed");
1344        src.rename("a", Event::from(dst_token.unwrap()), "b")
1345            .await
1346            .expect("FIDL call failed")
1347            .expect("rename failed");
1348
1349        assert_eq!(
1350            open_dir(&root, "foo/a", fio::Flags::PROTOCOL_DIRECTORY, &Default::default())
1351                .await
1352                .expect_err("Open succeeded")
1353                .root_cause()
1354                .downcast_ref::<Status>()
1355                .expect("No status"),
1356            &Status::NOT_FOUND,
1357        );
1358        let f =
1359            open_file_checked(&root, "bar/b/file", fio::Flags::PROTOCOL_FILE, &Default::default())
1360                .await;
1361        close_file_checked(f).await;
1362
1363        close_dir_checked(dst).await;
1364        close_dir_checked(src).await;
1365
1366        fixture.close().await;
1367    }
1368
1369    #[fuchsia::test]
1370    async fn test_background_flush() {
1371        let fixture = TestFixture::new().await;
1372        {
1373            let file = open_file_checked(
1374                fixture.root(),
1375                "file",
1376                fio::Flags::FLAG_MAYBE_CREATE
1377                    | fio::PERM_READABLE
1378                    | fio::PERM_WRITABLE
1379                    | fio::Flags::PROTOCOL_FILE,
1380                &Default::default(),
1381            )
1382            .await;
1383            let object_id = file
1384                .get_attributes(fio::NodeAttributesQuery::ID)
1385                .await
1386                .expect("Fidl get attr")
1387                .expect("get attr")
1388                .1
1389                .id
1390                .unwrap();
1391
1392            // Write some data to the file, which will only go to the cache for now.
1393            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1394
1395            // Initialized to the default size.
1396            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1397            let volume = fixture.volume().volume().clone();
1398
1399            let data_has_persisted = || async {
1400                // We have to reopen the object each time since this is a distinct handle from the
1401                // one managed by the FxFile.
1402                let object =
1403                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1404                        .await
1405                        .expect("open_object failed");
1406                let data = object.contents(8192).await.expect("read failed");
1407                data.len() == 1 && data[..] == [123u8]
1408            };
1409            assert!(!data_has_persisted().await);
1410
1411            fixture.volume().volume().start_background_task(
1412                MemoryPressureConfig {
1413                    mem_normal: MemoryPressureLevelConfig {
1414                        background_task_period: Duration::from_millis(100),
1415                        background_task_initial_delay: Duration::from_millis(100),
1416                        ..Default::default()
1417                    },
1418                    mem_warning: Default::default(),
1419                    mem_critical: Default::default(),
1420                },
1421                None,
1422            );
1423
1424            const MAX_WAIT: Duration = Duration::from_secs(20);
1425            let wait_increments = Duration::from_millis(400);
1426            let mut total_waited = Duration::ZERO;
1427
1428            while total_waited < MAX_WAIT {
1429                fasync::Timer::new(wait_increments).await;
1430                total_waited += wait_increments;
1431
1432                if data_has_persisted().await {
1433                    break;
1434                }
1435            }
1436
1437            assert!(data_has_persisted().await);
1438        }
1439
1440        fixture.close().await;
1441    }
1442
1443    #[fuchsia::test(threads = 2)]
1444    async fn test_background_flush_with_warning_memory_pressure() {
1445        let fixture = TestFixture::new().await;
1446        {
1447            let file = open_file_checked(
1448                fixture.root(),
1449                "file",
1450                fio::Flags::FLAG_MAYBE_CREATE
1451                    | fio::PERM_READABLE
1452                    | fio::PERM_WRITABLE
1453                    | fio::Flags::PROTOCOL_FILE,
1454                &Default::default(),
1455            )
1456            .await;
1457            let object_id = file
1458                .get_attributes(fio::NodeAttributesQuery::ID)
1459                .await
1460                .expect("Fidl get attr")
1461                .expect("get attr")
1462                .1
1463                .id
1464                .unwrap();
1465
1466            // Write some data to the file, which will only go to the cache for now.
1467            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1468
1469            // Initialized to the default size.
1470            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1471            let volume = fixture.volume().volume().clone();
1472
1473            let data_has_persisted = || async {
1474                // We have to reopen the object each time since this is a distinct handle from the
1475                // one managed by the FxFile.
1476                let object =
1477                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1478                        .await
1479                        .expect("open_object failed");
1480                let data = object.contents(8192).await.expect("read failed");
1481                data.len() == 1 && data[..] == [123u8]
1482            };
1483            assert!(!data_has_persisted().await);
1484
1485            // Configure the flush task to only flush quickly on warning.
1486            let flush_config = MemoryPressureConfig {
1487                mem_normal: MemoryPressureLevelConfig {
1488                    background_task_period: Duration::from_secs(20),
1489                    cache_size_limit: DIRENT_CACHE_LIMIT,
1490                    ..Default::default()
1491                },
1492                mem_warning: MemoryPressureLevelConfig {
1493                    background_task_period: Duration::from_millis(100),
1494                    cache_size_limit: 100,
1495                    background_task_initial_delay: Duration::from_millis(100),
1496                    ..Default::default()
1497                },
1498                mem_critical: MemoryPressureLevelConfig {
1499                    background_task_period: Duration::from_secs(20),
1500                    cache_size_limit: 50,
1501                    ..Default::default()
1502                },
1503            };
1504            fixture.volume().volume().start_background_task(
1505                flush_config,
1506                fixture.volumes_directory().memory_pressure_monitor(),
1507            );
1508
1509            // Send the memory pressure update.
1510            fixture
1511                .memory_pressure_proxy()
1512                .on_level_changed(MemoryPressureLevel::Warning)
1513                .await
1514                .expect("Failed to send memory pressure level change");
1515
1516            // Wait a bit of time for the flush to occur (but less than the normal and critical
1517            // periods).
1518            const MAX_WAIT: Duration = Duration::from_secs(3);
1519            let wait_increments = Duration::from_millis(400);
1520            let mut total_waited = Duration::ZERO;
1521
1522            while total_waited < MAX_WAIT {
1523                fasync::Timer::new(wait_increments).await;
1524                total_waited += wait_increments;
1525
1526                if data_has_persisted().await {
1527                    break;
1528                }
1529            }
1530
1531            assert!(data_has_persisted().await);
1532            assert_eq!(fixture.volume().volume().dirent_cache().limit(), 100);
1533        }
1534
1535        fixture.close().await;
1536    }
1537
1538    #[fuchsia::test(threads = 2)]
1539    async fn test_background_flush_with_critical_memory_pressure() {
1540        let fixture = TestFixture::new().await;
1541        {
1542            let file = open_file_checked(
1543                fixture.root(),
1544                "file",
1545                fio::Flags::FLAG_MAYBE_CREATE
1546                    | fio::PERM_READABLE
1547                    | fio::PERM_WRITABLE
1548                    | fio::Flags::PROTOCOL_FILE,
1549                &Default::default(),
1550            )
1551            .await;
1552            let object_id = file
1553                .get_attributes(fio::NodeAttributesQuery::ID)
1554                .await
1555                .expect("Fidl get attr")
1556                .expect("get attr")
1557                .1
1558                .id
1559                .unwrap();
1560
1561            // Write some data to the file, which will only go to the cache for now.
1562            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1563
1564            // Initialized to the default size.
1565            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1566            let volume = fixture.volume().volume().clone();
1567
1568            let data_has_persisted = || async {
1569                // We have to reopen the object each time since this is a distinct handle from the
1570                // one managed by the FxFile.
1571                let object =
1572                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1573                        .await
1574                        .expect("open_object failed");
1575                let data = object.contents(8192).await.expect("read failed");
1576                data.len() == 1 && data[..] == [123u8]
1577            };
1578            assert!(!data_has_persisted().await);
1579
1580            let flush_config = MemoryPressureConfig {
1581                mem_normal: MemoryPressureLevelConfig {
1582                    cache_size_limit: DIRENT_CACHE_LIMIT,
1583                    ..Default::default()
1584                },
1585                mem_warning: MemoryPressureLevelConfig {
1586                    cache_size_limit: 100,
1587                    ..Default::default()
1588                },
1589                mem_critical: MemoryPressureLevelConfig {
1590                    cache_size_limit: 50,
1591                    ..Default::default()
1592                },
1593            };
1594            fixture.volume().volume().start_background_task(
1595                flush_config,
1596                fixture.volumes_directory().memory_pressure_monitor(),
1597            );
1598
1599            // Send the memory pressure update.
1600            fixture
1601                .memory_pressure_proxy()
1602                .on_level_changed(MemoryPressureLevel::Critical)
1603                .await
1604                .expect("Failed to send memory pressure level change");
1605
1606            // Critical memory should trigger a flush immediately so expect a flush very quickly.
1607            const MAX_WAIT: Duration = Duration::from_secs(2);
1608            let wait_increments = Duration::from_millis(400);
1609            let mut total_waited = Duration::ZERO;
1610
1611            while total_waited < MAX_WAIT {
1612                fasync::Timer::new(wait_increments).await;
1613                total_waited += wait_increments;
1614
1615                if data_has_persisted().await {
1616                    break;
1617                }
1618            }
1619
1620            assert!(data_has_persisted().await);
1621            assert_eq!(fixture.volume().volume().dirent_cache().limit(), 50);
1622        }
1623
1624        fixture.close().await;
1625    }
1626
1627    // This test verifies that it is safe to query a volume after it is unmounted in case of a race
1628    // during unmount/shutdown.
1629    #[fuchsia::test(threads = 2)]
1630    async fn test_query_info_unmounted_volume() {
1631        const TEST_VOLUME: &str = "test_1234";
1632        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1633
1634        let fixture = TestFixture::new().await;
1635        {
1636            let volumes_directory = fixture.volumes_directory();
1637            let volume = volumes_directory
1638                .create_and_mount_volume(TEST_VOLUME, Some(crypt.clone()), false, None)
1639                .await
1640                .unwrap();
1641
1642            assert!(volume.volume().get_volume_data().await.is_some());
1643
1644            // Unmount it but keep a reference.
1645            volumes_directory
1646                .lock()
1647                .await
1648                .unmount(volume.volume().store().store_object_id())
1649                .await
1650                .expect("unmount failed");
1651
1652            // This returns None, but doesn't crash.
1653            assert!(volume.volume().get_volume_data().await.is_none());
1654        }
1655        fixture.close().await;
1656    }
1657
1658    #[fuchsia::test]
1659    async fn test_project_limit_persistence() {
1660        const BYTES_LIMIT_1: u64 = 123456;
1661        const NODES_LIMIT_1: u64 = 4321;
1662        const BYTES_LIMIT_2: u64 = 456789;
1663        const NODES_LIMIT_2: u64 = 9876;
1664        const VOLUME_NAME: &str = "A";
1665        const FILE_NAME: &str = "B";
1666        const PROJECT_ID: u64 = 42;
1667        const PROJECT_ID2: u64 = 343;
1668        let volume_store_id;
1669        let node_id;
1670        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
1671        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1672        {
1673            let blob_resupplied_count =
1674                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1675            let volumes_directory = VolumesDirectory::new(
1676                root_volume(filesystem.clone()).await.unwrap(),
1677                Weak::new(),
1678                None,
1679                blob_resupplied_count,
1680                MemoryPressureConfig::default(),
1681            )
1682            .await
1683            .unwrap();
1684
1685            let volume_and_root = volumes_directory
1686                .create_and_mount_volume(VOLUME_NAME, None, false, None)
1687                .await
1688                .expect("create unencrypted volume failed");
1689            volume_store_id = volume_and_root.volume().store().store_object_id();
1690
1691            let (volume_dir_proxy, dir_server_end) =
1692                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1693            volumes_directory
1694                .serve_volume(&volume_and_root, dir_server_end, false)
1695                .expect("serve_volume failed");
1696
1697            let project_proxy =
1698                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1699                    .expect("Unable to connect to project id service");
1700
1701            project_proxy
1702                .set_limit(0, BYTES_LIMIT_1, NODES_LIMIT_1)
1703                .await
1704                .unwrap()
1705                .expect_err("Should not set limits for project id 0");
1706
1707            assert_eq!(
1708                project_proxy.clear(0).await.unwrap().expect_err("Should not clear project id 0"),
1709                Status::OUT_OF_RANGE.into_raw()
1710            );
1711
1712            assert_eq!(
1713                project_proxy
1714                    .info(0)
1715                    .await
1716                    .unwrap()
1717                    .expect_err("Should not get info for project id 0"),
1718                Status::OUT_OF_RANGE.into_raw()
1719            );
1720
1721            project_proxy
1722                .set_limit(PROJECT_ID, BYTES_LIMIT_1, NODES_LIMIT_1)
1723                .await
1724                .unwrap()
1725                .expect("To set limits");
1726            {
1727                let BytesAndNodes { bytes, nodes } =
1728                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1729                assert_eq!(bytes, BYTES_LIMIT_1);
1730                assert_eq!(nodes, NODES_LIMIT_1);
1731            }
1732
1733            let file_proxy = {
1734                let (root_proxy, root_server_end) =
1735                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1736                volume_dir_proxy
1737                    .open(
1738                        "root",
1739                        fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
1740                        &Default::default(),
1741                        root_server_end.into_channel(),
1742                    )
1743                    .expect("Failed to open volume root");
1744
1745                open_file_checked(
1746                    &root_proxy,
1747                    FILE_NAME,
1748                    fio::Flags::FLAG_MAYBE_CREATE
1749                        | fio::PERM_READABLE
1750                        | fio::PERM_WRITABLE
1751                        | fio::Flags::PROTOCOL_FILE,
1752                    &Default::default(),
1753                )
1754                .await
1755            };
1756
1757            let (_, immutable_attributes) =
1758                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
1759            node_id = immutable_attributes.id.unwrap();
1760
1761            project_proxy
1762                .set_for_node(node_id, 0)
1763                .await
1764                .unwrap()
1765                .expect_err("Should not set 0 project id");
1766
1767            project_proxy
1768                .set_for_node(node_id, PROJECT_ID)
1769                .await
1770                .unwrap()
1771                .expect("Setting project on node");
1772
1773            project_proxy
1774                .set_limit(PROJECT_ID, BYTES_LIMIT_2, NODES_LIMIT_2)
1775                .await
1776                .unwrap()
1777                .expect("To set limits");
1778            {
1779                let BytesAndNodes { bytes, nodes } =
1780                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1781                assert_eq!(bytes, BYTES_LIMIT_2);
1782                assert_eq!(nodes, NODES_LIMIT_2);
1783            }
1784
1785            assert_eq!(
1786                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1787                PROJECT_ID
1788            );
1789
1790            volumes_directory.terminate().await;
1791            filesystem.close().await.expect("close filesystem failed");
1792        }
1793        device = filesystem.take_device().await;
1794        device.ensure_unique();
1795        device.reopen(false);
1796        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1797        {
1798            fsck(filesystem.clone()).await.expect("Fsck");
1799            fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
1800            let blob_resupplied_count =
1801                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1802            let volumes_directory = VolumesDirectory::new(
1803                root_volume(filesystem.clone()).await.unwrap(),
1804                Weak::new(),
1805                None,
1806                blob_resupplied_count,
1807                MemoryPressureConfig::default(),
1808            )
1809            .await
1810            .unwrap();
1811            let volume_and_root = volumes_directory
1812                .mount_volume(VOLUME_NAME, None, false)
1813                .await
1814                .expect("mount unencrypted volume failed");
1815
1816            let (volume_proxy, _scope) = crate::volumes_directory::serve_startup_volume_proxy(
1817                &volumes_directory,
1818                VOLUME_NAME,
1819            );
1820
1821            let project_proxy = {
1822                let (volume_dir_proxy, dir_server_end) =
1823                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1824                volumes_directory
1825                    .serve_volume(&volume_and_root, dir_server_end, false)
1826                    .expect("serve_volume failed");
1827
1828                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1829                    .expect("Unable to connect to project id service")
1830            };
1831
1832            let usage_bytes_and_nodes = {
1833                let (
1834                    BytesAndNodes { bytes: limit_bytes, nodes: limit_nodes },
1835                    usage_bytes_and_nodes,
1836                ) = project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info");
1837                assert_eq!(limit_bytes, BYTES_LIMIT_2);
1838                assert_eq!(limit_nodes, NODES_LIMIT_2);
1839                usage_bytes_and_nodes
1840            };
1841
1842            // Should be unable to clear the project limit, due to being in use.
1843            project_proxy.clear(PROJECT_ID).await.unwrap().expect("To clear limits");
1844
1845            assert_eq!(
1846                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1847                PROJECT_ID
1848            );
1849            project_proxy
1850                .set_for_node(node_id, PROJECT_ID2)
1851                .await
1852                .unwrap()
1853                .expect("Changing project");
1854            assert_eq!(
1855                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1856                PROJECT_ID2
1857            );
1858
1859            assert_eq!(
1860                project_proxy.info(PROJECT_ID).await.unwrap().expect_err("Expect missing limits"),
1861                Status::NOT_FOUND.into_raw()
1862            );
1863            assert_eq!(
1864                project_proxy.info(PROJECT_ID2).await.unwrap().expect("Fetching project info").1,
1865                usage_bytes_and_nodes
1866            );
1867
1868            std::mem::drop(volume_proxy);
1869            volumes_directory.terminate().await;
1870            std::mem::drop(volumes_directory);
1871            filesystem.close().await.expect("close filesystem failed");
1872        }
1873        device = filesystem.take_device().await;
1874        device.ensure_unique();
1875        device.reopen(false);
1876        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1877        fsck(filesystem.clone()).await.expect("Fsck");
1878        fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
1879        let blob_resupplied_count =
1880            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1881        let volumes_directory = VolumesDirectory::new(
1882            root_volume(filesystem.clone()).await.unwrap(),
1883            Weak::new(),
1884            None,
1885            blob_resupplied_count,
1886            MemoryPressureConfig::default(),
1887        )
1888        .await
1889        .unwrap();
1890        let volume_and_root = volumes_directory
1891            .mount_volume(VOLUME_NAME, None, false)
1892            .await
1893            .expect("mount unencrypted volume failed");
1894        let (volume_dir_proxy, dir_server_end) =
1895            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1896        volumes_directory
1897            .serve_volume(&volume_and_root, dir_server_end, false)
1898            .expect("serve_volume failed");
1899        let project_proxy = connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1900            .expect("Unable to connect to project id service");
1901        assert_eq!(
1902            project_proxy.info(PROJECT_ID).await.unwrap().expect_err("Expect missing limits"),
1903            Status::NOT_FOUND.into_raw()
1904        );
1905        volumes_directory.terminate().await;
1906        std::mem::drop(volumes_directory);
1907        filesystem.close().await.expect("close filesystem failed");
1908    }
1909
1910    #[fuchsia::test]
1911    async fn test_project_limit_accounting() {
1912        const BYTES_LIMIT: u64 = 123456;
1913        const NODES_LIMIT: u64 = 4321;
1914        const VOLUME_NAME: &str = "A";
1915        const FILE_NAME: &str = "B";
1916        const PROJECT_ID: u64 = 42;
1917        let volume_store_id;
1918        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
1919        let first_object_id;
1920        let mut bytes_usage;
1921        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1922        {
1923            let blob_resupplied_count =
1924                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1925            let volumes_directory = VolumesDirectory::new(
1926                root_volume(filesystem.clone()).await.unwrap(),
1927                Weak::new(),
1928                None,
1929                blob_resupplied_count,
1930                MemoryPressureConfig::default(),
1931            )
1932            .await
1933            .unwrap();
1934
1935            let volume_and_root = volumes_directory
1936                .create_and_mount_volume(
1937                    VOLUME_NAME,
1938                    Some(Arc::new(new_insecure_crypt())),
1939                    false,
1940                    None,
1941                )
1942                .await
1943                .expect("create unencrypted volume failed");
1944            volume_store_id = volume_and_root.volume().store().store_object_id();
1945
1946            let (volume_dir_proxy, dir_server_end) =
1947                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1948            volumes_directory
1949                .serve_volume(&volume_and_root, dir_server_end, false)
1950                .expect("serve_volume failed");
1951
1952            let project_proxy =
1953                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1954                    .expect("Unable to connect to project id service");
1955
1956            project_proxy
1957                .set_limit(PROJECT_ID, BYTES_LIMIT, NODES_LIMIT)
1958                .await
1959                .unwrap()
1960                .expect("To set limits");
1961            {
1962                let BytesAndNodes { bytes, nodes } =
1963                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1964                assert_eq!(bytes, BYTES_LIMIT);
1965                assert_eq!(nodes, NODES_LIMIT);
1966            }
1967
1968            let file_proxy = {
1969                let (root_proxy, root_server_end) =
1970                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1971                volume_dir_proxy
1972                    .open(
1973                        "root",
1974                        fio::PERM_READABLE | fio::PERM_WRITABLE,
1975                        &Default::default(),
1976                        root_server_end.into_channel(),
1977                    )
1978                    .expect("Failed to open volume root");
1979
1980                open_file_checked(
1981                    &root_proxy,
1982                    FILE_NAME,
1983                    fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
1984                    &Default::default(),
1985                )
1986                .await
1987            };
1988
1989            assert_eq!(
1990                8192,
1991                file_proxy
1992                    .write(&vec![0xff as u8; 8192])
1993                    .await
1994                    .expect("FIDL call failed")
1995                    .map_err(Status::from_raw)
1996                    .expect("File write was successful")
1997            );
1998            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
1999
2000            {
2001                let BytesAndNodes { bytes, nodes } =
2002                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2003                assert_eq!(bytes, 0);
2004                assert_eq!(nodes, 0);
2005            }
2006
2007            let (_, immutable_attributes) =
2008                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2009            let node_id = immutable_attributes.id.unwrap();
2010
2011            first_object_id = node_id;
2012            project_proxy
2013                .set_for_node(node_id, PROJECT_ID)
2014                .await
2015                .unwrap()
2016                .expect("Setting project on node");
2017
2018            bytes_usage = {
2019                let BytesAndNodes { bytes, nodes } =
2020                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2021                assert!(bytes > 0);
2022                assert_eq!(nodes, 1);
2023                bytes
2024            };
2025
2026            // Grow the file by a block.
2027            assert_eq!(
2028                8192,
2029                file_proxy
2030                    .write(&vec![0xff as u8; 8192])
2031                    .await
2032                    .expect("FIDL call failed")
2033                    .map_err(Status::from_raw)
2034                    .expect("File write was successful")
2035            );
2036            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2037            bytes_usage = {
2038                let BytesAndNodes { bytes, nodes } =
2039                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2040                assert!(bytes > bytes_usage);
2041                assert_eq!(nodes, 1);
2042                bytes
2043            };
2044
2045            volumes_directory.terminate().await;
2046            filesystem.close().await.expect("close filesystem failed");
2047        }
2048        device = filesystem.take_device().await;
2049        device.ensure_unique();
2050        device.reopen(false);
2051        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2052        {
2053            fsck(filesystem.clone()).await.expect("Fsck");
2054            fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2055                .await
2056                .expect("Fsck volume");
2057            let blob_resupplied_count =
2058                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2059            let volumes_directory = VolumesDirectory::new(
2060                root_volume(filesystem.clone()).await.unwrap(),
2061                Weak::new(),
2062                None,
2063                blob_resupplied_count,
2064                MemoryPressureConfig::default(),
2065            )
2066            .await
2067            .unwrap();
2068            let volume_and_root = volumes_directory
2069                .mount_volume(VOLUME_NAME, Some(Arc::new(new_insecure_crypt())), false)
2070                .await
2071                .expect("mount unencrypted volume failed");
2072
2073            let (root_proxy, project_proxy) = {
2074                let (volume_dir_proxy, dir_server_end) =
2075                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2076                volumes_directory
2077                    .serve_volume(&volume_and_root, dir_server_end, false)
2078                    .expect("serve_volume failed");
2079
2080                let (root_proxy, root_server_end) =
2081                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2082                volume_dir_proxy
2083                    .open(
2084                        "root",
2085                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2086                        &Default::default(),
2087                        root_server_end.into_channel(),
2088                    )
2089                    .expect("Failed to open volume root");
2090                let project_proxy = {
2091                    connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2092                        .expect("Unable to connect to project id service")
2093                };
2094                (root_proxy, project_proxy)
2095            };
2096
2097            {
2098                let BytesAndNodes { bytes, nodes } =
2099                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2100                assert_eq!(bytes, bytes_usage);
2101                assert_eq!(nodes, 1);
2102            }
2103
2104            assert_eq!(
2105                project_proxy
2106                    .get_for_node(first_object_id)
2107                    .await
2108                    .unwrap()
2109                    .expect("Checking project"),
2110                PROJECT_ID
2111            );
2112            root_proxy
2113                .unlink(FILE_NAME, &fio::UnlinkOptions::default())
2114                .await
2115                .expect("FIDL call failed")
2116                .expect("unlink failed");
2117            filesystem.graveyard().flush().await;
2118
2119            {
2120                let BytesAndNodes { bytes, nodes } =
2121                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2122                assert_eq!(bytes, 0);
2123                assert_eq!(nodes, 0);
2124            }
2125
2126            let file_proxy = open_file_checked(
2127                &root_proxy,
2128                FILE_NAME,
2129                fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2130                &Default::default(),
2131            )
2132            .await;
2133
2134            let (_, immutable_attributes) =
2135                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2136            let node_id = immutable_attributes.id.unwrap();
2137
2138            project_proxy
2139                .set_for_node(node_id, PROJECT_ID)
2140                .await
2141                .unwrap()
2142                .expect("Applying project");
2143
2144            bytes_usage = {
2145                let BytesAndNodes { bytes, nodes } =
2146                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2147                // Empty file should have less space than the non-empty file from above.
2148                assert!(bytes < bytes_usage);
2149                assert_eq!(nodes, 1);
2150                bytes
2151            };
2152
2153            assert_eq!(
2154                8192,
2155                file_proxy
2156                    .write(&vec![0xff as u8; 8192])
2157                    .await
2158                    .expect("FIDL call failed")
2159                    .map_err(Status::from_raw)
2160                    .expect("File write was successful")
2161            );
2162            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2163            bytes_usage = {
2164                let BytesAndNodes { bytes, nodes } =
2165                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2166                assert!(bytes > bytes_usage);
2167                assert_eq!(nodes, 1);
2168                bytes
2169            };
2170
2171            // Trim to zero. Bytes should decrease.
2172            file_proxy.resize(0).await.expect("FIDL call failed").expect("Resize file");
2173            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2174            {
2175                let BytesAndNodes { bytes, nodes } =
2176                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2177                assert!(bytes < bytes_usage);
2178                assert_eq!(nodes, 1);
2179            };
2180
2181            // Dropping node from project. Usage should go to zero.
2182            project_proxy
2183                .clear_for_node(node_id)
2184                .await
2185                .expect("FIDL call failed")
2186                .expect("Clear failed.");
2187            {
2188                let BytesAndNodes { bytes, nodes } =
2189                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2190                assert_eq!(bytes, 0);
2191                assert_eq!(nodes, 0);
2192            };
2193
2194            volumes_directory.terminate().await;
2195            filesystem.close().await.expect("close filesystem failed");
2196        }
2197        device = filesystem.take_device().await;
2198        device.ensure_unique();
2199        device.reopen(false);
2200        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2201        fsck(filesystem.clone()).await.expect("Fsck");
2202        fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2203            .await
2204            .expect("Fsck volume");
2205        filesystem.close().await.expect("close filesystem failed");
2206    }
2207
2208    #[fuchsia::test]
2209    async fn test_project_node_inheritance() {
2210        const BYTES_LIMIT: u64 = 123456;
2211        const NODES_LIMIT: u64 = 4321;
2212        const VOLUME_NAME: &str = "A";
2213        const DIR_NAME: &str = "B";
2214        const SUBDIR_NAME: &str = "C";
2215        const FILE_NAME: &str = "D";
2216        const PROJECT_ID: u64 = 42;
2217        let volume_store_id;
2218        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
2219        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2220        {
2221            let blob_resupplied_count =
2222                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2223            let volumes_directory = VolumesDirectory::new(
2224                root_volume(filesystem.clone()).await.unwrap(),
2225                Weak::new(),
2226                None,
2227                blob_resupplied_count,
2228                MemoryPressureConfig::default(),
2229            )
2230            .await
2231            .unwrap();
2232
2233            let volume_and_root = volumes_directory
2234                .create_and_mount_volume(
2235                    VOLUME_NAME,
2236                    Some(Arc::new(new_insecure_crypt())),
2237                    false,
2238                    None,
2239                )
2240                .await
2241                .expect("create unencrypted volume failed");
2242            volume_store_id = volume_and_root.volume().store().store_object_id();
2243
2244            let (volume_dir_proxy, dir_server_end) =
2245                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2246            volumes_directory
2247                .serve_volume(&volume_and_root, dir_server_end, false)
2248                .expect("serve_volume failed");
2249
2250            let project_proxy =
2251                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2252                    .expect("Unable to connect to project id service");
2253
2254            project_proxy
2255                .set_limit(PROJECT_ID, BYTES_LIMIT, NODES_LIMIT)
2256                .await
2257                .unwrap()
2258                .expect("To set limits");
2259
2260            let dir_proxy = {
2261                let (root_proxy, root_server_end) =
2262                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2263                volume_dir_proxy
2264                    .open(
2265                        "root",
2266                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2267                        &Default::default(),
2268                        root_server_end.into_channel(),
2269                    )
2270                    .expect("Failed to open volume root");
2271
2272                open_dir_checked(
2273                    &root_proxy,
2274                    DIR_NAME,
2275                    fio::Flags::FLAG_MAYBE_CREATE
2276                        | fio::PERM_READABLE
2277                        | fio::PERM_WRITABLE
2278                        | fio::Flags::PROTOCOL_DIRECTORY,
2279                    Default::default(),
2280                )
2281                .await
2282            };
2283            {
2284                let (_, immutable_attributes) =
2285                    dir_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2286                let node_id = immutable_attributes.id.unwrap();
2287
2288                project_proxy
2289                    .set_for_node(node_id, PROJECT_ID)
2290                    .await
2291                    .unwrap()
2292                    .expect("Setting project on node");
2293            }
2294
2295            let subdir_proxy = open_dir_checked(
2296                &dir_proxy,
2297                SUBDIR_NAME,
2298                fio::Flags::FLAG_MAYBE_CREATE
2299                    | fio::PERM_READABLE
2300                    | fio::PERM_WRITABLE
2301                    | fio::Flags::PROTOCOL_DIRECTORY,
2302                Default::default(),
2303            )
2304            .await;
2305            {
2306                let (_, immutable_attributes) = subdir_proxy
2307                    .get_attributes(fio::NodeAttributesQuery::ID)
2308                    .await
2309                    .unwrap()
2310                    .unwrap();
2311                let node_id = immutable_attributes.id.unwrap();
2312
2313                assert_eq!(
2314                    project_proxy
2315                        .get_for_node(node_id)
2316                        .await
2317                        .unwrap()
2318                        .expect("Setting project on node"),
2319                    PROJECT_ID
2320                );
2321            }
2322
2323            let file_proxy = open_file_checked(
2324                &subdir_proxy,
2325                FILE_NAME,
2326                fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2327                &Default::default(),
2328            )
2329            .await;
2330            {
2331                let (_, immutable_attributes) =
2332                    file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2333                let node_id = immutable_attributes.id.unwrap();
2334
2335                assert_eq!(
2336                    project_proxy
2337                        .get_for_node(node_id)
2338                        .await
2339                        .unwrap()
2340                        .expect("Setting project on node"),
2341                    PROJECT_ID
2342                );
2343            }
2344
2345            // An unnamed temporary file is created slightly differently to a regular file object.
2346            // Just in case, check that it inherits project ID as well.
2347            let tmpfile_proxy = open_file_checked(
2348                &subdir_proxy,
2349                ".",
2350                fio::Flags::PROTOCOL_FILE
2351                    | fio::Flags::FLAG_CREATE_AS_UNNAMED_TEMPORARY
2352                    | fio::PERM_READABLE,
2353                &fio::Options::default(),
2354            )
2355            .await;
2356            {
2357                let (_, immutable_attributes) = tmpfile_proxy
2358                    .get_attributes(fio::NodeAttributesQuery::ID)
2359                    .await
2360                    .unwrap()
2361                    .unwrap();
2362                let node_id: u64 = immutable_attributes.id.unwrap();
2363                assert_eq!(
2364                    project_proxy
2365                        .get_for_node(node_id)
2366                        .await
2367                        .unwrap()
2368                        .expect("Setting project on node"),
2369                    PROJECT_ID
2370                );
2371            }
2372
2373            let BytesAndNodes { nodes, .. } =
2374                project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2375            assert_eq!(nodes, 3);
2376            volumes_directory.terminate().await;
2377            filesystem.close().await.expect("close filesystem failed");
2378        }
2379        device = filesystem.take_device().await;
2380        device.ensure_unique();
2381        device.reopen(false);
2382        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2383        fsck(filesystem.clone()).await.expect("Fsck");
2384        fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2385            .await
2386            .expect("Fsck volume");
2387        filesystem.close().await.expect("close filesystem failed");
2388    }
2389
2390    #[fuchsia::test]
2391    async fn test_project_listing() {
2392        const VOLUME_NAME: &str = "A";
2393        const FILE_NAME: &str = "B";
2394        const NON_ZERO_PROJECT_ID: u64 = 3;
2395        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
2396        let volume_store_id;
2397        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2398        {
2399            let blob_resupplied_count =
2400                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2401            let volumes_directory = VolumesDirectory::new(
2402                root_volume(filesystem.clone()).await.unwrap(),
2403                Weak::new(),
2404                None,
2405                blob_resupplied_count,
2406                MemoryPressureConfig::default(),
2407            )
2408            .await
2409            .unwrap();
2410            let volume_and_root = volumes_directory
2411                .create_and_mount_volume(VOLUME_NAME, None, false, None)
2412                .await
2413                .expect("create unencrypted volume failed");
2414            volume_store_id = volume_and_root.volume().store().store_object_id();
2415
2416            let (volume_dir_proxy, dir_server_end) =
2417                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2418            volumes_directory
2419                .serve_volume(&volume_and_root, dir_server_end, false)
2420                .expect("serve_volume failed");
2421            let project_proxy =
2422                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2423                    .expect("Unable to connect to project id service");
2424            // This is just to ensure that the small numbers below can be used for this test.
2425            assert!(FxVolume::MAX_PROJECT_ENTRIES >= 4);
2426            // Create a bunch of proxies. 3 more than the limit to ensure pagination.
2427            let num_entries = u64::try_from(FxVolume::MAX_PROJECT_ENTRIES + 3).unwrap();
2428            for project_id in 1..=num_entries {
2429                project_proxy.set_limit(project_id, 1, 1).await.unwrap().expect("To set limits");
2430            }
2431
2432            // Add one usage entry to be interspersed with the limit entries. Verifies that the
2433            // iterator will progress passed it with no effect.
2434            let file_proxy = {
2435                let (root_proxy, root_server_end) =
2436                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2437                volume_dir_proxy
2438                    .open(
2439                        "root",
2440                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2441                        &Default::default(),
2442                        root_server_end.into_channel(),
2443                    )
2444                    .expect("Failed to open volume root");
2445
2446                open_file_checked(
2447                    &root_proxy,
2448                    FILE_NAME,
2449                    fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2450                    &Default::default(),
2451                )
2452                .await
2453            };
2454            let (_, immutable_attributes) =
2455                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2456            let node_id = immutable_attributes.id.unwrap();
2457            project_proxy
2458                .set_for_node(node_id, NON_ZERO_PROJECT_ID)
2459                .await
2460                .unwrap()
2461                .expect("Setting project on node");
2462            {
2463                let BytesAndNodes { nodes, .. } = project_proxy
2464                    .info(NON_ZERO_PROJECT_ID)
2465                    .await
2466                    .unwrap()
2467                    .expect("Fetching project info")
2468                    .1;
2469                assert_eq!(nodes, 1);
2470            }
2471
2472            // If this `unwrap()` fails, it is likely the MAX_PROJECT_ENTRIES is too large for fidl.
2473            let (mut entries, mut next_token) =
2474                project_proxy.list(None).await.unwrap().expect("To get project listing");
2475            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2476            assert!(next_token.is_some());
2477            assert!(entries.contains(&1));
2478            assert!(entries.contains(&3));
2479            assert!(!entries.contains(&num_entries));
2480            // Page two should have a small set at the end.
2481            (entries, next_token) = project_proxy
2482                .list(next_token.as_deref())
2483                .await
2484                .unwrap()
2485                .expect("To get project listing");
2486            assert_eq!(entries.len(), 3);
2487            assert!(next_token.is_none());
2488            assert!(entries.contains(&num_entries));
2489            assert!(!entries.contains(&1));
2490            assert!(!entries.contains(&3));
2491            // Delete a couple and list all again, but one has usage still.
2492            project_proxy.clear(1).await.unwrap().expect("Clear project");
2493            project_proxy.clear(3).await.unwrap().expect("Clear project");
2494            (entries, next_token) =
2495                project_proxy.list(None).await.unwrap().expect("To get project listing");
2496            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2497            assert!(next_token.is_some());
2498            assert!(!entries.contains(&num_entries));
2499            assert!(!entries.contains(&1));
2500            assert!(entries.contains(&3));
2501            (entries, next_token) = project_proxy
2502                .list(next_token.as_deref())
2503                .await
2504                .unwrap()
2505                .expect("To get project listing");
2506            assert_eq!(entries.len(), 2);
2507            assert!(next_token.is_none());
2508            assert!(entries.contains(&num_entries));
2509            // Delete two more to hit the edge case.
2510            project_proxy.clear(2).await.unwrap().expect("Clear project");
2511            project_proxy.clear(4).await.unwrap().expect("Clear project");
2512            (entries, next_token) =
2513                project_proxy.list(None).await.unwrap().expect("To get project listing");
2514            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2515            assert!(next_token.is_none());
2516            assert!(entries.contains(&num_entries));
2517            volumes_directory.terminate().await;
2518            filesystem.close().await.expect("close filesystem failed");
2519        }
2520        device = filesystem.take_device().await;
2521        device.ensure_unique();
2522        device.reopen(false);
2523        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2524        fsck(filesystem.clone()).await.expect("Fsck");
2525        fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
2526        filesystem.close().await.expect("close filesystem failed");
2527    }
2528
2529    #[fuchsia::test(threads = 10)]
2530    async fn test_profile_blob() {
2531        let mut hashes = Vec::new();
2532        let device = {
2533            let fixture = blob_testing::new_blob_fixture().await;
2534
2535            for i in 0..3u64 {
2536                let hash =
2537                    fixture.write_blob(i.to_string().as_bytes(), CompressionMode::Never).await;
2538                hashes.push(hash);
2539            }
2540            fixture.close().await
2541        };
2542        device.ensure_unique();
2543
2544        device.reopen(false);
2545        let mut device = {
2546            let fixture = blob_testing::open_blob_fixture(device).await;
2547            fixture
2548                .volume()
2549                .volume()
2550                .record_and_replay_profile(new_profile_state(true), "foo")
2551                .await
2552                .expect("Recording");
2553
2554            // Page in the zero offsets only to avoid readahead strangeness.
2555            let mut writable = [0u8];
2556            for hash in &hashes {
2557                let vmo = fixture.get_blob_vmo(*hash).await;
2558                vmo.read(&mut writable, 0).expect("Vmo read");
2559            }
2560            fixture.volume().volume().stop_profile_tasks().await;
2561            fixture.close().await
2562        };
2563
2564        // Do this multiple times to ensure that the re-recording doesn't drop anything.
2565        for i in 0..3 {
2566            device.ensure_unique();
2567            device.reopen(false);
2568            let fixture = blob_testing::open_blob_fixture(device).await;
2569            {
2570                // Ensure that nothing is paged in right now.
2571                for hash in &hashes {
2572                    let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2573                    assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
2574                }
2575
2576                fixture
2577                    .volume()
2578                    .volume()
2579                    .record_and_replay_profile(new_profile_state(true), "foo")
2580                    .await
2581                    .expect("Replaying");
2582
2583                // Move the file in flight to ensure a new version lands to be used next time.
2584                {
2585                    let store = fixture.volume().volume().store();
2586                    let store_id = store.store_object_id();
2587                    let dir = fixture.volume().volume().get_profile_directory().await.unwrap();
2588                    let old_file = dir.lookup("foo").await.unwrap().unwrap().0;
2589                    let mut transaction = store
2590                        .new_transaction(
2591                            lock_keys!(
2592                                LockKey::object(store_id, dir.object_id()),
2593                                LockKey::object(store_id, old_file),
2594                            ),
2595                            Options::default(),
2596                        )
2597                        .await
2598                        .unwrap();
2599                    replace_child(&mut transaction, Some((&dir, "foo")), (&dir, &i.to_string()))
2600                        .await
2601                        .expect("Replace old profile.");
2602                    transaction.commit().await.unwrap();
2603                    assert!(
2604                        dir.lookup("foo").await.unwrap().is_none(),
2605                        "Old profile should be moved"
2606                    );
2607                }
2608
2609                // Await all data being played back by checking that things have paged in.
2610                async {
2611                    for hash in &hashes {
2612                        // Fetch vmo this way as well to ensure that the open is counting the file
2613                        // as used in the current recording.
2614                        let _vmo = fixture.get_blob_vmo(*hash).await;
2615                        let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2616                        while blob.vmo().info().unwrap().populated_bytes == 0 {
2617                            fasync::Timer::new(Duration::from_millis(25)).await;
2618                        }
2619                    }
2620                }
2621                .on_timeout(std::time::Duration::from_secs(120), || {
2622                    panic!("Replay did not page in for all VMOs")
2623                })
2624                .await;
2625
2626                // Complete the recording.
2627                fixture.volume().volume().stop_profile_tasks().await;
2628            }
2629            device = fixture.close().await;
2630        }
2631    }
2632
2633    #[fuchsia::test(threads = 10)]
2634    async fn test_profile_file() {
2635        let mut hashes = Vec::new();
2636        let crypt_file_id;
2637        let device = {
2638            let fixture = TestFixture::new().await;
2639            // Include a crypt file to access during the recording. It should not be added.
2640            {
2641                let crypt_dir = open_dir_checked(
2642                    fixture.root(),
2643                    "crypt_dir",
2644                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2645                    Default::default(),
2646                )
2647                .await;
2648                let crypt: Arc<CryptBase> = fixture.crypt().unwrap();
2649                crypt
2650                    .add_wrapping_key(WRAPPING_KEY_ID, [1; 32].into())
2651                    .expect("add_wrapping_key failed");
2652                crypt_dir
2653                    .update_attributes(&fio::MutableNodeAttributes {
2654                        wrapping_key_id: Some(WRAPPING_KEY_ID),
2655                        ..Default::default()
2656                    })
2657                    .await
2658                    .expect("update_attributes wire call failed")
2659                    .expect("update_attributes failed");
2660                let crypt_file = open_file_checked(
2661                    &crypt_dir,
2662                    "crypt_file",
2663                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2664                    &Default::default(),
2665                )
2666                .await;
2667                crypt_file.write("asdf".as_bytes()).await.unwrap().expect("Writing crypt file");
2668                crypt_file_id = crypt_file
2669                    .get_attributes(fio::NodeAttributesQuery::ID)
2670                    .await
2671                    .unwrap()
2672                    .expect("Get id")
2673                    .1
2674                    .id
2675                    .expect("Reading id in response");
2676            }
2677
2678            for i in 0..3u64 {
2679                let file_proxy = open_file_checked(
2680                    fixture.root(),
2681                    &i.to_string(),
2682                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2683                    &Default::default(),
2684                )
2685                .await;
2686                file_proxy.write(i.to_string().as_bytes()).await.unwrap().expect("Writing file");
2687                let id = file_proxy
2688                    .get_attributes(fio::NodeAttributesQuery::ID)
2689                    .await
2690                    .unwrap()
2691                    .expect("Get id")
2692                    .1
2693                    .id
2694                    .expect("Reading id in response");
2695                hashes.push((i, id));
2696            }
2697            fixture.close().await
2698        };
2699        device.ensure_unique();
2700
2701        device.reopen(false);
2702        let mut device = {
2703            let fixture = TestFixture::open(
2704                device,
2705                TestFixtureOptions { format: false, ..Default::default() },
2706            )
2707            .await;
2708            fixture
2709                .volume()
2710                .volume()
2711                .record_and_replay_profile(new_profile_state(false), "foo")
2712                .await
2713                .expect("Recording");
2714
2715            {
2716                let crypt: Arc<CryptBase> = fixture.crypt().unwrap();
2717                crypt
2718                    .add_wrapping_key(WRAPPING_KEY_ID, [1; 32].into())
2719                    .expect("add wrapping key failed");
2720                let crypt_file = open_file_checked(
2721                    fixture.root(),
2722                    "crypt_dir/crypt_file",
2723                    fio::PERM_READABLE,
2724                    &Default::default(),
2725                )
2726                .await;
2727                crypt_file.read(1).await.unwrap().expect("Reading crypt file");
2728            }
2729            // Page in the zero offsets only to avoid readahead strangeness.
2730            for (i, _) in &hashes {
2731                let file_proxy = open_file_checked(
2732                    fixture.root(),
2733                    &i.to_string(),
2734                    fio::PERM_READABLE,
2735                    &Default::default(),
2736                )
2737                .await;
2738                file_proxy.read(1).await.unwrap().expect("Reading file");
2739            }
2740            fixture.volume().volume().stop_profile_tasks().await;
2741            fixture.close().await
2742        };
2743
2744        // Do this multiple times to ensure that the re-recording doesn't drop anything.
2745        for i in 0..3 {
2746            device.ensure_unique();
2747            device.reopen(false);
2748            let fixture = TestFixture::open(
2749                device,
2750                TestFixtureOptions { format: false, ..Default::default() },
2751            )
2752            .await;
2753            {
2754                // Need to get the root vmo to check committed bytes.
2755                let volume = fixture.volume().volume().clone();
2756                // Ensure that nothing is paged in right now.
2757                {
2758                    let crypt_file = volume
2759                        .get_or_load_node(crypt_file_id, ObjectDescriptor::File, None)
2760                        .await
2761                        .expect("Opening file internally")
2762                        .into_any()
2763                        .downcast::<FxFile>()
2764                        .expect("Should be file");
2765                    assert_eq!(crypt_file.vmo().info().unwrap().populated_bytes, 0);
2766                }
2767                for (_, id) in &hashes {
2768                    let file = volume
2769                        .get_or_load_node(*id, ObjectDescriptor::File, None)
2770                        .await
2771                        .expect("Opening file internally")
2772                        .into_any()
2773                        .downcast::<FxFile>()
2774                        .expect("Should be file");
2775                    assert_eq!(file.vmo().info().unwrap().populated_bytes, 0);
2776                }
2777
2778                fixture
2779                    .volume()
2780                    .volume()
2781                    .record_and_replay_profile(new_profile_state(false), "foo")
2782                    .await
2783                    .expect("Replaying");
2784
2785                // Move the file in flight to ensure a new version lands to be used next time.
2786                {
2787                    let store = fixture.volume().volume().store();
2788                    let store_id = store.store_object_id();
2789                    let dir = fixture.volume().volume().get_profile_directory().await.unwrap();
2790                    let old_file = dir.lookup("foo").await.unwrap().unwrap().0;
2791                    let mut transaction = store
2792                        .new_transaction(
2793                            lock_keys!(
2794                                LockKey::object(store_id, dir.object_id()),
2795                                LockKey::object(store_id, old_file),
2796                            ),
2797                            Options::default(),
2798                        )
2799                        .await
2800                        .unwrap();
2801                    replace_child(&mut transaction, Some((&dir, "foo")), (&dir, &i.to_string()))
2802                        .await
2803                        .expect("Replace old profile.");
2804                    transaction.commit().await.unwrap();
2805                    assert!(
2806                        dir.lookup("foo").await.unwrap().is_none(),
2807                        "Old profile should be moved"
2808                    );
2809                }
2810
2811                // Await all data being played back by checking that things have paged in.
2812                async {
2813                    for (_, id) in &hashes {
2814                        let file = volume
2815                            .get_or_load_node(*id, ObjectDescriptor::File, None)
2816                            .await
2817                            .expect("Opening file internally")
2818                            .into_any()
2819                            .downcast::<FxFile>()
2820                            .expect("Should be file");
2821                        while file.vmo().info().unwrap().populated_bytes == 0 {
2822                            fasync::Timer::new(Duration::from_millis(25)).await;
2823                        }
2824                    }
2825                }
2826                .on_timeout(std::time::Duration::from_secs(120), || {
2827                    panic!("Replay did not page in for all VMOs")
2828                })
2829                .await;
2830                // The crypt file access should not have been recorded or replayed.
2831                {
2832                    let crypt_file = volume
2833                        .get_or_load_node(crypt_file_id, ObjectDescriptor::File, None)
2834                        .await
2835                        .expect("Opening file internally")
2836                        .into_any()
2837                        .downcast::<FxFile>()
2838                        .expect("Should be file");
2839                    assert_eq!(crypt_file.vmo().info().unwrap().populated_bytes, 0);
2840                }
2841
2842                // Open all the files to show that they have been used.
2843                for (i, _) in &hashes {
2844                    let _file_proxy = open_file_checked(
2845                        fixture.root(),
2846                        &i.to_string(),
2847                        fio::PERM_READABLE,
2848                        &Default::default(),
2849                    )
2850                    .await;
2851                }
2852
2853                // Complete the recording.
2854                fixture.volume().volume().stop_profile_tasks().await;
2855            }
2856            device = fixture.close().await;
2857        }
2858    }
2859
2860    #[fuchsia::test(threads = 10)]
2861    async fn test_profile_update() {
2862        let mut hashes = Vec::new();
2863        let device = {
2864            let fixture = blob_testing::new_blob_fixture().await;
2865            for i in 0..2u64 {
2866                let hash =
2867                    fixture.write_blob(i.to_string().as_bytes(), CompressionMode::Never).await;
2868                hashes.push(hash);
2869            }
2870            fixture.close().await
2871        };
2872        device.ensure_unique();
2873
2874        device.reopen(false);
2875        let device = {
2876            let fixture = blob_testing::open_blob_fixture(device).await;
2877
2878            {
2879                let volume = fixture.volume().volume();
2880                volume
2881                    .record_and_replay_profile(new_profile_state(true), "foo")
2882                    .await
2883                    .expect("Recording");
2884
2885                let original_recorded = RECORDED.load(Ordering::Relaxed);
2886
2887                // Page in the zero offsets only to avoid readahead strangeness.
2888                {
2889                    let mut writable = [0u8];
2890                    let hash = &hashes[0];
2891                    let vmo = fixture.get_blob_vmo(*hash).await;
2892                    vmo.read(&mut writable, 0).expect("Vmo read");
2893                }
2894
2895                // The recording happens asynchronously, so we must wait.  This is crude, but it's
2896                // only for testing and it's simple.
2897                while RECORDED.load(Ordering::Relaxed) == original_recorded {
2898                    fasync::Timer::new(std::time::Duration::from_millis(10)).await;
2899                }
2900
2901                volume.stop_profile_tasks().await;
2902            }
2903            fixture.close().await
2904        };
2905
2906        device.ensure_unique();
2907        device.reopen(false);
2908        let fixture = blob_testing::open_blob_fixture(device).await;
2909        {
2910            // Need to get the root vmo to check committed bytes.
2911            // Ensure that nothing is paged in right now.
2912            for hash in &hashes {
2913                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2914                assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
2915            }
2916
2917            let volume = fixture.volume().volume();
2918
2919            volume
2920                .record_and_replay_profile(new_profile_state(true), "foo")
2921                .await
2922                .expect("Replaying");
2923
2924            // Await all data being played back by checking that things have paged in.
2925            async {
2926                let hash = &hashes[0];
2927                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2928                while blob.vmo().info().unwrap().populated_bytes == 0 {
2929                    fasync::Timer::new(Duration::from_millis(25)).await;
2930                }
2931            }
2932            .on_timeout(std::time::Duration::from_secs(120), || {
2933                panic!("Replay did not page in for all VMOs")
2934            })
2935            .await;
2936
2937            let original_recorded = RECORDED.load(Ordering::Relaxed);
2938
2939            // Record the new profile that will overwrite it.
2940            {
2941                let mut writable = [0u8];
2942                let hash = &hashes[1];
2943                let vmo = fixture.get_blob_vmo(*hash).await;
2944                vmo.read(&mut writable, 0).expect("Vmo read");
2945            }
2946
2947            // The recording happens asynchronously, so we must wait.  This is crude, but it's only
2948            // for testing and it's simple.
2949            while RECORDED.load(Ordering::Relaxed) == original_recorded {
2950                fasync::Timer::new(std::time::Duration::from_millis(10)).await;
2951            }
2952
2953            // Complete the recording.
2954            volume.stop_profile_tasks().await;
2955        }
2956        let device = fixture.close().await;
2957
2958        device.ensure_unique();
2959        device.reopen(false);
2960        let fixture = blob_testing::open_blob_fixture(device).await;
2961        {
2962            // Need to get the root vmo to check committed bytes.
2963            // Ensure that nothing is paged in right now.
2964            for hash in &hashes {
2965                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2966                assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
2967            }
2968
2969            fixture
2970                .volume()
2971                .volume()
2972                .record_and_replay_profile(new_profile_state(true), "foo")
2973                .await
2974                .expect("Replaying");
2975
2976            // Await all data being played back by checking that things have paged in.
2977            async {
2978                let hash = &hashes[1];
2979                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2980                while blob.vmo().info().unwrap().populated_bytes == 0 {
2981                    fasync::Timer::new(Duration::from_millis(25)).await;
2982                }
2983            }
2984            .on_timeout(std::time::Duration::from_secs(30), || {
2985                panic!("Replay did not page in for all VMOs")
2986            })
2987            .await;
2988
2989            // Complete the recording.
2990            fixture.volume().volume().stop_profile_tasks().await;
2991
2992            // Verify that first blob was not paged in as the it should be dropped from the profile.
2993            {
2994                let hash = &hashes[0];
2995                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2996                assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
2997            }
2998        }
2999        fixture.close().await;
3000    }
3001
3002    #[fuchsia::test(threads = 10)]
3003    async fn test_unencrypted_volume() {
3004        let fixture = TestFixture::new_unencrypted().await;
3005        let root = fixture.root();
3006
3007        let f = open_file_checked(
3008            &root,
3009            "foo",
3010            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
3011            &Default::default(),
3012        )
3013        .await;
3014        close_file_checked(f).await;
3015
3016        fixture.close().await;
3017    }
3018
3019    #[fuchsia::test]
3020    async fn test_read_only_unencrypted_volume() {
3021        // Make a new Fxfs filesystem with an unencrypted volume named "vol".
3022        let fs = {
3023            let device = fxfs::filesystem::mkfs_with_volume(
3024                DeviceHolder::new(FakeDevice::new(8192, 512)),
3025                "vol",
3026                None,
3027            )
3028            .await
3029            .unwrap();
3030            // Re-open the device as read-only and mount the filesystem as read-only.
3031            device.reopen(true);
3032            FxFilesystemBuilder::new().read_only(true).open(device).await.unwrap()
3033        };
3034        // Ensure we can access the volume and gracefully terminate any tasks.
3035        {
3036            let root_volume = root_volume(fs.clone()).await.unwrap();
3037            let store = root_volume.volume("vol", StoreOptions::default()).await.unwrap();
3038            let unique_id = store.store_object_id();
3039            let blob_resupplied_count =
3040                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
3041            let volume = FxVolume::new(
3042                Weak::new(),
3043                store,
3044                unique_id,
3045                "vol".to_owned(),
3046                blob_resupplied_count,
3047                MemoryPressureConfig::default(),
3048            )
3049            .unwrap();
3050            volume.terminate().await;
3051        }
3052        // Close the filesystem, and make sure we don't have any dangling references.
3053        fs.close().await.unwrap();
3054        let device = fs.take_device().await;
3055        device.ensure_unique();
3056    }
3057
3058    #[fuchsia::test]
3059    async fn test_read_only_encrypted_volume() {
3060        let crypt: Arc<CryptBase> = Arc::new(new_insecure_crypt());
3061        // Make a new Fxfs filesystem with an encrypted volume named "vol".
3062        let fs = {
3063            let device = fxfs::filesystem::mkfs_with_volume(
3064                DeviceHolder::new(FakeDevice::new(8192, 512)),
3065                "vol",
3066                Some(crypt.clone()),
3067            )
3068            .await
3069            .unwrap();
3070            // Re-open the device as read-only and mount the filesystem as read-only.
3071            device.reopen(true);
3072            FxFilesystemBuilder::new().read_only(true).open(device).await.unwrap()
3073        };
3074        // Ensure we can access the volume and gracefully terminate any tasks.
3075        {
3076            let root_volume = root_volume(fs.clone()).await.unwrap();
3077            let store = root_volume
3078                .volume("vol", StoreOptions { crypt: Some(crypt), ..StoreOptions::default() })
3079                .await
3080                .unwrap();
3081            let unique_id = store.store_object_id();
3082            let blob_resupplied_count =
3083                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
3084            let volume = FxVolume::new(
3085                Weak::new(),
3086                store,
3087                unique_id,
3088                "vol".to_owned(),
3089                blob_resupplied_count,
3090                MemoryPressureConfig::default(),
3091            )
3092            .unwrap();
3093            volume.terminate().await;
3094        }
3095        // Close the filesystem, and make sure we don't have any dangling references.
3096        fs.close().await.unwrap();
3097        let device = fs.take_device().await;
3098        device.ensure_unique();
3099    }
3100}