fxfs_platform/fuchsia/volume.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::fuchsia::component::map_to_raw_status;
use crate::fuchsia::directory::FxDirectory;
use crate::fuchsia::dirent_cache::DirentCache;
use crate::fuchsia::file::{FlushType, FxFile};
use crate::fuchsia::memory_pressure::{MemoryPressureLevel, MemoryPressureMonitor};
use crate::fuchsia::node::{FxNode, GetResult, NodeCache};
use crate::fuchsia::pager::Pager;
use crate::fuchsia::profile::ProfileState;
use crate::fuchsia::symlink::FxSymlink;
use crate::fuchsia::volumes_directory::VolumesDirectory;
use anyhow::{Error, bail, ensure};
use async_trait::async_trait;
use fidl::endpoints::ServerEnd;
use fidl_fuchsia_fxfs::{
    BytesAndNodes, FileBackedVolumeProviderRequest, FileBackedVolumeProviderRequestStream,
    ProjectIdRequest, ProjectIdRequestStream, ProjectIterToken,
};
use fidl_fuchsia_io as fio;
use fs_inspect::{FsInspectVolume, VolumeData};
use fuchsia_async as fasync;
use fuchsia_async::epoch::Epoch;
use fuchsia_sync::Mutex;
use futures::channel::oneshot;
use futures::stream::{self, FusedStream, Stream};
use futures::{FutureExt, StreamExt, TryStreamExt};
use fxfs::errors::FxfsError;
use fxfs::filesystem::{self, SyncOptions};
use fxfs::future_with_guard::FutureWithGuard;
use fxfs::log::*;
use fxfs::object_store::directory::Directory;
use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
use fxfs::object_store::{DirType, HandleOptions, HandleOwner, ObjectDescriptor, ObjectStore};
use refaults_vmo::PageRefaultCounter;
use std::future::Future;
use std::pin::pin;
#[cfg(any(test, feature = "testing"))]
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;
use vfs::directory::entry::DirectoryEntry;
use vfs::directory::simple::Simple;
use vfs::execution_scope::ExecutionScope;
// Needed for `as_handle_ref()`/`koid()` in `get_volume_data` below.
use zx::AsHandleRef;

// LINT.IfChange
// TODO(b/299919008): Fix this number to something reasonable, or maybe just for fxblob.
const DIRENT_CACHE_LIMIT: usize = 2000;
// LINT.ThenChange(//src/storage/stressor/src/aggressive.rs)

/// The read ahead/around size to target. Reads are expanded to this size where the format of the
/// target object allows it.
pub const READ_AHEAD_SIZE: u64 = 128 * 1024;
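
// As an illustrative sketch only (not code used elsewhere in this file), a reader might widen
// a request to READ_AHEAD_SIZE like this, where `offset` and `object_size` are hypothetical
// values (READ_AHEAD_SIZE is a power of two, so the modulo aligns the start downwards):
//
//     let start = offset - (offset % READ_AHEAD_SIZE);
//     let end = std::cmp::min(start + READ_AHEAD_SIZE, object_size);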

const PROFILE_DIRECTORY: &str = "profiles";

#[derive(Clone, Copy, Debug)]
pub struct MemoryPressureLevelConfig {
    /// The period to wait between runs of the background task, which flushes files and performs
    /// other maintenance (e.g. purging caches).
    pub background_task_period: Duration,

    /// The limit on the number of cached nodes.
    pub cache_size_limit: usize,

    /// The initial delay before the background task runs. The background task has a longer initial
    /// delay to avoid running the task during boot.
    pub background_task_initial_delay: Duration,
}

impl Default for MemoryPressureLevelConfig {
    fn default() -> Self {
        Self {
            background_task_period: Duration::from_secs(20),
            cache_size_limit: DIRENT_CACHE_LIMIT,
            background_task_initial_delay: Duration::from_secs(70),
        }
    }
}

#[derive(Clone, Copy, Debug)]
pub struct MemoryPressureConfig {
    /// The configuration to use at [`MemoryPressureLevel::Normal`].
    pub mem_normal: MemoryPressureLevelConfig,

    /// The configuration to use at [`MemoryPressureLevel::Warning`].
    pub mem_warning: MemoryPressureLevelConfig,

    /// The configuration to use at [`MemoryPressureLevel::Critical`].
    pub mem_critical: MemoryPressureLevelConfig,
}

impl MemoryPressureConfig {
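    /// Returns the configuration for the given memory pressure level.
    ///
    /// A minimal usage sketch (illustrative only, not taken from callers in this file):
    ///
    /// ```ignore
    /// let config = MemoryPressureConfig::default();
    /// // Under critical memory pressure the dirent cache limit drops to 20 entries.
    /// assert_eq!(config.for_level(&MemoryPressureLevel::Critical).cache_size_limit, 20);
    /// ```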
    pub fn for_level(&self, level: &MemoryPressureLevel) -> &MemoryPressureLevelConfig {
        match level {
            MemoryPressureLevel::Normal => &self.mem_normal,
            MemoryPressureLevel::Warning => &self.mem_warning,
            MemoryPressureLevel::Critical => &self.mem_critical,
        }
    }
}

impl Default for MemoryPressureConfig {
    fn default() -> Self {
        // TODO(https://fxbug.dev/42061389): investigate a smarter strategy for determining flush
        // frequency.
        Self {
            mem_normal: MemoryPressureLevelConfig {
                background_task_period: Duration::from_secs(20),
                cache_size_limit: DIRENT_CACHE_LIMIT,
                background_task_initial_delay: Duration::from_secs(70),
            },
            mem_warning: MemoryPressureLevelConfig {
                background_task_period: Duration::from_secs(5),
                cache_size_limit: 100,
                background_task_initial_delay: Duration::from_secs(5),
            },
            mem_critical: MemoryPressureLevelConfig {
                background_task_period: Duration::from_millis(1500),
                cache_size_limit: 20,
                background_task_initial_delay: Duration::from_millis(1500),
            },
        }
    }
}

/// FxVolume represents an opened volume. It is also a (weak) cache for all opened Nodes within the
/// volume.
pub struct FxVolume {
    parent: Weak<VolumesDirectory>,
    cache: NodeCache,
    store: Arc<ObjectStore>,
    pager: Pager,
    executor: fasync::EHandle,
    name: String,

    // A tuple of the actual task and a channel to signal to terminate the task.
    background_task: Mutex<Option<(fasync::Task<()>, oneshot::Sender<()>)>>,

    // Unique identifier of the filesystem that owns this volume.
    fs_id: u64,

    // The execution scope for this volume.
    scope: ExecutionScope,

    dirent_cache: DirentCache,

    profile_state: Mutex<Option<Box<dyn ProfileState>>>,

    #[cfg(any(test, feature = "testing"))]
    poisoned: AtomicBool,

    blob_resupplied_count: Arc<PageRefaultCounter>,

    /// The number of dirty bytes in pager-backed VMOs that belong to this volume. VolumesDirectory
    /// holds a count for all volumes. This count is only used for tracing.
    pager_dirty_byte_count: AtomicU64,
}

#[fxfs_trace::trace]
impl FxVolume {
    pub fn new(
        parent: Weak<VolumesDirectory>,
        store: Arc<ObjectStore>,
        fs_id: u64,
        name: String,
        blob_resupplied_count: Arc<PageRefaultCounter>,
        memory_pressure_config: MemoryPressureConfig,
    ) -> Result<Self, Error> {
        let scope = ExecutionScope::new();
        Ok(Self {
            parent,
            cache: NodeCache::new(),
            store,
            name,
            pager: Pager::new(scope.clone())?,
            executor: fasync::EHandle::local(),
            background_task: Mutex::new(None),
            fs_id,
            scope,
            dirent_cache: DirentCache::new(memory_pressure_config.mem_normal.cache_size_limit),
            profile_state: Mutex::new(None),
            #[cfg(any(test, feature = "testing"))]
            poisoned: AtomicBool::new(false),
            blob_resupplied_count,
            pager_dirty_byte_count: AtomicU64::new(0),
        })
    }

    pub fn store(&self) -> &Arc<ObjectStore> {
        &self.store
    }

    pub fn cache(&self) -> &NodeCache {
        &self.cache
    }

    pub fn dirent_cache(&self) -> &DirentCache {
        &self.dirent_cache
    }

    pub fn pager(&self) -> &Pager {
        &self.pager
    }

    pub fn id(&self) -> u64 {
        self.fs_id
    }

    pub fn scope(&self) -> &ExecutionScope {
        &self.scope
    }

    pub fn blob_resupplied_count(&self) -> &PageRefaultCounter {
        &self.blob_resupplied_count
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    /// Reports the filesystem info. If the volume has a space limit applied, the space available
    /// and space used are reported for the volume rather than the whole filesystem.
    pub fn filesystem_info_for_volume(&self) -> fio::FilesystemInfo {
        let allocator = self.store.filesystem().allocator();
        let info =
            if let Some(limit) = allocator.get_owner_bytes_limit(self.store.store_object_id()) {
                filesystem::Info {
                    used_bytes: allocator.get_owner_bytes_used(self.store.store_object_id()),
                    total_bytes: limit,
                }
            } else {
                self.store.filesystem().get_info()
            };

        info_to_filesystem_info(info, self.store.block_size(), self.store.object_count(), self.id())
    }

    /// Stops profiling, recovers resources from it, and finalizes recordings.
    pub async fn stop_profile_tasks(self: &Arc<Self>) {
        let Some(mut state) = self.profile_state.lock().take() else { return };
        state.wait_for_replay_to_finish().await;
        self.pager.set_recorder(None);
        let _ = state.wait_for_recording_to_finish().await;
    }

    /// Opens or creates the profile directory in the volume's internal directory.
    pub async fn get_profile_directory(self: &Arc<Self>) -> Result<Directory<FxVolume>, Error> {
        let internal_dir = self
            .get_or_create_internal_dir()
            .await
            .map_err(|e| e.context("Opening internal directory"))?;
        // Have to do separate calls to create the profile dir if necessary.
        let mut transaction = self
            .store()
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    self.store().store_object_id(),
                    internal_dir.object_id(),
                )],
                Options::default(),
            )
            .await?;
        Ok(match internal_dir.directory().lookup(PROFILE_DIRECTORY).await? {
            Some((object_id, _, _)) => {
                Directory::open_unchecked(self.clone(), object_id, DirType::Normal)
            }
            None => {
                let new_dir = internal_dir
                    .directory()
                    .create_child_dir(&mut transaction, PROFILE_DIRECTORY)
                    .await?;
                transaction.commit().await?;
                new_dir
            }
        })
    }

    /// Starts recording a profile for the volume under the given name. If a profile already
    /// exists under that name, it is replayed, and it will be replaced by the new recording if
    /// the recording is cleanly shut down and finalized.
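    ///
    /// A hypothetical usage sketch (constructing `state` is elided since it depends on the
    /// `ProfileState` implementation):
    ///
    /// ```ignore
    /// volume.record_or_replay_profile(state, "boot").await?;
    /// // ... later, finalize the recording:
    /// volume.stop_profile_tasks().await;
    /// ```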
    pub async fn record_or_replay_profile(
        self: &Arc<Self>,
        mut state: Box<dyn ProfileState>,
        name: &str,
    ) -> Result<(), Error> {
        // We don't meddle in FxDirectory or FxFile here because we don't want a paged object.
        // Normally we ensure that there's only one copy by using the Node cache on the volume, but
        // that would create an FxFile, so in this case we just assume that only one profile
        // operation should be ongoing at a time, as that is ensured in `VolumesDirectory`.

        // If there is a recording already, prepare to replay it.
        let profile_dir = self.get_profile_directory().await?;
        let replay_handle = if let Some((id, descriptor, _)) = profile_dir.lookup(name).await? {
            ensure!(matches!(descriptor, ObjectDescriptor::File), FxfsError::Inconsistent);
            Some(Box::new(
                ObjectStore::open_object(self, id, HandleOptions::default(), None).await?,
            ))
        } else {
            None
        };

        let mut profile_state = self.profile_state.lock();

        info!("Recording new profile '{name}' for volume object {}", self.store.store_object_id());
        // Begin recording first to ensure that we capture any activity from the replay.
        self.pager.set_recorder(Some(state.record_new(self, name)));
        if let Some(handle) = replay_handle {
            if let Some(guard) = self.scope().try_active_guard() {
                state.replay_profile(handle, self.clone(), guard);
                info!(
                    "Replaying existing profile '{name}' for volume object {}",
                    self.store.store_object_id()
                );
            }
        }
        *profile_state = Some(state);
        Ok(())
    }

    async fn get_or_create_internal_dir(self: &Arc<Self>) -> Result<Arc<FxDirectory>, Error> {
        let internal_data_id = self.store().get_or_create_internal_directory_id().await?;
        let internal_dir = self
            .get_or_load_node(internal_data_id, ObjectDescriptor::Directory, None)
            .await?
            .into_any()
            .downcast::<FxDirectory>()
            .unwrap();
        Ok(internal_dir)
    }

    pub async fn terminate(&self) {
        let task = self.background_task.lock().take();
        if let Some((task, terminate)) = task {
            let _ = terminate.send(());
            task.await;
        }

        // `NodeCache::terminate` will break any strong reference cycles contained within nodes
        // (pager registration). The only remaining nodes should be those with open FIDL
        // connections or VMO references in the process of handling the VMO_ZERO_CHILDREN signal.
        // `ExecutionScope::shutdown` + `ExecutionScope::wait` will close the open FIDL connections
        // and synchronize the signal handling, which should result in all nodes flushing and then
        // dropping. Any async tasks required to flush a node should take an active guard on the
        // `ExecutionScope` which will prevent `ExecutionScope::wait` from completing until all
        // nodes are flushed.
        self.scope.shutdown();
        self.cache.terminate();
        self.scope.wait().await;

        // Make sure there are no deferred operations still pending for this volume.
        Epoch::global().barrier().await;

        // The dirent_cache must be cleared *after* shutting down the scope because there can be
        // tasks that insert entries into the cache.
        self.dirent_cache.clear();

        if self.store.filesystem().options().read_only {
            // If the filesystem is read-only, we don't need to flush/sync anything.
            if self.store.is_unlocked() {
                self.store.lock_read_only();
            }
            return;
        }

        self.flush_all_files(FlushType::LastChance).await;
        self.store.filesystem().graveyard().flush().await;
        if self.store.crypt().is_some() {
            if let Err(e) = self.store.lock().await {
                // The store will be left in a safe state and there won't be data loss unless
                // there's an issue flushing the journal later.
                warn!(error:? = e; "Locking store error");
            }
        }
        let sync_status = self
            .store
            .filesystem()
            .sync(SyncOptions { flush_device: true, ..Default::default() })
            .await;
        if let Err(e) = sync_status {
            error!(error:? = e; "Failed to sync filesystem; data may be lost");
        }
    }

    /// Attempts to get a node from the node cache. If the node wasn't present in the cache, loads
    /// the object from the object store, installs a newly created FxNode backed by the loaded
    /// object into the cache, and returns it. |parent| is only set on the node if the node was
    /// not present in the cache; otherwise, it is ignored.
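    ///
    /// A usage sketch mirroring `get_or_create_internal_dir` above, where `oid` is a
    /// hypothetical object id known to refer to a directory:
    ///
    /// ```ignore
    /// let dir = volume
    ///     .get_or_load_node(oid, ObjectDescriptor::Directory, None)
    ///     .await?
    ///     .into_any()
    ///     .downcast::<FxDirectory>()
    ///     .unwrap();
    /// ```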
    pub async fn get_or_load_node(
        self: &Arc<Self>,
        object_id: u64,
        object_descriptor: ObjectDescriptor,
        parent: Option<Arc<FxDirectory>>,
    ) -> Result<Arc<dyn FxNode>, Error> {
        match self.cache.get_or_reserve(object_id).await {
            GetResult::Node(node) => Ok(node),
            GetResult::Placeholder(placeholder) => {
                let node = match object_descriptor {
                    ObjectDescriptor::File => FxFile::new(
                        ObjectStore::open_object(self, object_id, HandleOptions::default(), None)
                            .await?,
                    ) as Arc<dyn FxNode>,
                    ObjectDescriptor::Directory => {
                        // Can't use open_unchecked because we don't know if the dir is casefolded
                        // or encrypted.
                        Arc::new(FxDirectory::new(parent, Directory::open(self, object_id).await?))
                            as Arc<dyn FxNode>
                    }
                    ObjectDescriptor::Symlink => Arc::new(FxSymlink::new(self.clone(), object_id)),
                    _ => bail!(FxfsError::Inconsistent),
                };
                placeholder.commit(&node);
                Ok(node)
            }
        }
    }

    /// Marks the given directory deleted.
    pub fn mark_directory_deleted(&self, object_id: u64) {
        if let Some(node) = self.cache.get(object_id) {
            // It's possible that node is a placeholder, in which case we don't need to wait for it
            // to be resolved because it should be blocked behind the locks that are held by the
            // caller, and once they're dropped, it'll be found to be deleted via the tree.
            if let Ok(dir) = node.into_any().downcast::<FxDirectory>() {
                dir.set_deleted();
            }
        }
    }

    /// Removes resources associated with |object_id| (which ought to be a file), if there are no
    /// open connections to that file.
    ///
    /// This must be called *after committing* a transaction which deletes the last reference to
    /// |object_id|, since before that point, new connections could be established.
    pub(super) async fn maybe_purge_file(&self, object_id: u64) -> Result<(), Error> {
        if let Some(node) = self.cache.get(object_id) {
            node.clone().mark_to_be_purged();
            return Ok(());
        }
        // If this fails, the graveyard should clean it up on next mount.
        self.store
            .tombstone_object(
                object_id,
                Options { borrow_metadata_space: true, ..Default::default() },
            )
            .await?;
        Ok(())
    }

    /// Starts the background work task.  This task will periodically:
    ///   - scan all files and flush them to disk, and
    ///   - purge unused cached data.
    /// The task will hold a strong reference to the FxVolume while it is running, so the task must
    /// be closed later with Self::terminate, or the FxVolume will never be dropped.
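    ///
    /// A hypothetical usage sketch showing the pairing with `terminate`:
    ///
    /// ```ignore
    /// volume.start_background_task(MemoryPressureConfig::default(), mem_monitor.as_ref());
    /// // ... on unmount, stop the task along with the rest of the volume:
    /// volume.terminate().await;
    /// ```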
    pub fn start_background_task(
        self: &Arc<Self>,
        config: MemoryPressureConfig,
        mem_monitor: Option<&MemoryPressureMonitor>,
    ) {
        let mut background_task = self.background_task.lock();
        if background_task.is_none() {
            let (tx, rx) = oneshot::channel();

            let task = if let Some(mem_monitor) = mem_monitor {
                fasync::Task::spawn(self.clone().background_task(
                    config,
                    mem_monitor.get_level_stream(),
                    rx,
                ))
            } else {
                // With no memory pressure monitoring, just stub the stream out as always pending.
                fasync::Task::spawn(self.clone().background_task(config, stream::pending(), rx))
            };

            *background_task = Some((task, tx));
        }
    }

    #[trace]
    async fn background_task(
        self: Arc<Self>,
        config: MemoryPressureConfig,
        mut level_stream: impl Stream<Item = MemoryPressureLevel> + FusedStream + Unpin,
        terminate: oneshot::Receiver<()>,
    ) {
        debug!(store_id = self.store.store_object_id(); "FxVolume::background_task start");
        let mut terminate = terminate.fuse();
        // Default to the normal period until updates come from the `level_stream`.
        let mut level = MemoryPressureLevel::Normal;
        let mut timer =
            pin!(fasync::Timer::new(config.for_level(&level).background_task_initial_delay));

        loop {
            let mut should_terminate = false;
            let mut should_flush = false;
            let mut low_mem = false;
            let mut should_purge_layer_files = false;
            let mut should_update_cache_limit = false;

            futures::select_biased! {
                _ = terminate => should_terminate = true,
                new_level = level_stream.next() => {
                    // Because `level_stream` will never terminate, this is safe to unwrap.
                    let new_level = new_level.unwrap();
                    // At critical levels, it's okay to undertake expensive work immediately
                    // to reclaim memory.
                    low_mem = matches!(new_level, MemoryPressureLevel::Critical);
                    should_purge_layer_files = true;
                    if new_level != level {
                        level = new_level;
                        should_update_cache_limit = true;
                        let level_config = config.for_level(&level);
                        timer.as_mut().reset(fasync::MonotonicInstant::after(
                            level_config.background_task_period.into())
                        );
                        debug!(
                            "Background task period changed to {:?} due to new memory pressure \
                            level ({:?}).",
                            config.for_level(&level).background_task_period, level
                        );
                    }
                }
                _ = timer => {
                    timer.as_mut().reset(fasync::MonotonicInstant::after(
                        config.for_level(&level).background_task_period.into())
                    );
                    should_flush = true;
                    // Only purge layer file caches once we have elevated memory pressure.
                    should_purge_layer_files = !matches!(level, MemoryPressureLevel::Normal);
                }
            };
            if should_terminate {
                break;
            }
            // Maybe close extra files *before* iterating them for flush/low mem.
            if should_update_cache_limit {
                self.dirent_cache.set_limit(config.for_level(&level).cache_size_limit);
            }
            if should_flush {
                self.flush_all_files(FlushType::Sync).await;
                self.dirent_cache.recycle_stale_files();
            } else if low_mem {
                // This is a softer version of flushing files, so don't bother if we're flushing.
                self.minimize_memory().await;
            }
            if should_purge_layer_files {
                for layer in self.store.tree().immutable_layer_set().layers {
                    layer.purge_cached_data();
                }
            }
        }
        debug!(store_id = self.store.store_object_id(); "FxVolume::background_task end");
    }

    /// Reports that a certain number of bytes will be dirtied in a pager-backed VMO.
    ///
    /// Note that this function may await flush tasks.
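    ///
    /// A hypothetical sketch of the paired accounting around writeback, where `page_size`
    /// stands in for the number of bytes being dirtied:
    ///
    /// ```ignore
    /// volume.clone().report_pager_dirty(page_size, move || { /* mark pages dirty */ });
    /// // ... once the pages have been written back:
    /// volume.report_pager_clean(page_size);
    /// ```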
    pub fn report_pager_dirty(
        self: Arc<Self>,
        byte_count: u64,
        mark_dirty: impl FnOnce() + Send + 'static,
    ) {
        let this = self.clone();
        let callback = move || {
            mark_dirty();
            let prev = this.pager_dirty_byte_count.fetch_add(byte_count, Ordering::Relaxed);
            fxfs_trace::counter!("dirty-bytes", 0, this.name => prev.saturating_add(byte_count));
        };
        if let Some(parent) = self.parent.upgrade() {
            parent.report_pager_dirty(byte_count, self, callback);
        } else {
            callback();
        }
    }

    /// Reports that a certain number of bytes were cleaned in a pager-backed VMO.
    pub fn report_pager_clean(&self, byte_count: u64) {
        if let Some(parent) = self.parent.upgrade() {
            parent.report_pager_clean(byte_count);
        }
        let prev = self.pager_dirty_byte_count.fetch_sub(byte_count, Ordering::Relaxed);
        fxfs_trace::counter!("dirty-bytes", 0, self.name => prev.saturating_sub(byte_count));
    }

    #[trace]
    pub async fn flush_all_files(&self, flush_type: FlushType) {
        let mut flushed = 0;
        for file in self.cache.files() {
            if let Some(node) = file.into_opened_node() {
                if let Err(e) = FxFile::flush(&node, flush_type).await {
                    warn!(
                        store_id = self.store.store_object_id(),
                        oid = node.object_id(),
                        error:? = e;
                        "Failed to flush",
                    )
                }
                if flush_type == FlushType::LastChance {
                    let file = node.clone();
                    std::mem::drop(node);
                    file.force_clean();
                }
            }
            flushed += 1;
        }
        debug!(store_id = self.store.store_object_id(), file_count = flushed; "FxVolume flushed");
    }

    /// Flushes only files with dirty pages.
    ///
    /// `PagedObjectHandle` tracks the number of dirty pages locally (except for overwrite files),
    /// which makes it cheap to determine whether flushing a file will reduce the number of dirty
    /// pages. `flush_all_files` checks whether any metadata needs to be flushed, which involves a
    /// syscall, making it significantly slower when there are lots of open files without dirty
    /// pages.
    #[trace]
    pub async fn minimize_memory(&self) {
        for file in self.cache.files() {
            if let Some(node) = file.into_opened_node() {
                if let Err(e) = node.handle().minimize_memory().await {
                    warn!(
                        store_id = self.store.store_object_id(),
                        oid = node.object_id(),
                        error:? = e;
                        "Failed to flush",
                    )
                }
            }
        }
    }

    /// Spawns a short-term task for the volume that includes a guard that will prevent termination.
    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) {
        if let Some(guard) = self.scope.try_active_guard() {
            self.executor.spawn_detached(FutureWithGuard::new(guard, task));
        }
    }

    /// Tries to unwrap this volume.  If it fails, it will poison the volume so that when it is
    /// dropped, you get a backtrace.
    #[cfg(any(test, feature = "testing"))]
    pub fn try_unwrap(self: Arc<Self>) -> Option<FxVolume> {
        self.poisoned.store(true, Ordering::Relaxed);
        match Arc::try_unwrap(self) {
            Ok(volume) => {
                volume.poisoned.store(false, Ordering::Relaxed);
                Some(volume)
            }
            Err(this) => {
                // Log details about all the places where there might be a reference cycle.
                info!(
                    "background_task: {}, profile_state: {}, dirent_cache count: {}, \
                     pager strong file refs={}, no tasks={}",
                    this.background_task.lock().is_some(),
                    this.profile_state.lock().is_some(),
                    this.dirent_cache.len(),
                    crate::fuchsia::pager::STRONG_FILE_REFS.load(Ordering::Relaxed),
                    {
                        let mut no_tasks = pin!(this.scope.wait());
                        no_tasks
                            .poll_unpin(&mut std::task::Context::from_waker(
                                std::task::Waker::noop(),
                            ))
                            .is_ready()
                    },
                );
                None
            }
        }
    }

    pub async fn handle_file_backed_volume_provider_requests(
        this: Weak<Self>,
        scope: ExecutionScope,
        mut requests: FileBackedVolumeProviderRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = requests.try_next().await? {
            match request {
                FileBackedVolumeProviderRequest::Open {
                    parent_directory_token,
                    name,
                    server_end,
                    control_handle: _,
                } => {
                    // Try to get an active guard before upgrading.
                    let Some(_guard) = scope.try_active_guard() else {
                        bail!("Volume shutting down")
                    };
                    let Some(this) = this.upgrade() else { bail!("FxVolume dropped") };
                    match this
                        .scope
                        .token_registry()
                        // NB: For now, we only expect these calls in a regular (non-blob) volume.
                        // Hard-code the type for simplicity; attempts to call on a blob volume will
                        // get an error.
                        .get_owner_and_rights(parent_directory_token)
                        .and_then(|dir| {
                            dir.ok_or(zx::Status::BAD_HANDLE).and_then(|(dir, rights)| {
                                if !rights.contains(fio::Rights::MODIFY_DIRECTORY) {
                                    return Err(zx::Status::BAD_HANDLE);
                                }
                                dir.into_any()
                                    .downcast::<FxDirectory>()
                                    .map_err(|_| zx::Status::BAD_HANDLE)
                            })
                        }) {
                        Ok(dir) => {
                            dir.open_block_file(&name, server_end).await;
                        }
                        Err(status) => {
                            let _ = server_end.close_with_epitaph(status).unwrap_or_else(|e| {
                                error!(error:? = e; "open failed to send epitaph");
                            });
                        }
                    }
                }
            }
        }
        Ok(())
    }

    pub async fn handle_project_id_requests(
        this: Weak<Self>,
        scope: ExecutionScope,
        mut requests: ProjectIdRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = requests.try_next().await? {
            // Try to get an active guard before upgrading.
            let Some(_guard) = scope.try_active_guard() else { bail!("Volume shutting down") };
            let Some(this) = this.upgrade() else { bail!("FxVolume dropped") };
            let store_id = this.store.store_object_id();

            match request {
                ProjectIdRequest::SetLimit { responder, project_id, bytes, nodes } => responder
                    .send(
                    this.store().set_project_limit(project_id, bytes, nodes).await.map_err(
                        |error| {
                            error!(error:?, store_id, project_id; "Failed to set project limit");
                            map_to_raw_status(error)
                        },
                    ),
                )?,
                ProjectIdRequest::Clear { responder, project_id } => responder.send(
                    this.store().clear_project_limit(project_id).await.map_err(|error| {
                        error!(error:?, store_id, project_id; "Failed to clear project limit");
                        map_to_raw_status(error)
                    }),
                )?,
                ProjectIdRequest::SetForNode { responder, node_id, project_id } => {
                    responder
                        .send(this.store().set_project_for_node(node_id, project_id).await.map_err(
                        |error| {
                            error!(error:?, store_id, node_id, project_id; "Failed to apply node.");
                            map_to_raw_status(error)
                        },
                    ))?
                }
                ProjectIdRequest::GetForNode { responder, node_id } => responder.send(
                    this.store().get_project_for_node(node_id).await.map_err(|error| {
                        error!(error:?, store_id, node_id; "Failed to get node.");
                        map_to_raw_status(error)
                    }),
                )?,
                ProjectIdRequest::ClearForNode { responder, node_id } => responder.send(
                    this.store().clear_project_for_node(node_id).await.map_err(|error| {
                        error!(error:?, store_id, node_id; "Failed to clear for node.");
                        map_to_raw_status(error)
                    }),
                )?,
                ProjectIdRequest::List { responder, token } => {
                    responder.send(match this.list_projects(&token).await {
                        Ok((ref entries, ref next_token)) => Ok((entries, next_token.as_ref())),
                        Err(error) => {
                            error!(error:?, store_id, token:?; "Failed to list projects.");
                            Err(map_to_raw_status(error))
                        }
                    })?
                }
                ProjectIdRequest::Info { responder, project_id } => {
                    responder.send(match this.project_info(project_id).await {
                        Ok((ref limit, ref usage)) => Ok((limit, usage)),
                        Err(error) => {
                            error!(error:?, store_id, project_id; "Failed to get project info.");
                            Err(map_to_raw_status(error))
                        }
                    })?
                }
            }
        }
        Ok(())
    }

    // Maximum entries to fit based on 64KiB message size minus 16 bytes of header, 16 bytes
    // of vector header, 16 bytes for the optional token header, and 8 bytes of token value.
    // https://fuchsia.dev/fuchsia-src/development/languages/fidl/guides/max-out-pagination
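    // As a worked check (assuming 8-byte u64 entries): 65536 - 16 - 16 - 16 - 8 = 65480 bytes
    // remain for entries, and 8184 entries * 8 bytes = 65472 bytes fits within that budget.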
    const MAX_PROJECT_ENTRIES: usize = 8184;

    // Calls out to the inner volume to list available projects, removing and re-adding the fidl
    // wrapper types for the pagination token.
    async fn list_projects(
        &self,
        last_token: &Option<Box<ProjectIterToken>>,
    ) -> Result<(Vec<u64>, Option<ProjectIterToken>), Error> {
        let (entries, token) = self
            .store()
            .list_projects(
                match last_token {
                    None => 0,
                    Some(v) => v.value,
                },
                Self::MAX_PROJECT_ENTRIES,
            )
            .await?;
        Ok((entries, token.map(|value| ProjectIterToken { value })))
    }

    async fn project_info(&self, project_id: u64) -> Result<(BytesAndNodes, BytesAndNodes), Error> {
        let (limit, usage) = self.store().project_info(project_id).await?;
        // At least one of them needs to be present to return anything.
        ensure!(limit.is_some() || usage.is_some(), FxfsError::NotFound);
        Ok((
            limit.map_or_else(
                || BytesAndNodes { bytes: u64::MAX, nodes: u64::MAX },
                |v| BytesAndNodes { bytes: v.0, nodes: v.1 },
            ),
            usage.map_or_else(
                || BytesAndNodes { bytes: 0, nodes: 0 },
                |v| BytesAndNodes { bytes: v.0, nodes: v.1 },
            ),
        ))
    }
}

#[cfg(any(test, feature = "testing"))]
impl Drop for FxVolume {
    fn drop(&mut self) {
        assert!(!*self.poisoned.get_mut());
    }
}

impl HandleOwner for FxVolume {}

impl AsRef<ObjectStore> for FxVolume {
    fn as_ref(&self) -> &ObjectStore {
        &self.store
    }
}

#[async_trait]
impl FsInspectVolume for FxVolume {
    async fn get_volume_data(&self) -> Option<VolumeData> {
        // Don't try to return data if the volume is shutting down.
        let _guard = self.scope.try_active_guard()?;

        let object_count = self.store().object_count();
        let (used_bytes, bytes_limit) =
            self.store.filesystem().allocator().owner_allocation_info(self.store.store_object_id());
        let encrypted = self.store().crypt().is_some();
        let port_koid = fasync::EHandle::local().port().as_handle_ref().koid().unwrap().raw_koid();
        Some(VolumeData { bytes_limit, used_bytes, used_nodes: object_count, encrypted, port_koid })
    }
}

pub trait RootDir: FxNode + DirectoryEntry {
    fn as_directory_entry(self: Arc<Self>) -> Arc<dyn DirectoryEntry>;

    fn serve(self: Arc<Self>, flags: fio::Flags, server_end: ServerEnd<fio::DirectoryMarker>);

    fn as_node(self: Arc<Self>) -> Arc<dyn FxNode>;

    fn register_additional_volume_services(
        self: Arc<Self>,
        _svc_dir: &Simple,
    ) -> Result<(), Error> {
        Ok(())
    }
}

#[derive(Clone)]
pub struct FxVolumeAndRoot {
    volume: Arc<FxVolume>,
    root: Arc<dyn RootDir>,

    // This is used for service connections and anything that isn't the actual volume.
    admin_scope: ExecutionScope,

    // The outgoing directory that the volume might be served on.
    outgoing_dir: Arc<Simple>,
}

impl FxVolumeAndRoot {
    pub async fn new<T: From<Directory<FxVolume>> + RootDir>(
        parent: Weak<VolumesDirectory>,
        store: Arc<ObjectStore>,
        unique_id: u64,
        volume_name: String,
        blob_resupplied_count: Arc<PageRefaultCounter>,
        memory_pressure_config: MemoryPressureConfig,
    ) -> Result<Self, Error> {
        let volume = Arc::new(FxVolume::new(
            parent,
            store,
            unique_id,
            volume_name,
            blob_resupplied_count.clone(),
            memory_pressure_config,
        )?);
        let root_object_id = volume.store().root_directory_object_id();
        let root_dir = Directory::open(&volume, root_object_id).await?;
        let root = Arc::<T>::new(root_dir.into()) as Arc<dyn RootDir>;
        volume
            .cache
            .get_or_reserve(root_object_id)
            .await
            .placeholder()
            .unwrap()
            .commit(&root.clone().as_node());
        Ok(Self {
            volume,
            root,
            admin_scope: ExecutionScope::new(),
            outgoing_dir: vfs::directory::immutable::simple(),
        })
    }

    pub fn volume(&self) -> &Arc<FxVolume> {
        &self.volume
    }

    pub fn root(&self) -> &Arc<dyn RootDir> {
        &self.root
    }

    pub fn admin_scope(&self) -> &ExecutionScope {
        &self.admin_scope
    }

    pub fn outgoing_dir(&self) -> &Arc<Simple> {
        &self.outgoing_dir
    }

    // The same as root but downcast to FxDirectory.
    pub fn root_dir(&self) -> Arc<FxDirectory> {
        self.root().clone().into_any().downcast::<FxDirectory>().expect("Invalid type for root")
    }

    pub fn into_volume(self) -> Arc<FxVolume> {
        self.volume
    }
}

// The correct number here is arguably u64::MAX - 1 (because node 0 is reserved). There's a bug
// where inspect test cases fail if we try to use that, possibly because of a signed/unsigned bug.
// See https://fxbug.dev/42168242.  Until that's fixed, we'll have to use i64::MAX.
const TOTAL_NODES: u64 = i64::MAX as u64;

// An array used to initialize the FilesystemInfo |name| field. This just spells "fxfs" 0-padded to
// 32 bytes.
const FXFS_INFO_NAME_FIDL: [i8; 32] = [
    0x66, 0x78, 0x66, 0x73, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    0, 0, 0, 0,
];

fn info_to_filesystem_info(
    info: filesystem::Info,
    block_size: u64,
    object_count: u64,
    fs_id: u64,
) -> fio::FilesystemInfo {
    fio::FilesystemInfo {
        total_bytes: info.total_bytes,
        used_bytes: info.used_bytes,
        total_nodes: TOTAL_NODES,
        used_nodes: object_count,
        // TODO(https://fxbug.dev/42175592): Support free_shared_pool_bytes.
        free_shared_pool_bytes: 0,
        fs_id,
        block_size: block_size as u32,
        max_filename_size: fio::MAX_NAME_LENGTH as u32,
        fs_type: fidl_fuchsia_fs::VfsType::Fxfs.into_primitive(),
        padding: 0,
        name: FXFS_INFO_NAME_FIDL,
    }
}

989#[cfg(test)]
990mod tests {
991    use super::DIRENT_CACHE_LIMIT;
992    use crate::fuchsia::file::FxFile;
993    use crate::fuchsia::fxblob::testing::{self as blob_testing, BlobFixture};
994    use crate::fuchsia::memory_pressure::MemoryPressureLevel;
995    use crate::fuchsia::pager::PagerBacked;
996    use crate::fuchsia::profile::{RECORDED, new_profile_state};
997    use crate::fuchsia::testing::{
998        TestFixture, TestFixtureOptions, close_dir_checked, close_file_checked, open_dir,
999        open_dir_checked, open_file, open_file_checked,
1000    };
1001    use crate::fuchsia::volume::{FxVolume, MemoryPressureConfig, MemoryPressureLevelConfig};
1002    use crate::fuchsia::volumes_directory::VolumesDirectory;
1003    use delivery_blob::CompressionMode;
1004    use fidl_fuchsia_fxfs::{BytesAndNodes, ProjectIdMarker};
1005    use fidl_fuchsia_io as fio;
1006    use fs_inspect::FsInspectVolume;
1007    use fuchsia_async::{self as fasync, TimeoutExt as _};
1008    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
1009    use fuchsia_fs::file;
1010    use fxfs::filesystem::{FxFilesystem, FxFilesystemBuilder};
1011    use fxfs::fsck::{fsck, fsck_volume};
1012    use fxfs::object_store::directory::replace_child;
1013    use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
1014    use fxfs::object_store::volume::root_volume;
1015    use fxfs::object_store::{HandleOptions, ObjectDescriptor, ObjectStore, StoreOptions};
1016    use fxfs_crypt_common::CryptBase;
1017    use fxfs_crypto::{Crypt, WrappingKeyId};
1018    use fxfs_insecure_crypto::new_insecure_crypt;
1019    use refaults_vmo::PageRefaultCounter;
1020    use std::sync::atomic::Ordering;
1021    use std::sync::{Arc, Weak};
1022    use std::time::Duration;
1023    use storage_device::DeviceHolder;
1024    use storage_device::fake_device::FakeDevice;
1025    use zx::Status;
1026
1027    const WRAPPING_KEY_ID: WrappingKeyId = u128::to_le_bytes(123);
1028
1029    #[fuchsia::test(threads = 10)]
1030    async fn test_rename_different_dirs() {
1031        use zx::Event;
1032
1033        let fixture = TestFixture::new().await;
1034        let root = fixture.root();
1035
1036        let src = open_dir_checked(
1037            &root,
1038            "foo",
1039            fio::Flags::FLAG_MAYBE_CREATE
1040                | fio::PERM_READABLE
1041                | fio::PERM_WRITABLE
1042                | fio::Flags::PROTOCOL_DIRECTORY,
1043            Default::default(),
1044        )
1045        .await;
1046
1047        let dst = open_dir_checked(
1048            &root,
1049            "bar",
1050            fio::Flags::FLAG_MAYBE_CREATE
1051                | fio::PERM_READABLE
1052                | fio::PERM_WRITABLE
1053                | fio::Flags::PROTOCOL_DIRECTORY,
1054            Default::default(),
1055        )
1056        .await;
1057
1058        let f = open_file_checked(
1059            &root,
1060            "foo/a",
1061            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1062            &Default::default(),
1063        )
1064        .await;
1065        close_file_checked(f).await;
1066
1067        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1068        Status::ok(status).expect("get_token failed");
1069        src.rename("a", Event::from(dst_token.unwrap()), "b")
1070            .await
1071            .expect("FIDL call failed")
1072            .expect("rename failed");
1073
1074        assert_eq!(
1075            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1076                .await
1077                .expect_err("Open succeeded")
1078                .root_cause()
1079                .downcast_ref::<Status>()
1080                .expect("No status"),
1081            &Status::NOT_FOUND,
1082        );
1083        let f =
1084            open_file_checked(&root, "bar/b", fio::Flags::PROTOCOL_FILE, &Default::default()).await;
1085        close_file_checked(f).await;
1086
1087        close_dir_checked(dst).await;
1088        close_dir_checked(src).await;
1089        fixture.close().await;
1090    }
1091
1092    #[fuchsia::test(threads = 10)]
1093    async fn test_rename_same_dir() {
1094        use zx::Event;
1095        let fixture = TestFixture::new().await;
1096        let root = fixture.root();
1097
1098        let src = open_dir_checked(
1099            &root,
1100            "foo",
1101            fio::Flags::FLAG_MAYBE_CREATE
1102                | fio::PERM_READABLE
1103                | fio::PERM_WRITABLE
1104                | fio::Flags::PROTOCOL_DIRECTORY,
1105            Default::default(),
1106        )
1107        .await;
1108
1109        let f = open_file_checked(
1110            &root,
1111            "foo/a",
1112            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1113            &Default::default(),
1114        )
1115        .await;
1116        close_file_checked(f).await;
1117
1118        let (status, src_token) = src.get_token().await.expect("FIDL call failed");
1119        Status::ok(status).expect("get_token failed");
1120        src.rename("a", Event::from(src_token.unwrap()), "b")
1121            .await
1122            .expect("FIDL call failed")
1123            .expect("rename failed");
1124
1125        assert_eq!(
1126            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1127                .await
1128                .expect_err("Open succeeded")
1129                .root_cause()
1130                .downcast_ref::<Status>()
1131                .expect("No status"),
1132            &Status::NOT_FOUND,
1133        );
1134        let f =
1135            open_file_checked(&root, "foo/b", fio::Flags::PROTOCOL_FILE, &Default::default()).await;
1136        close_file_checked(f).await;
1137
1138        close_dir_checked(src).await;
1139        fixture.close().await;
1140    }
1141
1142    #[fuchsia::test(threads = 10)]
1143    async fn test_rename_overwrites_file() {
1144        use zx::Event;
1145        let fixture = TestFixture::new().await;
1146        let root = fixture.root();
1147
1148        let src = open_dir_checked(
1149            &root,
1150            "foo",
1151            fio::Flags::FLAG_MAYBE_CREATE
1152                | fio::PERM_READABLE
1153                | fio::PERM_WRITABLE
1154                | fio::Flags::PROTOCOL_DIRECTORY,
1155            Default::default(),
1156        )
1157        .await;
1158
1159        let dst = open_dir_checked(
1160            &root,
1161            "bar",
1162            fio::Flags::FLAG_MAYBE_CREATE
1163                | fio::PERM_READABLE
1164                | fio::PERM_WRITABLE
1165                | fio::Flags::PROTOCOL_DIRECTORY,
1166            Default::default(),
1167        )
1168        .await;
1169
1170        // The src file is non-empty.
1171        let src_file = open_file_checked(
1172            &root,
1173            "foo/a",
1174            fio::Flags::FLAG_MAYBE_CREATE
1175                | fio::PERM_READABLE
1176                | fio::PERM_WRITABLE
1177                | fio::Flags::PROTOCOL_FILE,
1178            &Default::default(),
1179        )
1180        .await;
1181        let buf = vec![0xaa as u8; 8192];
1182        file::write(&src_file, buf.as_slice()).await.expect("Failed to write to file");
1183        close_file_checked(src_file).await;
1184
1185        // The dst file is empty (so we can distinguish it).
1186        let f = open_file_checked(
1187            &root,
1188            "bar/b",
1189            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1190            &Default::default(),
1191        )
1192        .await;
1193        close_file_checked(f).await;
1194
1195        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1196        Status::ok(status).expect("get_token failed");
1197        src.rename("a", Event::from(dst_token.unwrap()), "b")
1198            .await
1199            .expect("FIDL call failed")
1200            .expect("rename failed");
1201
1202        assert_eq!(
1203            open_file(&root, "foo/a", fio::Flags::PROTOCOL_FILE, &Default::default())
1204                .await
1205                .expect_err("Open succeeded")
1206                .root_cause()
1207                .downcast_ref::<Status>()
1208                .expect("No status"),
1209            &Status::NOT_FOUND,
1210        );
1211        let file = open_file_checked(
1212            &root,
1213            "bar/b",
1214            fio::PERM_READABLE | fio::Flags::PROTOCOL_FILE,
1215            &Default::default(),
1216        )
1217        .await;
1218        let buf = file::read(&file).await.expect("read file failed");
1219        assert_eq!(buf, vec![0xaa as u8; 8192]);
1220        close_file_checked(file).await;
1221
1222        close_dir_checked(dst).await;
1223        close_dir_checked(src).await;
1224        fixture.close().await;
1225    }
1226
1227    #[fuchsia::test(threads = 10)]
1228    async fn test_rename_overwrites_dir() {
1229        use zx::Event;
1230        let fixture = TestFixture::new().await;
1231        let root = fixture.root();
1232
1233        let src = open_dir_checked(
1234            &root,
1235            "foo",
1236            fio::Flags::FLAG_MAYBE_CREATE
1237                | fio::PERM_READABLE
1238                | fio::PERM_WRITABLE
1239                | fio::Flags::PROTOCOL_DIRECTORY,
1240            Default::default(),
1241        )
1242        .await;
1243
1244        let dst = open_dir_checked(
1245            &root,
1246            "bar",
1247            fio::Flags::FLAG_MAYBE_CREATE
1248                | fio::PERM_READABLE
1249                | fio::PERM_WRITABLE
1250                | fio::Flags::PROTOCOL_DIRECTORY,
1251            Default::default(),
1252        )
1253        .await;
1254
1255        // The src dir is non-empty.
1256        open_dir_checked(
1257            &root,
1258            "foo/a",
1259            fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
1260            Default::default(),
1261        )
1262        .await;
1263        open_file_checked(
1264            &root,
1265            "foo/a/file",
1266            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
1267            &Default::default(),
1268        )
1269        .await;
1270        open_dir_checked(
1271            &root,
1272            "bar/b",
1273            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_DIRECTORY,
1274            Default::default(),
1275        )
1276        .await;
1277
1278        let (status, dst_token) = dst.get_token().await.expect("FIDL call failed");
1279        Status::ok(status).expect("get_token failed");
1280        src.rename("a", Event::from(dst_token.unwrap()), "b")
1281            .await
1282            .expect("FIDL call failed")
1283            .expect("rename failed");
1284
1285        assert_eq!(
1286            open_dir(&root, "foo/a", fio::Flags::PROTOCOL_DIRECTORY, &Default::default())
1287                .await
1288                .expect_err("Open succeeded")
1289                .root_cause()
1290                .downcast_ref::<Status>()
1291                .expect("No status"),
1292            &Status::NOT_FOUND,
1293        );
1294        let f =
1295            open_file_checked(&root, "bar/b/file", fio::Flags::PROTOCOL_FILE, &Default::default())
1296                .await;
1297        close_file_checked(f).await;
1298
1299        close_dir_checked(dst).await;
1300        close_dir_checked(src).await;
1301
1302        fixture.close().await;
1303    }
1304
1305    #[fuchsia::test]
1306    async fn test_background_flush() {
1307        let fixture = TestFixture::new().await;
1308        {
1309            let file = open_file_checked(
1310                fixture.root(),
1311                "file",
1312                fio::Flags::FLAG_MAYBE_CREATE
1313                    | fio::PERM_READABLE
1314                    | fio::PERM_WRITABLE
1315                    | fio::Flags::PROTOCOL_FILE,
1316                &Default::default(),
1317            )
1318            .await;
1319            let object_id = file
1320                .get_attributes(fio::NodeAttributesQuery::ID)
1321                .await
1322                .expect("Fidl get attr")
1323                .expect("get attr")
1324                .1
1325                .id
1326                .unwrap();
1327
1328            // Write some data to the file, which will only go to the cache for now.
1329            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1330
1331            // Initialized to the default size.
1332            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1333            let volume = fixture.volume().volume().clone();
1334
1335            let data_has_persisted = || async {
1336                // We have to reopen the object each time since this is a distinct handle from the
1337                // one managed by the FxFile.
1338                let object =
1339                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1340                        .await
1341                        .expect("open_object failed");
1342                let data = object.contents(8192).await.expect("read failed");
1343                data.len() == 1 && data[..] == [123u8]
1344            };
1345            assert!(!data_has_persisted().await);
1346
1347            fixture.volume().volume().start_background_task(
1348                MemoryPressureConfig {
1349                    mem_normal: MemoryPressureLevelConfig {
1350                        background_task_period: Duration::from_millis(100),
1351                        background_task_initial_delay: Duration::from_millis(100),
1352                        ..Default::default()
1353                    },
1354                    mem_warning: Default::default(),
1355                    mem_critical: Default::default(),
1356                },
1357                None,
1358            );
1359
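            // The background task period is 100ms, so poll at a coarser interval until the
            // data shows up on disk, giving up after MAX_WAIT so the test can't hang forever.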
1360            const MAX_WAIT: Duration = Duration::from_secs(20);
1361            let wait_increments = Duration::from_millis(400);
1362            let mut total_waited = Duration::ZERO;
1363
1364            while total_waited < MAX_WAIT {
1365                fasync::Timer::new(wait_increments).await;
1366                total_waited += wait_increments;
1367
1368                if data_has_persisted().await {
1369                    break;
1370                }
1371            }
1372
1373            assert!(data_has_persisted().await);
1374        }
1375
1376        fixture.close().await;
1377    }
1378
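    // Verifies that a Warning memory pressure signal switches the background task to the
    // warning-level config: only that level has a short flush period here, so a prompt flush
    // (plus the dirent cache limit dropping to 100) shows the level change took effect.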
1379    #[fuchsia::test(threads = 2)]
1380    async fn test_background_flush_with_warning_memory_pressure() {
1381        let fixture = TestFixture::new().await;
1382        {
1383            let file = open_file_checked(
1384                fixture.root(),
1385                "file",
1386                fio::Flags::FLAG_MAYBE_CREATE
1387                    | fio::PERM_READABLE
1388                    | fio::PERM_WRITABLE
1389                    | fio::Flags::PROTOCOL_FILE,
1390                &Default::default(),
1391            )
1392            .await;
1393            let object_id = file
1394                .get_attributes(fio::NodeAttributesQuery::ID)
1395                .await
1396                .expect("Fidl get attr")
1397                .expect("get attr")
1398                .1
1399                .id
1400                .unwrap();
1401
1402            // Write some data to the file, which will only go to the cache for now.
1403            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1404
1405            // The dirent cache limit is initialized to the default size.
1406            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1407            let volume = fixture.volume().volume().clone();
1408
1409            let data_has_persisted = || async {
1410                // We have to reopen the object each time since this is a distinct handle from the
1411                // one managed by the FxFile.
1412                let object =
1413                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1414                        .await
1415                        .expect("open_object failed");
1416                let data = object.contents(8192).await.expect("read failed");
1417                data.len() == 1 && data[..] == [123u8]
1418            };
1419            assert!(!data_has_persisted().await);
1420
1421            // Configure the flush task so that only the warning level flushes quickly.
1422            let flush_config = MemoryPressureConfig {
1423                mem_normal: MemoryPressureLevelConfig {
1424                    background_task_period: Duration::from_secs(20),
1425                    cache_size_limit: DIRENT_CACHE_LIMIT,
1426                    ..Default::default()
1427                },
1428                mem_warning: MemoryPressureLevelConfig {
1429                    background_task_period: Duration::from_millis(100),
1430                    cache_size_limit: 100,
1431                    background_task_initial_delay: Duration::from_millis(100),
1432                    ..Default::default()
1433                },
1434                mem_critical: MemoryPressureLevelConfig {
1435                    background_task_period: Duration::from_secs(20),
1436                    cache_size_limit: 50,
1437                    ..Default::default()
1438                },
1439            };
1440            fixture.volume().volume().start_background_task(
1441                flush_config,
1442                fixture.volumes_directory().memory_pressure_monitor(),
1443            );
1444
1445            // Send the memory pressure update.
1446            fixture
1447                .memory_pressure_proxy()
1448                .on_level_changed(MemoryPressureLevel::Warning)
1449                .await
1450                .expect("Failed to send memory pressure level change");
1451
1452            // Wait a bit of time for the flush to occur (but less than the normal and critical
1453            // periods).
1454            const MAX_WAIT: Duration = Duration::from_secs(3);
1455            let wait_increments = Duration::from_millis(400);
1456            let mut total_waited = Duration::ZERO;
1457
1458            while total_waited < MAX_WAIT {
1459                fasync::Timer::new(wait_increments).await;
1460                total_waited += wait_increments;
1461
1462                if data_has_persisted().await {
1463                    break;
1464                }
1465            }
1466
1467            assert!(data_has_persisted().await);
1468            assert_eq!(fixture.volume().volume().dirent_cache().limit(), 100);
1469        }
1470
1471        fixture.close().await;
1472    }
1473
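    // Verifies that a Critical memory pressure signal triggers a flush right away and applies
    // the critical-level dirent cache limit (50).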
1474    #[fuchsia::test(threads = 2)]
1475    async fn test_background_flush_with_critical_memory_pressure() {
1476        let fixture = TestFixture::new().await;
1477        {
1478            let file = open_file_checked(
1479                fixture.root(),
1480                "file",
1481                fio::Flags::FLAG_MAYBE_CREATE
1482                    | fio::PERM_READABLE
1483                    | fio::PERM_WRITABLE
1484                    | fio::Flags::PROTOCOL_FILE,
1485                &Default::default(),
1486            )
1487            .await;
1488            let object_id = file
1489                .get_attributes(fio::NodeAttributesQuery::ID)
1490                .await
1491                .expect("Fidl get attr")
1492                .expect("get attr")
1493                .1
1494                .id
1495                .unwrap();
1496
1497            // Write some data to the file, which will only go to the cache for now.
1498            file.write_at(&[123u8], 0).await.expect("FIDL write_at").expect("write_at");
1499
1500            // The dirent cache limit is initialized to the default size.
1501            assert_eq!(fixture.volume().volume().dirent_cache().limit(), DIRENT_CACHE_LIMIT);
1502            let volume = fixture.volume().volume().clone();
1503
1504            let data_has_persisted = || async {
1505                // We have to reopen the object each time since this is a distinct handle from the
1506                // one managed by the FxFile.
1507                let object =
1508                    ObjectStore::open_object(&volume, object_id, HandleOptions::default(), None)
1509                        .await
1510                        .expect("open_object failed");
1511                let data = object.contents(8192).await.expect("read failed");
1512                data.len() == 1 && data[..] == [123u8]
1513            };
1514            assert!(!data_has_persisted().await);
1515
1516            let flush_config = MemoryPressureConfig {
1517                mem_normal: MemoryPressureLevelConfig {
1518                    cache_size_limit: DIRENT_CACHE_LIMIT,
1519                    ..Default::default()
1520                },
1521                mem_warning: MemoryPressureLevelConfig {
1522                    cache_size_limit: 100,
1523                    ..Default::default()
1524                },
1525                mem_critical: MemoryPressureLevelConfig {
1526                    cache_size_limit: 50,
1527                    ..Default::default()
1528                },
1529            };
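            // Note that no short task periods are configured here; the flush below is expected
            // to come from the transition to the critical level itself.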
1530            fixture.volume().volume().start_background_task(
1531                flush_config,
1532                fixture.volumes_directory().memory_pressure_monitor(),
1533            );
1534
1535            // Send the memory pressure update.
1536            fixture
1537                .memory_pressure_proxy()
1538                .on_level_changed(MemoryPressureLevel::Critical)
1539                .await
1540                .expect("Failed to send memory pressure level change");
1541
1542            // Critical memory pressure should trigger a flush immediately, so expect one very quickly.
1543            const MAX_WAIT: Duration = Duration::from_secs(2);
1544            let wait_increments = Duration::from_millis(400);
1545            let mut total_waited = Duration::ZERO;
1546
1547            while total_waited < MAX_WAIT {
1548                fasync::Timer::new(wait_increments).await;
1549                total_waited += wait_increments;
1550
1551                if data_has_persisted().await {
1552                    break;
1553                }
1554            }
1555
1556            assert!(data_has_persisted().await);
1557            assert_eq!(fixture.volume().volume().dirent_cache().limit(), 50);
1558        }
1559
1560        fixture.close().await;
1561    }
1562
1563    // This test verifies that it is safe to query a volume after it has been unmounted, guarding
1564    // against races during unmount/shutdown.
1565    #[fuchsia::test(threads = 2)]
1566    async fn test_query_info_unmounted_volume() {
1567        const TEST_VOLUME: &str = "test_1234";
1568        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1569
1570        let fixture = TestFixture::new().await;
1571        {
1572            let volumes_directory = fixture.volumes_directory();
1573            let volume = volumes_directory
1574                .create_and_mount_volume(TEST_VOLUME, Some(crypt.clone()), false, None)
1575                .await
1576                .unwrap();
1577
1578            assert!(volume.volume().get_volume_data().await.is_some());
1579
1580            // Unmount it but keep a reference.
1581            volumes_directory
1582                .lock()
1583                .await
1584                .unmount(volume.volume().store().store_object_id())
1585                .await
1586                .expect("unmount failed");
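            // The `volume` Arc obtained above still holds the FxVolume, so the queries below
            // exercise the unmounted-volume path.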
1587
1588            // This returns None, but doesn't crash.
1589            assert!(volume.volume().get_volume_data().await.is_none());
1590        }
1591        fixture.close().await;
1592    }
1593
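    // Exercises the fuchsia.fxfs ProjectId protocol end to end: setting and updating limits,
    // attaching a project to a node, and verifying that limits and node assignments survive a
    // remount (with fsck passes in between).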
1594    #[fuchsia::test]
1595    async fn test_project_limit_persistence() {
1596        const BYTES_LIMIT_1: u64 = 123456;
1597        const NODES_LIMIT_1: u64 = 4321;
1598        const BYTES_LIMIT_2: u64 = 456789;
1599        const NODES_LIMIT_2: u64 = 9876;
1600        const VOLUME_NAME: &str = "A";
1601        const FILE_NAME: &str = "B";
1602        const PROJECT_ID: u64 = 42;
1603        const PROJECT_ID2: u64 = 343;
1604        let volume_store_id;
1605        let node_id;
1606        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
1607        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1608        {
1609            let blob_resupplied_count =
1610                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1611            let volumes_directory = VolumesDirectory::new(
1612                root_volume(filesystem.clone()).await.unwrap(),
1613                Weak::new(),
1614                None,
1615                blob_resupplied_count,
1616                MemoryPressureConfig::default(),
1617            )
1618            .await
1619            .unwrap();
1620
1621            let volume_and_root = volumes_directory
1622                .create_and_mount_volume(VOLUME_NAME, None, false, None)
1623                .await
1624                .expect("create unencrypted volume failed");
1625            volume_store_id = volume_and_root.volume().store().store_object_id();
1626
1627            let (volume_dir_proxy, dir_server_end) =
1628                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1629            volumes_directory
1630                .serve_volume(&volume_and_root, dir_server_end, false)
1631                .expect("serve_volume failed");
1632
1633            let project_proxy =
1634                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1635                    .expect("Unable to connect to project id service");
1636
1637            project_proxy
1638                .set_limit(0, BYTES_LIMIT_1, NODES_LIMIT_1)
1639                .await
1640                .unwrap()
1641                .expect_err("Should not set limits for project id 0");
1642
1643            project_proxy
1644                .set_limit(PROJECT_ID, BYTES_LIMIT_1, NODES_LIMIT_1)
1645                .await
1646                .unwrap()
1647                .expect("To set limits");
1648            {
1649                let BytesAndNodes { bytes, nodes } =
1650                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1651                assert_eq!(bytes, BYTES_LIMIT_1);
1652                assert_eq!(nodes, NODES_LIMIT_1);
1653            }
1654
1655            let file_proxy = {
1656                let (root_proxy, root_server_end) =
1657                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1658                volume_dir_proxy
1659                    .open(
1660                        "root",
1661                        fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
1662                        &Default::default(),
1663                        root_server_end.into_channel(),
1664                    )
1665                    .expect("Failed to open volume root");
1666
1667                open_file_checked(
1668                    &root_proxy,
1669                    FILE_NAME,
1670                    fio::Flags::FLAG_MAYBE_CREATE
1671                        | fio::PERM_READABLE
1672                        | fio::PERM_WRITABLE
1673                        | fio::Flags::PROTOCOL_FILE,
1674                    &Default::default(),
1675                )
1676                .await
1677            };
1678
1679            let (_, immutable_attributes) =
1680                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
1681            node_id = immutable_attributes.id.unwrap();
1682
1683            project_proxy
1684                .set_for_node(node_id, 0)
1685                .await
1686                .unwrap()
1687                .expect_err("Should not set 0 project id");
1688
1689            project_proxy
1690                .set_for_node(node_id, PROJECT_ID)
1691                .await
1692                .unwrap()
1693                .expect("Setting project on node");
1694
1695            project_proxy
1696                .set_limit(PROJECT_ID, BYTES_LIMIT_2, NODES_LIMIT_2)
1697                .await
1698                .unwrap()
1699                .expect("To set limits");
1700            {
1701                let BytesAndNodes { bytes, nodes } =
1702                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1703                assert_eq!(bytes, BYTES_LIMIT_2);
1704                assert_eq!(nodes, NODES_LIMIT_2);
1705            }
1706
1707            assert_eq!(
1708                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1709                PROJECT_ID
1710            );
1711
1712            volumes_directory.terminate().await;
1713            filesystem.close().await.expect("close filesystem failed");
1714        }
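        // Reopen the same device and verify that the limits and the node's project assignment
        // persisted across the remount.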
1715        device = filesystem.take_device().await;
1716        device.ensure_unique();
1717        device.reopen(false);
1718        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1719        {
1720            fsck(filesystem.clone()).await.expect("Fsck");
1721            fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
1722            let blob_resupplied_count =
1723                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1724            let volumes_directory = VolumesDirectory::new(
1725                root_volume(filesystem.clone()).await.unwrap(),
1726                Weak::new(),
1727                None,
1728                blob_resupplied_count,
1729                MemoryPressureConfig::default(),
1730            )
1731            .await
1732            .unwrap();
1733            let volume_and_root = volumes_directory
1734                .mount_volume(VOLUME_NAME, None, false)
1735                .await
1736                .expect("mount unencrypted volume failed");
1737
1738            let (volume_proxy, _scope) = crate::volumes_directory::serve_startup_volume_proxy(
1739                &volumes_directory,
1740                VOLUME_NAME,
1741            );
1742
1743            let project_proxy = {
1744                let (volume_dir_proxy, dir_server_end) =
1745                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1746                volumes_directory
1747                    .serve_volume(&volume_and_root, dir_server_end, false)
1748                    .expect("serve_volume failed");
1749
1750                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1751                    .expect("Unable to connect to project id service")
1752            };
1753
1754            let usage_bytes_and_nodes = {
1755                let (
1756                    BytesAndNodes { bytes: limit_bytes, nodes: limit_nodes },
1757                    usage_bytes_and_nodes,
1758                ) = project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info");
1759                assert_eq!(limit_bytes, BYTES_LIMIT_2);
1760                assert_eq!(limit_nodes, NODES_LIMIT_2);
1761                usage_bytes_and_nodes
1762            };
1763
1764            // Clearing the project limit should succeed even though the project is still in use.
1765            project_proxy.clear(PROJECT_ID).await.unwrap().expect("To clear limits");
1766
1767            assert_eq!(
1768                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1769                PROJECT_ID
1770            );
1771            project_proxy
1772                .set_for_node(node_id, PROJECT_ID2)
1773                .await
1774                .unwrap()
1775                .expect("Changing project");
1776            assert_eq!(
1777                project_proxy.get_for_node(node_id).await.unwrap().expect("Checking project"),
1778                PROJECT_ID2
1779            );
1780
1781            assert_eq!(
1782                project_proxy.info(PROJECT_ID).await.unwrap().expect_err("Expect missing limits"),
1783                Status::NOT_FOUND.into_raw()
1784            );
1785            assert_eq!(
1786                project_proxy.info(PROJECT_ID2).await.unwrap().expect("Fetching project info").1,
1787                usage_bytes_and_nodes
1788            );
1789
1790            std::mem::drop(volume_proxy);
1791            volumes_directory.terminate().await;
1792            std::mem::drop(volumes_directory);
1793            filesystem.close().await.expect("close filesystem failed");
1794        }
1795        device = filesystem.take_device().await;
1796        device.ensure_unique();
1797        device.reopen(false);
1798        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1799        fsck(filesystem.clone()).await.expect("Fsck");
1800        fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
1801        let blob_resupplied_count =
1802            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1803        let volumes_directory = VolumesDirectory::new(
1804            root_volume(filesystem.clone()).await.unwrap(),
1805            Weak::new(),
1806            None,
1807            blob_resupplied_count,
1808            MemoryPressureConfig::default(),
1809        )
1810        .await
1811        .unwrap();
1812        let volume_and_root = volumes_directory
1813            .mount_volume(VOLUME_NAME, None, false)
1814            .await
1815            .expect("mount unencrypted volume failed");
1816        let (volume_dir_proxy, dir_server_end) =
1817            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1818        volumes_directory
1819            .serve_volume(&volume_and_root, dir_server_end, false)
1820            .expect("serve_volume failed");
1821        let project_proxy = connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1822            .expect("Unable to connect to project id service");
1823        assert_eq!(
1824            project_proxy.info(PROJECT_ID).await.unwrap().expect_err("Expect missing limits"),
1825            Status::NOT_FOUND.into_raw()
1826        );
1827        volumes_directory.terminate().await;
1828        std::mem::drop(volumes_directory);
1829        filesystem.close().await.expect("close filesystem failed");
1830    }
1831
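    // Verifies project usage accounting: bytes and nodes are only counted once a node is
    // assigned to the project, grow as the file grows, shrink on truncation, and drop to zero
    // when the node is unlinked or removed from the project.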
1832    #[fuchsia::test]
1833    async fn test_project_limit_accounting() {
1834        const BYTES_LIMIT: u64 = 123456;
1835        const NODES_LIMIT: u64 = 4321;
1836        const VOLUME_NAME: &str = "A";
1837        const FILE_NAME: &str = "B";
1838        const PROJECT_ID: u64 = 42;
1839        let volume_store_id;
1840        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
1841        let first_object_id;
1842        let mut bytes_usage;
1843        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1844        {
1845            let blob_resupplied_count =
1846                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1847            let volumes_directory = VolumesDirectory::new(
1848                root_volume(filesystem.clone()).await.unwrap(),
1849                Weak::new(),
1850                None,
1851                blob_resupplied_count,
1852                MemoryPressureConfig::default(),
1853            )
1854            .await
1855            .unwrap();
1856
1857            let volume_and_root = volumes_directory
1858                .create_and_mount_volume(
1859                    VOLUME_NAME,
1860                    Some(Arc::new(new_insecure_crypt())),
1861                    false,
1862                    None,
1863                )
1864                .await
1865                .expect("create unencrypted volume failed");
1866            volume_store_id = volume_and_root.volume().store().store_object_id();
1867
1868            let (volume_dir_proxy, dir_server_end) =
1869                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1870            volumes_directory
1871                .serve_volume(&volume_and_root, dir_server_end, false)
1872                .expect("serve_volume failed");
1873
1874            let project_proxy =
1875                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
1876                    .expect("Unable to connect to project id service");
1877
1878            project_proxy
1879                .set_limit(PROJECT_ID, BYTES_LIMIT, NODES_LIMIT)
1880                .await
1881                .unwrap()
1882                .expect("To set limits");
1883            {
1884                let BytesAndNodes { bytes, nodes } =
1885                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").0;
1886                assert_eq!(bytes, BYTES_LIMIT);
1887                assert_eq!(nodes, NODES_LIMIT);
1888            }
1889
1890            let file_proxy = {
1891                let (root_proxy, root_server_end) =
1892                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1893                volume_dir_proxy
1894                    .open(
1895                        "root",
1896                        fio::PERM_READABLE | fio::PERM_WRITABLE,
1897                        &Default::default(),
1898                        root_server_end.into_channel(),
1899                    )
1900                    .expect("Failed to open volume root");
1901
1902                open_file_checked(
1903                    &root_proxy,
1904                    FILE_NAME,
1905                    fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
1906                    &Default::default(),
1907                )
1908                .await
1909            };
1910
1911            assert_eq!(
1912                8192,
1913                file_proxy
1914                    .write(&vec![0xffu8; 8192])
1915                    .await
1916                    .expect("FIDL call failed")
1917                    .map_err(Status::from_raw)
1918                    .expect("File write was successful")
1919            );
1920            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
1921
1922            {
1923                let BytesAndNodes { bytes, nodes } =
1924                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
1925                assert_eq!(bytes, 0);
1926                assert_eq!(nodes, 0);
1927            }
1928
1929            let (_, immutable_attributes) =
1930                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
1931            let node_id = immutable_attributes.id.unwrap();
1932
1933            first_object_id = node_id;
1934            project_proxy
1935                .set_for_node(node_id, PROJECT_ID)
1936                .await
1937                .unwrap()
1938                .expect("Setting project on node");
1939
1940            bytes_usage = {
1941                let BytesAndNodes { bytes, nodes } =
1942                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
1943                assert!(bytes > 0);
1944                assert_eq!(nodes, 1);
1945                bytes
1946            };
1947
1948            // Grow the file by a block.
1949            assert_eq!(
1950                8192,
1951                file_proxy
1952                    .write(&vec![0xffu8; 8192])
1953                    .await
1954                    .expect("FIDL call failed")
1955                    .map_err(Status::from_raw)
1956                    .expect("File write was successful")
1957            );
1958            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
1959            bytes_usage = {
1960                let BytesAndNodes { bytes, nodes } =
1961                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
1962                assert!(bytes > bytes_usage);
1963                assert_eq!(nodes, 1);
1964                bytes
1965            };
1966
1967            volumes_directory.terminate().await;
1968            filesystem.close().await.expect("close filesystem failed");
1969        }
1970        device = filesystem.take_device().await;
1971        device.ensure_unique();
1972        device.reopen(false);
1973        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1974        {
1975            fsck(filesystem.clone()).await.expect("Fsck");
1976            fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
1977                .await
1978                .expect("Fsck volume");
1979            let blob_resupplied_count =
1980                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1981            let volumes_directory = VolumesDirectory::new(
1982                root_volume(filesystem.clone()).await.unwrap(),
1983                Weak::new(),
1984                None,
1985                blob_resupplied_count,
1986                MemoryPressureConfig::default(),
1987            )
1988            .await
1989            .unwrap();
1990            let volume_and_root = volumes_directory
1991                .mount_volume(VOLUME_NAME, Some(Arc::new(new_insecure_crypt())), false)
1992                .await
1993                .expect("mount unencrypted volume failed");
1994
1995            let (root_proxy, project_proxy) = {
1996                let (volume_dir_proxy, dir_server_end) =
1997                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1998                volumes_directory
1999                    .serve_volume(&volume_and_root, dir_server_end, false)
2000                    .expect("serve_volume failed");
2001
2002                let (root_proxy, root_server_end) =
2003                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2004                volume_dir_proxy
2005                    .open(
2006                        "root",
2007                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2008                        &Default::default(),
2009                        root_server_end.into_channel(),
2010                    )
2011                    .expect("Failed to open volume root");
2012                let project_proxy = {
2013                    connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2014                        .expect("Unable to connect to project id service")
2015                };
2016                (root_proxy, project_proxy)
2017            };
2018
2019            {
2020                let BytesAndNodes { bytes, nodes } =
2021                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2022                assert_eq!(bytes, bytes_usage);
2023                assert_eq!(nodes, 1);
2024            }
2025
2026            assert_eq!(
2027                project_proxy
2028                    .get_for_node(first_object_id)
2029                    .await
2030                    .unwrap()
2031                    .expect("Checking project"),
2032                PROJECT_ID
2033            );
2034            root_proxy
2035                .unlink(FILE_NAME, &fio::UnlinkOptions::default())
2036                .await
2037                .expect("FIDL call failed")
2038                .expect("unlink failed");
2039            filesystem.graveyard().flush().await;
2040
2041            {
2042                let BytesAndNodes { bytes, nodes } =
2043                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2044                assert_eq!(bytes, 0);
2045                assert_eq!(nodes, 0);
2046            }
2047
2048            let file_proxy = open_file_checked(
2049                &root_proxy,
2050                FILE_NAME,
2051                fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2052                &Default::default(),
2053            )
2054            .await;
2055
2056            let (_, immutable_attributes) =
2057                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2058            let node_id = immutable_attributes.id.unwrap();
2059
2060            project_proxy
2061                .set_for_node(node_id, PROJECT_ID)
2062                .await
2063                .unwrap()
2064                .expect("Applying project");
2065
2066            bytes_usage = {
2067                let BytesAndNodes { bytes, nodes } =
2068                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2069                // The empty file should use less space than the non-empty file from above.
2070                assert!(bytes < bytes_usage);
2071                assert_eq!(nodes, 1);
2072                bytes
2073            };
2074
2075            assert_eq!(
2076                8192,
2077                file_proxy
2078                    .write(&vec![0xffu8; 8192])
2079                    .await
2080                    .expect("FIDL call failed")
2081                    .map_err(Status::from_raw)
2082                    .expect("File write was successful")
2083            );
2084            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2085            bytes_usage = {
2086                let BytesAndNodes { bytes, nodes } =
2087                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2088                assert!(bytes > bytes_usage);
2089                assert_eq!(nodes, 1);
2090                bytes
2091            };
2092
2093            // Trim to zero. Bytes should decrease.
2094            file_proxy.resize(0).await.expect("FIDL call failed").expect("Resize file");
2095            file_proxy.sync().await.expect("FIDL call failed").expect("Sync failed.");
2096            {
2097                let BytesAndNodes { bytes, nodes } =
2098                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2099                assert!(bytes < bytes_usage);
2100                assert_eq!(nodes, 1);
2101            };
2102
2103            // Dropping node from project. Usage should go to zero.
2104            project_proxy
2105                .clear_for_node(node_id)
2106                .await
2107                .expect("FIDL call failed")
2108                .expect("Clear failed.");
2109            {
2110                let BytesAndNodes { bytes, nodes } =
2111                    project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2112                assert_eq!(bytes, 0);
2113                assert_eq!(nodes, 0);
2114            };
2115
2116            volumes_directory.terminate().await;
2117            filesystem.close().await.expect("close filesystem failed");
2118        }
2119        device = filesystem.take_device().await;
2120        device.ensure_unique();
2121        device.reopen(false);
2122        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2123        fsck(filesystem.clone()).await.expect("Fsck");
2124        fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2125            .await
2126            .expect("Fsck volume");
2127        filesystem.close().await.expect("close filesystem failed");
2128    }
2129
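    // Verifies that new nodes inherit the project id of their parent directory: a subdirectory,
    // a file inside it, and an unnamed temporary file should all land in the same project.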
2130    #[fuchsia::test]
2131    async fn test_project_node_inheritance() {
2132        const BYTES_LIMIT: u64 = 123456;
2133        const NODES_LIMIT: u64 = 4321;
2134        const VOLUME_NAME: &str = "A";
2135        const DIR_NAME: &str = "B";
2136        const SUBDIR_NAME: &str = "C";
2137        const FILE_NAME: &str = "D";
2138        const PROJECT_ID: u64 = 42;
2139        let volume_store_id;
2140        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
2141        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2142        {
2143            let blob_resupplied_count =
2144                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2145            let volumes_directory = VolumesDirectory::new(
2146                root_volume(filesystem.clone()).await.unwrap(),
2147                Weak::new(),
2148                None,
2149                blob_resupplied_count,
2150                MemoryPressureConfig::default(),
2151            )
2152            .await
2153            .unwrap();
2154
2155            let volume_and_root = volumes_directory
2156                .create_and_mount_volume(
2157                    VOLUME_NAME,
2158                    Some(Arc::new(new_insecure_crypt())),
2159                    false,
2160                    None,
2161                )
2162                .await
2163                .expect("create unencrypted volume failed");
2164            volume_store_id = volume_and_root.volume().store().store_object_id();
2165
2166            let (volume_dir_proxy, dir_server_end) =
2167                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2168            volumes_directory
2169                .serve_volume(&volume_and_root, dir_server_end, false)
2170                .expect("serve_volume failed");
2171
2172            let project_proxy =
2173                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2174                    .expect("Unable to connect to project id service");
2175
2176            project_proxy
2177                .set_limit(PROJECT_ID, BYTES_LIMIT, NODES_LIMIT)
2178                .await
2179                .unwrap()
2180                .expect("To set limits");
2181
2182            let dir_proxy = {
2183                let (root_proxy, root_server_end) =
2184                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2185                volume_dir_proxy
2186                    .open(
2187                        "root",
2188                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2189                        &Default::default(),
2190                        root_server_end.into_channel(),
2191                    )
2192                    .expect("Failed to open volume root");
2193
2194                open_dir_checked(
2195                    &root_proxy,
2196                    DIR_NAME,
2197                    fio::Flags::FLAG_MAYBE_CREATE
2198                        | fio::PERM_READABLE
2199                        | fio::PERM_WRITABLE
2200                        | fio::Flags::PROTOCOL_DIRECTORY,
2201                    Default::default(),
2202                )
2203                .await
2204            };
2205            {
2206                let (_, immutable_attributes) =
2207                    dir_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2208                let node_id = immutable_attributes.id.unwrap();
2209
2210                project_proxy
2211                    .set_for_node(node_id, PROJECT_ID)
2212                    .await
2213                    .unwrap()
2214                    .expect("Setting project on node");
2215            }
2216
2217            let subdir_proxy = open_dir_checked(
2218                &dir_proxy,
2219                SUBDIR_NAME,
2220                fio::Flags::FLAG_MAYBE_CREATE
2221                    | fio::PERM_READABLE
2222                    | fio::PERM_WRITABLE
2223                    | fio::Flags::PROTOCOL_DIRECTORY,
2224                Default::default(),
2225            )
2226            .await;
2227            {
2228                let (_, immutable_attributes) = subdir_proxy
2229                    .get_attributes(fio::NodeAttributesQuery::ID)
2230                    .await
2231                    .unwrap()
2232                    .unwrap();
2233                let node_id = immutable_attributes.id.unwrap();
2234
2235                assert_eq!(
2236                    project_proxy
2237                        .get_for_node(node_id)
2238                        .await
2239                        .unwrap()
2240                        .expect("Setting project on node"),
2241                    PROJECT_ID
2242                );
2243            }
2244
2245            let file_proxy = open_file_checked(
2246                &subdir_proxy,
2247                FILE_NAME,
2248                fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2249                &Default::default(),
2250            )
2251            .await;
2252            {
2253                let (_, immutable_attributes) =
2254                    file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2255                let node_id = immutable_attributes.id.unwrap();
2256
2257                assert_eq!(
2258                    project_proxy
2259                        .get_for_node(node_id)
2260                        .await
2261                        .unwrap()
2262                        .expect("Setting project on node"),
2263                    PROJECT_ID
2264                );
2265            }
2266
2267            // An unnamed temporary file is created slightly differently from a regular file object.
2268            // Just in case, check that it inherits the project ID as well.
2269            let tmpfile_proxy = open_file_checked(
2270                &subdir_proxy,
2271                ".",
2272                fio::Flags::PROTOCOL_FILE
2273                    | fio::Flags::FLAG_CREATE_AS_UNNAMED_TEMPORARY
2274                    | fio::PERM_READABLE,
2275                &fio::Options::default(),
2276            )
2277            .await;
2278            {
2279                let (_, immutable_attributes) = tmpfile_proxy
2280                    .get_attributes(fio::NodeAttributesQuery::ID)
2281                    .await
2282                    .unwrap()
2283                    .unwrap();
2284                let node_id: u64 = immutable_attributes.id.unwrap();
2285                assert_eq!(
2286                    project_proxy
2287                        .get_for_node(node_id)
2288                        .await
2289                        .unwrap()
2290                        .expect("Setting project on node"),
2291                    PROJECT_ID
2292                );
2293            }
2294
2295            let BytesAndNodes { nodes, .. } =
2296                project_proxy.info(PROJECT_ID).await.unwrap().expect("Fetching project info").1;
2297            assert_eq!(nodes, 3);
2298            volumes_directory.terminate().await;
2299            filesystem.close().await.expect("close filesystem failed");
2300        }
2301        device = filesystem.take_device().await;
2302        device.ensure_unique();
2303        device.reopen(false);
2304        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2305        fsck(filesystem.clone()).await.expect("Fsck");
2306        fsck_volume(filesystem.as_ref(), volume_store_id, Some(Arc::new(new_insecure_crypt())))
2307            .await
2308            .expect("Fsck volume");
2309        filesystem.close().await.expect("close filesystem failed");
2310    }
2311
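    // Verifies paginated project listing: pages are capped at FxVolume::MAX_PROJECT_ENTRIES,
    // a token is returned while more entries remain, and projects with remaining usage stay
    // listed after their limits are cleared.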
2312    #[fuchsia::test]
2313    async fn test_project_listing() {
2314        const VOLUME_NAME: &str = "A";
2315        const FILE_NAME: &str = "B";
2316        const NON_ZERO_PROJECT_ID: u64 = 3;
2317        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
2318        let volume_store_id;
2319        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2320        {
2321            let blob_resupplied_count =
2322                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2323            let volumes_directory = VolumesDirectory::new(
2324                root_volume(filesystem.clone()).await.unwrap(),
2325                Weak::new(),
2326                None,
2327                blob_resupplied_count,
2328                MemoryPressureConfig::default(),
2329            )
2330            .await
2331            .unwrap();
2332            let volume_and_root = volumes_directory
2333                .create_and_mount_volume(VOLUME_NAME, None, false, None)
2334                .await
2335                .expect("create unencrypted volume failed");
2336            volume_store_id = volume_and_root.volume().store().store_object_id();
2337
2338            let (volume_dir_proxy, dir_server_end) =
2339                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2340            volumes_directory
2341                .serve_volume(&volume_and_root, dir_server_end, false)
2342                .expect("serve_volume failed");
2343            let project_proxy =
2344                connect_to_protocol_at_dir_svc::<ProjectIdMarker>(&volume_dir_proxy)
2345                    .expect("Unable to connect to project id service");
2346            // This is just to ensure that the small numbers below can be used for this test.
2347            assert!(FxVolume::MAX_PROJECT_ENTRIES >= 4);
2348            // Create a bunch of projects, 3 more than the page limit, to ensure pagination.
2349            let num_entries = u64::try_from(FxVolume::MAX_PROJECT_ENTRIES + 3).unwrap();
2350            for project_id in 1..=num_entries {
2351                project_proxy.set_limit(project_id, 1, 1).await.unwrap().expect("To set limits");
2352            }
2353
2354            // Add one usage entry to be interspersed with the limit entries. Verifies that the
2355            // iterator will progress past it with no effect.
2356            let file_proxy = {
2357                let (root_proxy, root_server_end) =
2358                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2359                volume_dir_proxy
2360                    .open(
2361                        "root",
2362                        fio::PERM_READABLE | fio::PERM_WRITABLE,
2363                        &Default::default(),
2364                        root_server_end.into_channel(),
2365                    )
2366                    .expect("Failed to open volume root");
2367
2368                open_file_checked(
2369                    &root_proxy,
2370                    FILE_NAME,
2371                    fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE | fio::PERM_WRITABLE,
2372                    &Default::default(),
2373                )
2374                .await
2375            };
2376            let (_, immutable_attributes) =
2377                file_proxy.get_attributes(fio::NodeAttributesQuery::ID).await.unwrap().unwrap();
2378            let node_id = immutable_attributes.id.unwrap();
2379            project_proxy
2380                .set_for_node(node_id, NON_ZERO_PROJECT_ID)
2381                .await
2382                .unwrap()
2383                .expect("Setting project on node");
2384            {
2385                let BytesAndNodes { nodes, .. } = project_proxy
2386                    .info(NON_ZERO_PROJECT_ID)
2387                    .await
2388                    .unwrap()
2389                    .expect("Fetching project info")
2390                    .1;
2391                assert_eq!(nodes, 1);
2392            }
2393
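            // The listing is a token-based walk: `list(None)` returns the first page plus a
            // token, passing the token back returns the next page, and a `None` token marks
            // the end.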
2394            // If this `unwrap()` fails, it is likely that MAX_PROJECT_ENTRIES is too large for a FIDL message.
2395            let (mut entries, mut next_token) =
2396                project_proxy.list(None).await.unwrap().expect("To get project listing");
2397            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2398            assert!(next_token.is_some());
2399            assert!(entries.contains(&1));
2400            assert!(entries.contains(&3));
2401            assert!(!entries.contains(&num_entries));
2402            // Page two should have a small set at the end.
2403            (entries, next_token) = project_proxy
2404                .list(next_token.as_deref())
2405                .await
2406                .unwrap()
2407                .expect("To get project listing");
2408            assert_eq!(entries.len(), 3);
2409            assert!(next_token.is_none());
2410            assert!(entries.contains(&num_entries));
2411            assert!(!entries.contains(&1));
2412            assert!(!entries.contains(&3));
2413            // Delete a couple of limits and list again; project 3 still has usage, so it remains listed.
2414            project_proxy.clear(1).await.unwrap().expect("Clear project");
2415            project_proxy.clear(3).await.unwrap().expect("Clear project");
2416            (entries, next_token) =
2417                project_proxy.list(None).await.unwrap().expect("To get project listing");
2418            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2419            assert!(next_token.is_some());
2420            assert!(!entries.contains(&num_entries));
2421            assert!(!entries.contains(&1));
2422            assert!(entries.contains(&3));
2423            (entries, next_token) = project_proxy
2424                .list(next_token.as_deref())
2425                .await
2426                .unwrap()
2427                .expect("To get project listing");
2428            assert_eq!(entries.len(), 2);
2429            assert!(next_token.is_none());
2430            assert!(entries.contains(&num_entries));
2431            // Delete two more so that exactly one full page remains; a full final page must not return a token.
2432            project_proxy.clear(2).await.unwrap().expect("Clear project");
2433            project_proxy.clear(4).await.unwrap().expect("Clear project");
2434            (entries, next_token) =
2435                project_proxy.list(None).await.unwrap().expect("To get project listing");
2436            assert_eq!(entries.len(), FxVolume::MAX_PROJECT_ENTRIES);
2437            assert!(next_token.is_none());
2438            assert!(entries.contains(&num_entries));
2439            volumes_directory.terminate().await;
2440            filesystem.close().await.expect("close filesystem failed");
2441        }
2442        device = filesystem.take_device().await;
2443        device.ensure_unique();
2444        device.reopen(false);
2445        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2446        fsck(filesystem.clone()).await.expect("Fsck");
2447        fsck_volume(filesystem.as_ref(), volume_store_id, None).await.expect("Fsck volume");
2448        filesystem.close().await.expect("close filesystem failed");
2449    }
2450
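    // Verifies profile recording and replay for blobs: a recording captures the blob pages
    // touched, and replaying it after a remount pages the same data back in. The profile file
    // is also renamed mid-replay to check that re-recording lands a fresh version.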
2451    #[fuchsia::test(threads = 10)]
2452    async fn test_profile_blob() {
2453        let mut hashes = Vec::new();
2454        let device = {
2455            let fixture = blob_testing::new_blob_fixture().await;
2456
2457            for i in 0..3u64 {
2458                let hash =
2459                    fixture.write_blob(i.to_string().as_bytes(), CompressionMode::Never).await;
2460                hashes.push(hash);
2461            }
2462            fixture.close().await
2463        };
2464        device.ensure_unique();
2465
2466        device.reopen(false);
2467        let mut device = {
2468            let fixture = blob_testing::open_blob_fixture(device).await;
2469            fixture
2470                .volume()
2471                .volume()
2472                .record_or_replay_profile(new_profile_state(true), "foo")
2473                .await
2474                .expect("Recording");
2475
2476            // Page in only the zero offsets so readahead doesn't pull in extra pages.
2477            let mut writable = [0u8];
2478            for hash in &hashes {
2479                let vmo = fixture.get_blob_vmo(*hash).await;
2480                vmo.read(&mut writable, 0).expect("Vmo read");
2481            }
2482            fixture.volume().volume().stop_profile_tasks().await;
2483            fixture.close().await
2484        };
2485
2486        // Do this multiple times to ensure that the re-recording doesn't drop anything.
2487        for i in 0..3 {
2488            device.ensure_unique();
2489            device.reopen(false);
2490            let fixture = blob_testing::open_blob_fixture(device).await;
2491            {
2492                // Ensure that nothing is paged in right now.
2493                for hash in &hashes {
2494                    let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2495                    assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
2496                }
2497
2498                fixture
2499                    .volume()
2500                    .volume()
2501                    .record_or_replay_profile(new_profile_state(true), "foo")
2502                    .await
2503                    .expect("Replaying");
2504
2505                // Rename the profile in flight so a new version lands to be used next time.
2506                {
2507                    let store_id = fixture.volume().volume().store().store_object_id();
2508                    let dir = fixture.volume().volume().get_profile_directory().await.unwrap();
2509                    let old_file = dir.lookup("foo").await.unwrap().unwrap().0;
2510                    let mut transaction = fixture
2511                        .fs()
2512                        .clone()
2513                        .new_transaction(
2514                            lock_keys!(
2515                                LockKey::object(store_id, dir.object_id()),
2516                                LockKey::object(store_id, old_file),
2517                            ),
2518                            Options::default(),
2519                        )
2520                        .await
2521                        .unwrap();
2522                    replace_child(&mut transaction, Some((&dir, "foo")), (&dir, &i.to_string()))
2523                        .await
2524                        .expect("Replace old profile.");
2525                    transaction.commit().await.unwrap();
2526                    assert!(
2527                        dir.lookup("foo").await.unwrap().is_none(),
2528                        "Old profile should be moved"
2529                    );
2530                }
2531
2532                // Wait for playback to finish by checking that the blobs have paged in.
2533                async {
2534                    for hash in &hashes {
2535                        // Fetch the vmo this way as well to ensure that the open counts the
2536                        // file as used in the current recording.
2537                        let _vmo = fixture.get_blob_vmo(*hash).await;
2538                        let blob = fixture.get_blob(*hash).await.expect("Opening blob");
2539                        while blob.vmo().info().unwrap().populated_bytes == 0 {
2540                            fasync::Timer::new(Duration::from_millis(25)).await;
2541                        }
2542                    }
2543                }
2544                .on_timeout(std::time::Duration::from_secs(120), || {
2545                    panic!("Replay did not page in for all VMOs")
2546                })
2547                .await;
2548
2549                // Complete the recording.
2550                fixture.volume().volume().stop_profile_tasks().await;
2551            }
2552            device = fixture.close().await;
2553        }
2554    }
2555
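    // Same as test_profile_blob, but for regular files on an encrypted volume. A crypt file is
    // read during each recording; it should not be captured in the profile.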
2556    #[fuchsia::test(threads = 10)]
2557    async fn test_profile_file() {
2558        let mut hashes = Vec::new();
2559        let crypt_file_id;
2560        let device = {
2561            let fixture = TestFixture::new().await;
2562            // Access a crypt file during recording; it should not be added to the profile.
2563            {
2564                let crypt_dir = open_dir_checked(
2565                    fixture.root(),
2566                    "crypt_dir",
2567                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2568                    Default::default(),
2569                )
2570                .await;
2571                let crypt: Arc<CryptBase> = fixture.crypt().unwrap();
2572                crypt
2573                    .add_wrapping_key(WRAPPING_KEY_ID, [1; 32].into())
2574                    .expect("add_wrapping_key failed");
2575                crypt_dir
2576                    .update_attributes(&fio::MutableNodeAttributes {
2577                        wrapping_key_id: Some(WRAPPING_KEY_ID),
2578                        ..Default::default()
2579                    })
2580                    .await
2581                    .expect("update_attributes wire call failed")
2582                    .expect("update_attributes failed");
2583                let crypt_file = open_file_checked(
2584                    &crypt_dir,
2585                    "crypt_file",
2586                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2587                    &Default::default(),
2588                )
2589                .await;
2590                crypt_file.write("asdf".as_bytes()).await.unwrap().expect("Writing crypt file");
2591                crypt_file_id = crypt_file
2592                    .get_attributes(fio::NodeAttributesQuery::ID)
2593                    .await
2594                    .unwrap()
2595                    .expect("Get id")
2596                    .1
2597                    .id
2598                    .expect("Reading id in response");
2599            }
2600
2601            for i in 0..3u64 {
2602                let file_proxy = open_file_checked(
2603                    fixture.root(),
2604                    &i.to_string(),
2605                    fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2606                    &Default::default(),
2607                )
2608                .await;
2609                file_proxy.write(i.to_string().as_bytes()).await.unwrap().expect("Writing file");
2610                let id = file_proxy
2611                    .get_attributes(fio::NodeAttributesQuery::ID)
2612                    .await
2613                    .unwrap()
2614                    .expect("Get id")
2615                    .1
2616                    .id
2617                    .expect("Reading id in response");
2618                hashes.push((i, id));
2619            }
2620            fixture.close().await
2621        };
2622        device.ensure_unique();
2623
2624        device.reopen(false);
2625        let mut device = {
2626            let fixture = TestFixture::open(
2627                device,
2628                TestFixtureOptions { format: false, ..Default::default() },
2629            )
2630            .await;
            fixture
                .volume()
                .volume()
                .record_or_replay_profile(new_profile_state(false), "foo")
                .await
                .expect("Recording");

            {
                let crypt: Arc<CryptBase> = fixture.crypt().unwrap();
                crypt
                    .add_wrapping_key(WRAPPING_KEY_ID, [1; 32].into())
                    .expect("add wrapping key failed");
                let crypt_file = open_file_checked(
                    fixture.root(),
                    "crypt_dir/crypt_file",
                    fio::PERM_READABLE,
                    &Default::default(),
                )
                .await;
                crypt_file.read(1).await.unwrap().expect("Reading crypt file");
            }
            // Page in only the zero offsets to avoid readahead pulling in extra pages.
            for (i, _) in &hashes {
                let file_proxy = open_file_checked(
                    fixture.root(),
                    &i.to_string(),
                    fio::PERM_READABLE,
                    &Default::default(),
                )
                .await;
                file_proxy.read(1).await.unwrap().expect("Reading file");
            }
            fixture.volume().volume().stop_profile_tasks().await;
            fixture.close().await
        };

        // Do this multiple times to ensure that the re-recording doesn't drop anything.
        for i in 0..3 {
            device.ensure_unique();
            device.reopen(false);
            let fixture = TestFixture::open(
                device,
                TestFixtureOptions { format: false, ..Default::default() },
            )
            .await;
            {
                // Grab the volume so we can load each file's VMO and check populated bytes.
                let volume = fixture.volume().volume().clone();
                // Ensure that nothing is paged in right now.
                {
                    let crypt_file = volume
                        .get_or_load_node(crypt_file_id, ObjectDescriptor::File, None)
                        .await
                        .expect("Opening file internally")
                        .into_any()
                        .downcast::<FxFile>()
                        .expect("Should be file");
                    assert_eq!(crypt_file.vmo().info().unwrap().populated_bytes, 0);
                }
                for (_, id) in &hashes {
                    let file = volume
                        .get_or_load_node(*id, ObjectDescriptor::File, None)
                        .await
                        .expect("Opening file internally")
                        .into_any()
                        .downcast::<FxFile>()
                        .expect("Should be file");
                    assert_eq!(file.vmo().info().unwrap().populated_bytes, 0);
                }

                fixture
                    .volume()
                    .volume()
                    .record_or_replay_profile(new_profile_state(false), "foo")
                    .await
                    .expect("Replaying");

                // Move the profile file while the replay is in flight to ensure a new version
                // lands to be used next time.
                {
                    let store_id = fixture.volume().volume().store().store_object_id();
                    let dir = fixture.volume().volume().get_profile_directory().await.unwrap();
                    let old_file = dir.lookup("foo").await.unwrap().unwrap().0;
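                    // Lock both the directory and the old profile object so the rename below
                    // commits atomically.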
                    let mut transaction = fixture
                        .fs()
                        .clone()
                        .new_transaction(
                            lock_keys!(
                                LockKey::object(store_id, dir.object_id()),
                                LockKey::object(store_id, old_file),
                            ),
                            Options::default(),
                        )
                        .await
                        .unwrap();
                    replace_child(&mut transaction, Some((&dir, "foo")), (&dir, &i.to_string()))
                        .await
                        .expect("Replace old profile.");
                    transaction.commit().await.unwrap();
                    assert!(
                        dir.lookup("foo").await.unwrap().is_none(),
                        "Old profile should be moved"
                    );
                }

                // Await all data being played back by checking that everything has paged in
                // (a reusable form of this wait is sketched after this test).
                async {
                    for (_, id) in &hashes {
                        let file = volume
                            .get_or_load_node(*id, ObjectDescriptor::File, None)
                            .await
                            .expect("Opening file internally")
                            .into_any()
                            .downcast::<FxFile>()
                            .expect("Should be file");
                        while file.vmo().info().unwrap().populated_bytes == 0 {
                            fasync::Timer::new(Duration::from_millis(25)).await;
                        }
                    }
                }
                .on_timeout(std::time::Duration::from_secs(120), || {
                    panic!("Replay did not page in for all VMOs")
                })
                .await;
                // The crypt file access should not have been recorded or replayed.
                {
                    let crypt_file = volume
                        .get_or_load_node(crypt_file_id, ObjectDescriptor::File, None)
                        .await
                        .expect("Opening file internally")
                        .into_any()
                        .downcast::<FxFile>()
                        .expect("Should be file");
                    assert_eq!(crypt_file.vmo().info().unwrap().populated_bytes, 0);
                }

                // Open all the files to show that they have been used.
                for (i, _) in &hashes {
                    let _file_proxy = open_file_checked(
                        fixture.root(),
                        &i.to_string(),
                        fio::PERM_READABLE,
                        &Default::default(),
                    )
                    .await;
                }

                // Complete the recording.
                fixture.volume().volume().stop_profile_tasks().await;
            }
            device = fixture.close().await;
        }
    }
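
    // A minimal sketch of the page-in wait used by the replay checks above, factored into a
    // helper. The name `wait_until_populated` is hypothetical (the tests inline this loop);
    // it assumes the same `FxFile` API the tests use (`vmo().info()` reporting
    // `populated_bytes`). A production version would want a timeout, as the tests apply via
    // `on_timeout`.
    #[allow(dead_code)]
    async fn wait_until_populated(file: &FxFile) {
        // Poll until the pager has populated at least one page of the file's VMO.
        while file.vmo().info().unwrap().populated_bytes == 0 {
            fasync::Timer::new(Duration::from_millis(25)).await;
        }
    }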

    #[fuchsia::test(threads = 10)]
    async fn test_profile_update() {
        let mut hashes = Vec::new();
        let device = {
            let fixture = blob_testing::new_blob_fixture().await;
            for i in 0..2u64 {
                let hash =
                    fixture.write_blob(i.to_string().as_bytes(), CompressionMode::Never).await;
                hashes.push(hash);
            }
            fixture.close().await
        };
        device.ensure_unique();

        device.reopen(false);
        let device = {
            let fixture = blob_testing::open_blob_fixture(device).await;

            {
                let volume = fixture.volume().volume();
                volume
                    .record_or_replay_profile(new_profile_state(true), "foo")
                    .await
                    .expect("Recording");

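                // Snapshot the recording counter; the wait below compares against it to tell
                // when the page-in has been captured.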
                let original_recorded = RECORDED.load(Ordering::Relaxed);

                // Page in only the zero offsets to avoid readahead pulling in extra pages.
                {
                    let mut writable = [0u8];
                    let hash = &hashes[0];
                    let vmo = fixture.get_blob_vmo(*hash).await;
                    vmo.read(&mut writable, 0).expect("Vmo read");
                }

                // The recording happens asynchronously, so we must wait.  This is crude, but it's
                // only for testing and it's simple (a reusable form of this wait is sketched
                // after this test).
                while RECORDED.load(Ordering::Relaxed) == original_recorded {
                    fasync::Timer::new(std::time::Duration::from_millis(10)).await;
                }

                volume.stop_profile_tasks().await;
            }
            fixture.close().await
        };

        device.ensure_unique();
        device.reopen(false);
        let fixture = blob_testing::open_blob_fixture(device).await;
        {
            // Need each blob's VMO to check populated bytes.
            // Ensure that nothing is paged in right now.
            for hash in &hashes {
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
            }

            let volume = fixture.volume().volume();

            volume
                .record_or_replay_profile(new_profile_state(true), "foo")
                .await
                .expect("Replaying");

            // Await all data being played back by checking that things have paged in.
            async {
                let hash = &hashes[0];
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                while blob.vmo().info().unwrap().populated_bytes == 0 {
                    fasync::Timer::new(Duration::from_millis(25)).await;
                }
            }
            .on_timeout(std::time::Duration::from_secs(120), || {
                panic!("Replay did not page in for all VMOs")
            })
            .await;

            let original_recorded = RECORDED.load(Ordering::Relaxed);

            // Record the new profile that will overwrite the old one.
            {
                let mut writable = [0u8];
                let hash = &hashes[1];
                let vmo = fixture.get_blob_vmo(*hash).await;
                vmo.read(&mut writable, 0).expect("Vmo read");
            }

            // The recording happens asynchronously, so we must wait.  This is crude, but it's only
            // for testing and it's simple.
            while RECORDED.load(Ordering::Relaxed) == original_recorded {
                fasync::Timer::new(std::time::Duration::from_millis(10)).await;
            }

            // Complete the recording.
            volume.stop_profile_tasks().await;
        }
        let device = fixture.close().await;

        device.ensure_unique();
        device.reopen(false);
        let fixture = blob_testing::open_blob_fixture(device).await;
        {
            // Need each blob's VMO to check populated bytes.
            // Ensure that nothing is paged in right now.
            for hash in &hashes {
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
            }

            fixture
                .volume()
                .volume()
                .record_or_replay_profile(new_profile_state(true), "foo")
                .await
                .expect("Replaying");

            // Await all data being played back by checking that things have paged in.
            async {
                let hash = &hashes[1];
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                while blob.vmo().info().unwrap().populated_bytes == 0 {
                    fasync::Timer::new(Duration::from_millis(25)).await;
                }
            }
            .on_timeout(std::time::Duration::from_secs(30), || {
                panic!("Replay did not page in for all VMOs")
            })
            .await;

            // Complete the recording.
            fixture.volume().volume().stop_profile_tasks().await;

            // Verify that the first blob was not paged in, as it should have been dropped from
            // the profile.
            {
                let hash = &hashes[0];
                let blob = fixture.get_blob(*hash).await.expect("Opening blob");
                assert_eq!(blob.vmo().info().unwrap().populated_bytes, 0);
            }
        }
        fixture.close().await;
    }
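
    // A minimal sketch of the crude recording wait used above, as a helper. The name
    // `wait_for_recording` is hypothetical (the tests inline this loop); it assumes the same
    // `RECORDED` atomic counter that the tests poll, which increments as accesses are
    // captured by the profile recorder.
    #[allow(dead_code)]
    async fn wait_for_recording(original_recorded: u64) {
        // Poll until the counter advances past the snapshot taken before the access.
        while RECORDED.load(Ordering::Relaxed) == original_recorded {
            fasync::Timer::new(std::time::Duration::from_millis(10)).await;
        }
    }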

    #[fuchsia::test(threads = 10)]
    async fn test_unencrypted_volume() {
        let fixture = TestFixture::new_unencrypted().await;
        let root = fixture.root();

        let f = open_file_checked(
            &root,
            "foo",
            fio::Flags::FLAG_MAYBE_CREATE | fio::Flags::PROTOCOL_FILE,
            &Default::default(),
        )
        .await;
        close_file_checked(f).await;

        fixture.close().await;
    }

    #[fuchsia::test]
    async fn test_read_only_unencrypted_volume() {
        // Make a new Fxfs filesystem with an unencrypted volume named "vol".
        let fs = {
            let device = fxfs::filesystem::mkfs_with_volume(
                DeviceHolder::new(FakeDevice::new(8192, 512)),
                "vol",
                None,
            )
            .await
            .unwrap();
            // Re-open the device as read-only and mount the filesystem as read-only.
            device.reopen(true);
            FxFilesystemBuilder::new().read_only(true).open(device).await.unwrap()
        };
        // Ensure we can access the volume and gracefully terminate any tasks (this
        // open-then-terminate pattern is sketched as a helper after the next test).
        {
            let root_volume = root_volume(fs.clone()).await.unwrap();
            let store = root_volume.volume("vol", StoreOptions::default()).await.unwrap();
            let unique_id = store.store_object_id();
            let blob_resupplied_count =
                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
            let volume = FxVolume::new(
                Weak::new(),
                store,
                unique_id,
                "vol".to_owned(),
                blob_resupplied_count,
                MemoryPressureConfig::default(),
            )
            .unwrap();
            volume.terminate().await;
        }
        // Close the filesystem, and make sure we don't have any dangling references.
        fs.close().await.unwrap();
        let device = fs.take_device().await;
        device.ensure_unique();
    }

    #[fuchsia::test]
    async fn test_read_only_encrypted_volume() {
        let crypt: Arc<CryptBase> = Arc::new(new_insecure_crypt());
        // Make a new Fxfs filesystem with an encrypted volume named "vol".
        let fs = {
            let device = fxfs::filesystem::mkfs_with_volume(
                DeviceHolder::new(FakeDevice::new(8192, 512)),
                "vol",
                Some(crypt.clone()),
            )
            .await
            .unwrap();
            // Re-open the device as read-only and mount the filesystem as read-only.
            device.reopen(true);
            FxFilesystemBuilder::new().read_only(true).open(device).await.unwrap()
        };
        // Ensure we can access the volume and gracefully terminate any tasks.
        {
            let root_volume = root_volume(fs.clone()).await.unwrap();
            let store = root_volume
                .volume("vol", StoreOptions { crypt: Some(crypt), ..StoreOptions::default() })
                .await
                .unwrap();
            let unique_id = store.store_object_id();
            let blob_resupplied_count =
                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
            let volume = FxVolume::new(
                Weak::new(),
                store,
                unique_id,
                "vol".to_owned(),
                blob_resupplied_count,
                MemoryPressureConfig::default(),
            )
            .unwrap();
            volume.terminate().await;
        }
        // Close the filesystem, and make sure we don't have any dangling references.
        fs.close().await.unwrap();
        let device = fs.take_device().await;
        device.ensure_unique();
    }
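
    // A minimal sketch of the shared open-then-terminate pattern from the two read-only
    // tests above, under the assumption that `FxVolume::new` takes exactly the arguments
    // shown in those tests. The helper name `open_and_terminate` is hypothetical and not
    // used elsewhere.
    #[allow(dead_code)]
    async fn open_and_terminate(store: Arc<ObjectStore>) {
        // Read the id before the store is moved into the volume.
        let unique_id = store.store_object_id();
        // A fresh refault counter, as in the tests above.
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volume = FxVolume::new(
            Weak::new(),
            store,
            unique_id,
            "vol".to_owned(),
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .unwrap();
        // Gracefully shut down any background tasks the volume spawned.
        volume.terminate().await;
    }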
}