// fxfs_platform/fuchsia/volumes_directory.rs
1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::RemoteCrypt;
6use crate::fuchsia::component::map_to_raw_status;
7use crate::fuchsia::directory::FxDirectory;
8use crate::fuchsia::errors::map_to_status;
9use crate::fuchsia::fxblob::BlobDirectory;
10use crate::fuchsia::memory_pressure::{MemoryPressureLevel, MemoryPressureMonitor};
11use crate::fuchsia::profile::new_profile_state;
12use crate::fuchsia::volume::{FxVolume, FxVolumeAndRoot, MemoryPressureConfig, RootDir};
13use anyhow::{Context, Error, anyhow, ensure};
14use async_trait::async_trait;
15use fidl::endpoints::{DiscoverableProtocolMarker, ServerEnd};
16use fidl_fuchsia_fs::{AdminMarker, AdminRequest, AdminRequestStream};
17use fidl_fuchsia_fs_startup::{
18    CheckOptions, CreateOptions, MountOptions, VolumeRequest, VolumeRequestStream,
19};
20use fidl_fuchsia_fxfs::{FileBackedVolumeProviderMarker, ProjectIdMarker};
21use fs_inspect::{FsInspectTree, FsInspectVolume};
22use futures::stream::FuturesUnordered;
23use futures::{StreamExt, TryStreamExt};
24use fxfs::errors::FxfsError;
25use fxfs::fsck;
26use fxfs::log::*;
27use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
28use fxfs::object_store::volume::RootVolume;
29use fxfs::object_store::{
30    Directory, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions, StoreOwner,
31};
32use fxfs_crypto::Crypt;
33use fxfs_trace::{TraceFutureExt, trace_future_args};
34use refaults_vmo::PageRefaultCounter;
35use rustc_hash::FxHashMap as HashMap;
36use std::sync::atomic::{AtomicU64, Ordering};
37use std::sync::{Arc, OnceLock, Weak};
38use vfs::directory::entry_container::MutableDirectory;
39use vfs::directory::helper::DirectlyMutable;
40use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
/// One mebibyte (2^20 bytes).
const MEBIBYTE: u64 = 1 << 20;
42
/// Tracks an in-flight profile recording or replay session.
struct ProfileState {
    // Timer task that calls `stop_profile_tasks` once the requested duration elapses; aborted
    // explicitly in `VolumesDirectory::terminate`.
    task: fasync::Task<()>,
    // Name of the profile being recorded or replayed.
    profile_name: String,
    // True if the session applies to all volumes, including volumes mounted while the session is
    // active (see `create_or_mount_volume`), rather than to a single named volume.
    all_volumes: bool,
}
48
/// VolumesDirectory is a special pseudo-directory used to enumerate and operate on volumes.
/// Volume creation happens via fuchsia.fs.startup.Volumes.Create, rather than open.
///
/// Note that VolumesDirectory assumes exclusive access to |root_volume| and if volumes are
/// manipulated from elsewhere, strange things will happen.
pub struct VolumesDirectory {
    /// The root volume, whose volume directory records every volume in the filesystem.
    root_volume: RootVolume,
    /// Pseudo-directory with one service entry per volume (see `add_directory_entry`).
    directory_node: Arc<vfs::directory::immutable::Simple>,
    /// Currently mounted volumes, keyed by store object id.  Mutations must go through
    /// `MountedVolumesGuard` (obtained via `lock`), which serializes mount/unmount operations.
    mounted_volumes: futures::lock::Mutex<HashMap<u64, MountedVolume>>,
    /// Inspect tree that volumes are (un)registered with as they are mounted and locked.
    inspect_tree: Weak<FsInspectTree>,
    /// Optional memory-pressure monitor handed to each volume's background task.
    mem_monitor: Option<MemoryPressureMonitor>,
    /// Counter shared with every mounted volume via `mount_store`.
    blob_resupplied_count: Arc<PageRefaultCounter>,
    // The state of profile recordings. Should be locked *after* mounted_volumes.
    profiling_state: futures::lock::Mutex<Option<ProfileState>>,

    /// A running estimate of the number of dirty bytes outstanding in all pager-backed VMOs across
    /// all volumes.
    pager_dirty_bytes_count: PagerDirtyByteCount,

    /// Max outstanding dirty bytes under critical memory pressure. This could be hardcoded, but is
    /// broken out for testing.
    max_dirty_bytes_when_critical: AtomicU64,

    // A callback to invoke when a volume is added.  When the volume is removed, this is called
    // again with `None` as the second parameter.
    on_volume_added: OnceLock<Box<dyn Fn(&str, Option<Arc<FxVolume>>) + Send + Sync>>,

    /// The cache configuration to use under different memory pressure levels.
    memory_pressure_config: MemoryPressureConfig,
}
79
/// Operations on VolumesDirectory that cannot be performed concurrently (i.e. most
/// volume creation/removal ops) should exist on this guard instead of VolumesDirectory.
pub struct MountedVolumesGuard<'a> {
    // Strong reference back to the owning directory, so guard methods can reach its fields.
    volumes_directory: Arc<VolumesDirectory>,
    // Exclusive access to the mount table for the lifetime of the guard.
    mounted_volumes: futures::lock::MutexGuard<'a, HashMap<u64, MountedVolume>>,
}
86
/// Book-keeping for a single mounted volume.
struct MountedVolume {
    // Monotonically increasing id assigned at mount time (see `add_mount`); `auto_unmount`
    // compares it to detect whether the same mount instance is still present.
    sequence: u64,
    // The volume itself, together with its root directory.
    volume: FxVolumeAndRoot,

    // True if the volume was forcibly locked.
    locked: bool,
}
94
/// Selects whether `create_or_mount_volume` mounts an existing volume or creates a new one.
pub(crate) enum Mode {
    /// Mount an existing volume.
    Mount,
    /// Create a new volume with the (optional) GUID.  `low_32_bit_object_ids` corresponds to
    /// `CreateOptions.restrict_inode_ids_to_32_bit` and is forwarded to the new child store.
    Create { guid: Option<[u8; 16]>, low_32_bit_object_ids: bool },
}
99
impl MountedVolumesGuard<'_> {
    /// Creates or mounts a volume. If |crypt| is set, the volume will be created or mounted as
    /// encrypted. The volume is mounted according to |as_blob|.
    ///
    /// Fails with `FxfsError::AlreadyBound` if the store is already mounted.  If an all-volumes
    /// profile session is active, it is applied to the newly mounted volume as well.
    async fn create_or_mount_volume(
        &mut self,
        name: &str,
        crypt: Option<Arc<dyn Crypt>>,
        mode: Mode,
        as_blob: bool,
    ) -> Result<FxVolumeAndRoot, Error> {
        // The store owner is held weakly so the store does not keep VolumesDirectory alive.
        let owner = Arc::downgrade(&self.volumes_directory) as Weak<dyn StoreOwner>;
        let store = match mode {
            Mode::Create { guid, low_32_bit_object_ids } => self
                .volumes_directory
                .root_volume
                .new_volume(
                    name,
                    NewChildStoreOptions {
                        options: StoreOptions { owner, crypt },
                        guid,
                        low_32_bit_object_ids,
                        ..Default::default()
                    },
                )
                .await
                .context("failed to create new volume")?,
            Mode::Mount => {
                self.volumes_directory
                    .root_volume
                    .volume(name, StoreOptions { owner, crypt })
                    .await?
            }
        };
        // Refuse to double-mount the same store.
        ensure!(
            !self.mounted_volumes.contains_key(&store.store_object_id()),
            FxfsError::AlreadyBound
        );

        let volume = if as_blob {
            self.mount_store::<BlobDirectory>(
                name,
                store,
                self.volumes_directory.memory_pressure_config,
            )
            .await?
        } else {
            self.mount_store::<FxDirectory>(
                name,
                store,
                self.volumes_directory.memory_pressure_config,
            )
            .await?
        };
        // If there is an ongoing profile activity, we should apply it to the mounted volume.
        if let Some(ProfileState { profile_name, all_volumes: true, .. }) =
            &(*self.volumes_directory.profiling_state.lock().await)
        {
            // Best effort: a profiling failure does not fail the mount.
            if let Err(e) = volume
                .volume()
                .record_or_replay_profile(new_profile_state(as_blob), profile_name)
                .await
            {
                error!(
                    "Failed to record or replay profile '{}' for volume {}: {:?}",
                    profile_name, name, e
                );
            }
        }

        // Newly created volumes also need an entry in the volumes pseudo-directory; mounted
        // volumes already have one (added in `VolumesDirectory::new`).
        if let Mode::Create { .. } = mode {
            let store_object_id = volume.volume().store().store_object_id();
            self.volumes_directory.add_directory_entry(name, store_object_id);
        }
        Ok(volume)
    }

    // Mounts the given store.  A lock *must* be held on the volume directory.
    //
    // NOTE(review): `FxVolumeAndRoot::new` is given `memory_pressure_config` directly while the
    // background task is started with `flush_task_config`; all current callers pass the same
    // value for both — confirm the distinction is intentional.
    async fn mount_store<T: From<Directory<FxVolume>> + RootDir>(
        &mut self,
        name: &str,
        store: Arc<ObjectStore>,
        flush_task_config: MemoryPressureConfig,
    ) -> Result<FxVolumeAndRoot, Error> {
        // An event's koid serves as a process-unique id for the volume.
        let unique_id = zx::Event::create();
        let volume = FxVolumeAndRoot::new::<T>(
            Arc::downgrade(&self.volumes_directory),
            store,
            unique_id.koid().unwrap().raw_koid(),
            name.to_owned(),
            self.volumes_directory.blob_resupplied_count.clone(),
            self.volumes_directory.memory_pressure_config,
        )
        .await?;
        volume
            .volume()
            .start_background_task(flush_task_config, self.volumes_directory.mem_monitor.as_ref());
        self.add_mount(name, &volume);
        Ok(volume)
    }

    /// Adds a volume (`FxVolumeAndRoot`) into the mount list.
    pub fn add_mount(&mut self, name: &str, volume: &FxVolumeAndRoot) {
        // Each mount instance gets a unique sequence number so `auto_unmount` can tell a remount
        // apart from the original mount.
        static SEQUENCE: AtomicU64 = AtomicU64::new(0);
        let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);
        self.mounted_volumes.insert(
            volume.volume().store().store_object_id(),
            MountedVolume { sequence, volume: volume.clone(), locked: false },
        );
        if let Some(inspect) = self.volumes_directory.inspect_tree.upgrade() {
            inspect.register_volume(
                name.to_string(),
                Arc::downgrade(volume.volume()) as Weak<dyn FsInspectVolume + Send + Sync>,
            )
        }
        if let Some(callback) = self.volumes_directory.on_volume_added.get() {
            callback(name, Some(volume.volume().clone()));
        }
    }

    // Tears down a mounted volume in place: notifies the `on_volume_added` callback (with `None`),
    // and, unless it was already locked, unregisters it from inspect, drops the strong "root"
    // entry and terminates the volume.
    async fn lock_mount(&self, mounted_volume: &mut MountedVolume) {
        let MountedVolume { volume, locked, .. } = mounted_volume;

        if let Some(callback) = self.volumes_directory.on_volume_added.get() {
            callback(&volume.volume().name(), None);
        }
        if !*locked {
            if let Some(inspect) = self.volumes_directory.inspect_tree.upgrade() {
                inspect.unregister_volume(volume.volume().name());
            }
            // We must make sure to remove the root entry which holds strong references to the
            // volume since otherwise `Volume::try_unwrap` might fail.
            let _ = volume.outgoing_dir().remove_entry("root", true);
            volume.volume().terminate().await;
            *locked = true;
        }
    }

    // Deletes the named volume from the filesystem.  Fails with `AlreadyBound` if the volume is
    // currently mounted.
    async fn remove_volume(&mut self, name: &str) -> Result<(), Error> {
        let (object_id, transaction) = self
            .volumes_directory
            .root_volume
            .acquire_transaction_for_remove_volume(name, [], false)
            .await?;

        // Cowardly refuse to delete a mounted volume.
        ensure!(!self.mounted_volumes.contains_key(&object_id), FxfsError::AlreadyBound);
        let directory_node = self.volumes_directory.directory_node.clone();
        self.volumes_directory
            .root_volume
            .delete_volume(name, transaction, || {
                // This shouldn't fail because the entry should exist.
                directory_node.remove_entry(name, /* must_be_directory: */ false).unwrap();
            })
            .await?;
        Ok(())
    }

    // Shuts down and locks every mounted volume, draining each volume's admin scope first so no
    // new connections can arrive while the volume is being terminated.
    async fn terminate(&mut self) {
        let mut volumes = std::mem::take(&mut *self.mounted_volumes);
        for mounted_volume in volumes.values_mut() {
            let admin_scope = mounted_volume.volume.admin_scope();
            admin_scope.shutdown();
            admin_scope.wait().await;

            self.lock_mount(mounted_volume).await;
        }
    }

    // Unmounts the volume identified by `store_id`.  The caller should take locks to avoid races if
    // necessary.
    //
    // NOTE: This will not terminate any connections on the admin scope.
    pub async fn unmount(&mut self, store_id: u64) -> Result<FxVolumeAndRoot, Error> {
        let mut mounted_volume =
            self.mounted_volumes.remove(&store_id).ok_or(FxfsError::NotFound)?;
        self.lock_mount(&mut mounted_volume).await;
        Ok(mounted_volume.volume)
    }

    // Forcibly locks the volume identified by `store_id`, if it is mounted.  Unlike `unmount`,
    // the entry stays in the mount list with its `locked` flag set.  A no-op for unmounted
    // volumes.
    async fn force_lock(&mut self, store_id: u64) -> Result<(), Error> {
        if let Some(mut mounted_volume) = self.mounted_volumes.remove(&store_id) {
            self.lock_mount(&mut mounted_volume).await;
            // Reinsert the volume as locked. This looks racy but it isn't, we held the mutable
            // reference to `self` for this whole duration. `get_mut()` would be cleaner but that
            // would hold a mutable reference while this call would take another reference.
            self.mounted_volumes.insert(store_id, mounted_volume);
        }

        Ok(())
    }

    // Auto-unmount the volume when the last connection to the volume is closed.
    fn auto_unmount(&self, store_id: u64) {
        let volumes_directory = self.volumes_directory.clone();
        let mounted_volume = self.mounted_volumes.get(&store_id).unwrap();
        let sequence = mounted_volume.sequence;
        let admin_scope = mounted_volume.volume.admin_scope().clone();
        let scope = mounted_volume.volume.volume().scope().clone();
        fasync::Task::spawn(
            async move {
                // Check the admin_scope first because once that has finished, there can never be
                // any more connections to it.
                admin_scope.wait().await;
                scope.wait().await;

                // Check that the same volume is still mounted i.e. there wasn't an explicit
                // unmount.
                let mut mounted_volumes = volumes_directory.lock().await;
                match mounted_volumes.mounted_volumes.get(&store_id) {
                    Some(m) if m.sequence == sequence => {}
                    _ => return,
                }

                warn!(store_id; "Last connection to volume closed without unmount, shutting down");
                let root_store = volumes_directory.root_volume.volume_directory().store();
                let fs = root_store.filesystem();
                let _guard = fs
                    .lock_manager()
                    .txn_lock(lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        volumes_directory.root_volume.volume_directory().object_id(),
                    )])
                    .await;

                if let Err(e) = mounted_volumes.unmount(store_id).await {
                    warn!(e:?, store_id; "Failed to unmount volume");
                }
            }
            .trace(trace_future_args!("Volume::auto_unmount")),
        )
        .detach();
    }
}
333
334impl VolumesDirectory {
    /// Fills the VolumesDirectory with all volumes in |root_volume|.  No volume is opened during
    /// this.
    ///
    /// Returns an error if the volume directory contains a non-volume entry
    /// (`FxfsError::Inconsistent`).
    pub async fn new(
        root_volume: RootVolume,
        inspect_tree: Weak<FsInspectTree>,
        mem_monitor: Option<MemoryPressureMonitor>,
        blob_resupplied_count: Arc<PageRefaultCounter>,
        memory_pressure_config: MemoryPressureConfig,
    ) -> Result<Arc<Self>, Error> {
        let layer_set = root_volume.volume_directory().store().tree().layer_set();
        let mut merger = layer_set.merger();
        let me = Arc::new(Self {
            root_volume,
            directory_node: vfs::directory::immutable::simple(),
            mounted_volumes: futures::lock::Mutex::new(HashMap::default()),
            inspect_tree,
            mem_monitor,
            blob_resupplied_count,
            profiling_state: futures::lock::Mutex::new(None),
            pager_dirty_bytes_count: PagerDirtyByteCount::new(),
            // Default cap: 1% of physical memory may be dirty under critical memory pressure.
            max_dirty_bytes_when_critical: AtomicU64::new(zx::system_get_physmem() / 100),
            on_volume_added: OnceLock::new(),
            memory_pressure_config,
        });
        // Enumerate existing volumes and expose a directory entry for each, without mounting.
        let mut iter = me.root_volume.volume_directory().iter(&mut merger).await?;
        while let Some((name, store_id, object_descriptor)) = iter.get() {
            ensure!(*object_descriptor == ObjectDescriptor::Volume, FxfsError::Inconsistent);

            me.add_directory_entry(&name, store_id);

            iter.advance().await?;
        }
        Ok(me)
    }
369
370    /// Delete a profile for a given volume. Fails if that volume isn't mounted or if there is
371    /// active profile recording or replay.
372    pub async fn delete_profile(
373        self: &Arc<Self>,
374        volume_name: &str,
375        profile_name: &str,
376    ) -> Result<(), zx::Status> {
377        // Volumes lock is taken first to provide consistent lock ordering with mounting a volume.
378        let volumes = self.mounted_volumes.lock().await;
379        let state = self.profiling_state.lock().await;
380
381        // Only allow deletion when no operations are in flight. This removes confusion around
382        // deleting a profile while one is recording with the same name, as the profile will not be
383        // available for deletion until the recording completes. This would also mean that deleting
384        // during a recording may succeed for deleting an older version but will be confusingly
385        // replaced moments later.
386        if state.is_some() {
387            warn!("Failing profile deletion while profile operations are in flight.");
388            return Err(zx::Status::SHOULD_WAIT);
389        }
390        for MountedVolume { volume, .. } in volumes.values() {
391            if volume.volume().name() == volume_name {
392                let dir = Arc::new(FxDirectory::new(
393                    None,
394                    volume.volume().get_profile_directory().await.map_err(map_to_status)?,
395                ));
396                return dir.unlink(profile_name, false).await;
397            }
398        }
399        warn!(volume_name, profile_name; "Volume not found while deleting profile");
400        Err(zx::Status::NOT_FOUND)
401    }
402
    /// Returns the memory pressure monitor, if one was supplied at construction time.
    pub fn memory_pressure_monitor(&self) -> Option<&MemoryPressureMonitor> {
        self.mem_monitor.as_ref()
    }
406
    /// Stop all ongoing replays, and complete and persist ongoing recordings.
    pub async fn stop_profile_tasks(self: &Arc<Self>) {
        let mut state;
        let volumes;
        // Take the mounted_volumes lock first to keep consistent lock ordering with other
        // operations, but don't need to hold it for the entire operation. We need to take the
        // profiling_state lock before the mounted_volumes lock is dropped to ensure that another
        // thread doesn't mount a volume and start a profile task on it in between.
        {
            volumes = self
                .mounted_volumes
                .lock()
                .await
                .values()
                .map(|v| v.volume.volume().clone()) // Clones of each FxVolume.
                .collect::<Vec<Arc<FxVolume>>>();
            state = self.profiling_state.lock().await;
        }
        // The profiling_state guard stays held here, so no new session can start until the state
        // is cleared below.
        for volume in volumes {
            volume.stop_profile_tasks().await;
        }
        *state = None;
    }
430
    /// Record a named profile for a number of seconds, fails if there is an in flight recording or
    /// replay. The given volume must be unlocked, if no volume is given then all volumes will be
    /// affected and all volumes mounted during the process will also be affected.
    pub async fn record_or_replay_profile(
        self: &Arc<Self>,
        volume_name: Option<String>,
        profile_name: String,
        duration_secs: u32,
    ) -> Result<(), zx::Status> {
        // Volumes lock is taken first to provide consistent lock ordering with mounting a volume.
        let volumes = self.lock().await;
        let mut state = self.profiling_state.lock().await;
        if state.is_some() {
            // Consistency in the recording and replaying cannot be ensured at the volume level
            // if more than one operation can be in flight at a time.
            return Err(zx::Status::SHOULD_WAIT);
        }
        match volume_name.as_ref() {
            Some(volume_name) => {
                // Resolve the name to a store id, then require that the volume is mounted.
                let (store_object_id, _, _) = volumes
                    .volumes_directory
                    .root_volume
                    .volume_directory()
                    .lookup(&volume_name)
                    .await
                    .map_err(|_| zx::Status::INTERNAL)?
                    .ok_or(zx::Status::NOT_FOUND)?;
                if let Some(MountedVolume { volume, .. }) =
                    volumes.mounted_volumes.get(&store_object_id)
                {
                    // The root directory type tells us whether this is a blob volume.
                    let is_blob =
                        volume.root().clone().into_any().downcast::<BlobDirectory>().is_ok();
                    if let Err(error) = volume
                        .volume()
                        .record_or_replay_profile(new_profile_state(is_blob), &profile_name)
                        .await
                    {
                        error!(
                            error:?,
                            profile_name = profile_name.as_str(),
                            volume_name = volume_name.as_str();
                            "Failed to record or replay profile",
                        );
                        return Err(zx::Status::INTERNAL);
                    }
                } else {
                    return Err(zx::Status::UNAVAILABLE);
                }
            }
            None => {
                // All-volumes mode: start a session on every mounted volume.
                for MountedVolume { volume, .. } in volumes.mounted_volumes.values() {
                    let is_blob =
                        volume.root().clone().into_any().downcast::<BlobDirectory>().is_ok();
                    // Just log the errors, don't stop half-way.
                    if let Err(error) = volume
                        .volume()
                        .record_or_replay_profile(new_profile_state(is_blob), &profile_name)
                        .await
                    {
                        error!(
                            error:?,
                            profile_name = profile_name.as_str(),
                            volume_name = volume.volume().name();
                            "Failed to record or replay profile",
                        );
                    }
                }
            }
        }

        // Arm a timer that stops all profile activity after the requested duration.
        let this = self.clone();
        let task = fasync::Task::spawn(async move {
            fasync::Timer::new(fasync::MonotonicDuration::from_seconds(duration_secs.into())).await;
            this.stop_profile_tasks().await;
        });
        *state = Some(ProfileState { task, profile_name, all_volumes: volume_name.is_none() });
        Ok(())
    }
509
    /// Returns the directory node which can be used to provide connections for e.g. enumerating
    /// entries in the VolumesDirectory.
    /// Directly manipulating the entries in this node will result in strange behaviour; entries
    /// are managed internally via `add_directory_entry` and `remove_volume`.
    pub fn directory_node(&self) -> &Arc<vfs::directory::immutable::Simple> {
        &self.directory_node
    }
516
    /// This serves as an exclusive lock for operations that manipulate the set of mounted
    /// volumes.  The returned guard holds the `mounted_volumes` mutex for its lifetime.
    pub async fn lock<'a>(self: &'a Arc<Self>) -> MountedVolumesGuard<'a> {
        MountedVolumesGuard {
            volumes_directory: self.clone(),
            mounted_volumes: self.mounted_volumes.lock().await,
        }
    }
524
525    fn add_directory_entry(self: &Arc<Self>, name: &str, store_id: u64) {
526        let weak = Arc::downgrade(self);
527        let name_owned = Arc::new(name.to_string());
528        self.directory_node
529            .add_entry(
530                name,
531                vfs::service::host(move |requests| {
532                    let weak = weak.clone();
533                    let name = name_owned.clone();
534                    async move {
535                        if let Some(me) = weak.upgrade() {
536                            let _ =
537                                me.handle_volume_requests(name.as_ref(), requests, store_id).await;
538                        }
539                    }
540                }),
541            )
542            .unwrap();
543    }
544
545    /// Creates and mounts a new volume. If |crypt| is set, the volume will be encrypted. The
546    /// volume is mounted according to |as_blob|.
547    pub async fn create_and_mount_volume(
548        self: &Arc<Self>,
549        name: &str,
550        crypt: Option<Arc<dyn Crypt>>,
551        as_blob: bool,
552        guid: Option<[u8; 16]>,
553    ) -> Result<FxVolumeAndRoot, Error> {
554        self.lock()
555            .await
556            .create_or_mount_volume(
557                name,
558                crypt,
559                Mode::Create { guid, low_32_bit_object_ids: false },
560                as_blob,
561            )
562            .await
563    }
564
565    /// Mounts an existing volume. `crypt` will be used to unlock the volume if provided.
566    /// If `as_blob` is `true`, the volume will be mounted as a blob filesystem, otherwise
567    /// it will be treated as a regular fxfs volume.
568    pub async fn mount_volume(
569        self: &Arc<Self>,
570        name: &str,
571        crypt: Option<Arc<dyn Crypt>>,
572        as_blob: bool,
573    ) -> Result<FxVolumeAndRoot, Error> {
574        self.lock().await.create_or_mount_volume(name, crypt, Mode::Mount, as_blob).await
575    }
576
577    /// Removes a volume. The volume must exist but encrypted volume keys are not required.
578    pub async fn remove_volume(self: &Arc<Self>, name: &str) -> Result<(), Error> {
579        self.lock().await.remove_volume(name).await
580    }
581
    /// Terminates all opened volumes.  This will not cancel any profiling that might be taking
    /// place.
    pub async fn terminate(self: &Arc<Self>) {
        // Abort the profiling timer task.
        // The `profiling_state` guard is deliberately dropped at the end of this statement,
        // before `abort()` is awaited below — the timer task's body (`stop_profile_tasks`) also
        // takes this lock, so holding the guard across the await would risk a deadlock.
        let profiling_state = self.profiling_state.lock().await.take();
        if let Some(state) = profiling_state {
            state.task.abort().await;
        }
        self.lock().await.terminate().await;
        // TODO(https://fxbug.dev/452935329): Turn this into a real assert.
        debug_assert!(
            self.pager_dirty_bytes_count.load() == 0,
            "Leaked {} dirty bytes.",
            self.pager_dirty_bytes_count.load()
        );
    }
598
    /// Serves the given volume on `outgoing_dir_server_end`, exposing `root` plus the volume's
    /// service protocols (Admin, ProjectId, FileBackedVolumeProvider, …) under `svc`.
    pub fn serve_volume(
        self: &Arc<Self>,
        volume: &FxVolumeAndRoot,
        outgoing_dir_server_end: ServerEnd<fio::DirectoryMarker>,
        as_blob: bool,
    ) -> Result<(), Error> {
        // A note regarding strong references to `FxVolume`: connections to services here are all on
        // the admin scope.  When we force lock a volume, we want to keep the admin scope running
        // but terminate the volume.  When a volume is terminated in this way, we want to ensure
        // that there are no outstanding strong references to the volume.  With that in mind, the
        // services here should not hold strong references.  They can hold a strong reference to
        // VolumesDirectory and find the volume via the mount list, or they can hold a weak
        // reference to the FxVolume.  Before upgrading the weak reference, an active guard must be
        // acquired on the volume's scope first.  This ensures that no new strong references are
        // taken once the volume has commenced termination.  The "root" entry just below is an
        // exception that does hold a strong reference.  We handle that by making sure we remove
        // that entry before we call `FxVolume::terminate`.

        let outgoing_dir = volume.outgoing_dir();
        outgoing_dir.add_entry("root", volume.root().clone().as_directory_entry())?;
        let svc_dir = vfs::directory::immutable::simple();
        outgoing_dir.add_entry("svc", svc_dir.clone())?;

        let store_id = volume.volume().store().store_object_id();
        // Admin requests are routed through VolumesDirectory (strong ref), per the note above.
        let me = self.clone();
        svc_dir.add_entry(
            AdminMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let me = me.clone();
                async move {
                    let _ = me.handle_admin_requests(requests, store_id).await;
                }
            }),
        )?;
        // The remaining services hold a weak reference plus the volume's scope, per the note
        // above.
        let vol_scope = volume.volume().scope().clone();
        let weak_vol = Arc::downgrade(volume.volume());
        {
            let vol_scope = vol_scope.clone();
            let weak_vol = weak_vol.clone();
            svc_dir.add_entry(
                ProjectIdMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak_vol = weak_vol.clone();
                    let scope = vol_scope.clone();
                    async move {
                        let _ =
                            FxVolume::handle_project_id_requests(weak_vol, scope, requests).await;
                    }
                }),
            )?;
        }
        svc_dir.add_entry(
            FileBackedVolumeProviderMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let weak_vol = weak_vol.clone();
                let scope = vol_scope.clone();
                async move {
                    let _ = FxVolume::handle_file_backed_volume_provider_requests(
                        weak_vol, scope, requests,
                    )
                    .await;
                }
            }),
        )?;
        volume.root().clone().register_additional_volume_services(&svc_dir)?;

        // The outgoing directory is served on the admin scope so it survives a force-lock.
        let scope = volume.admin_scope().clone();
        let mut flags = fio::PERM_READABLE | fio::PERM_WRITABLE;
        if as_blob {
            // Blob volumes additionally serve executable blobs.
            flags |= fio::PERM_EXECUTABLE;
        }
        vfs::directory::serve_on(Arc::clone(outgoing_dir), flags, scope, outgoing_dir_server_end);

        info!(
            store_id;
            "Serving volume, pager port koid={}",
            fasync::EHandle::local().port().koid().unwrap().raw_koid()
        );
        Ok(())
    }
680
681    /// Creates and serves the volume with the given name.
682    pub async fn create_and_serve_volume(
683        self: &Arc<Self>,
684        name: &str,
685        outgoing_directory_server_end: ServerEnd<fio::DirectoryMarker>,
686        mount_options: MountOptions,
687        create_options: CreateOptions,
688    ) -> Result<(), Error> {
689        let mut guard = self.lock().await;
690        let crypt =
691            mount_options.crypt.map(|crypt| Arc::new(RemoteCrypt::new(crypt)) as Arc<dyn Crypt>);
692        let as_blob = mount_options.as_blob.unwrap_or(false);
693        let guid = create_options.guid;
694        let low_32_bit_object_ids = create_options.restrict_inode_ids_to_32_bit.unwrap_or(false);
695        let volume = guard
696            .create_or_mount_volume(
697                name,
698                crypt,
699                Mode::Create { guid, low_32_bit_object_ids },
700                as_blob,
701            )
702            .await?;
703        self.serve_volume(&volume, outgoing_directory_server_end, as_blob)
704            .context("failed to serve volume")?;
705        guard.auto_unmount(volume.volume().store().store_object_id());
706        Ok(())
707    }
708
    /// Services the fuchsia.fs.startup/Volume protocol for the volume backed by `store_id` until
    /// the request stream terminates or yields an error.  Each request is traced and any
    /// per-request failure is logged and returned to the client as a raw zx status.
    async fn handle_volume_requests(
        self: &Arc<Self>,
        name: &str,
        mut requests: VolumeRequestStream,
        store_id: u64,
    ) -> Result<(), Error> {
        while let Some(request) = requests.try_next().await? {
            match request {
                // Run fsck against the volume (see `handle_check`).
                VolumeRequest::Check { responder, options } => {
                    async move {
                        responder.send(self.handle_check(store_id, options).await.map_err(
                            |error| {
                                error!(error:?, store_id; "Failed to check volume");
                                map_to_raw_status(error)
                            },
                        ))
                    }
                    .trace(trace_future_args!("Volume::Check"))
                    .await?;
                }
                // Mount the volume and serve its outgoing directory.
                VolumeRequest::Mount { responder, outgoing_directory, options } => {
                    async move {
                        responder.send(
                            self.handle_mount(name, store_id, outgoing_directory, options)
                                .await
                                .map_err(|error| {
                                    error!(error:?, name, store_id; "Failed to mount volume");
                                    map_to_raw_status(error)
                                }),
                        )
                    }
                    .trace(trace_future_args!("Volume::Mount"))
                    .await?;
                }
                // Set the allocator byte limit for this volume's store.
                VolumeRequest::SetLimit { responder, bytes } => {
                    async move {
                        responder.send(self.handle_set_limit(store_id, bytes).await.map_err(
                            |error| {
                                error!(error:?, store_id; "Failed to set volume limit");
                                map_to_raw_status(error)
                            },
                        ))
                    }
                    .trace(trace_future_args!("Volume::SetLimit"))
                    .await?;
                }
                // Read back the byte limit.  This is synchronous, so only a trace duration (not a
                // traced future) is needed.
                VolumeRequest::GetLimit { responder } => {
                    fxfs_trace::duration!("Volume::GetLimit");
                    responder.send(Ok(self.handle_get_limit(store_id)))?
                }
                // Report volume info; currently only the store GUID is populated.
                VolumeRequest::GetInfo { responder } => {
                    async move {
                        let result = self.handle_get_info(store_id).await.map(|guid| {
                            fidl_fuchsia_fs_startup::VolumeInfo {
                                guid: Some(guid),
                                ..Default::default()
                            }
                        });
                        match result {
                            Ok(response) => responder.send(Ok(&response)),
                            Err(error) => {
                                error!(error:?, store_id; "Failed to get volume info");
                                responder.send(Err(map_to_raw_status(error)))
                            }
                        }
                    }
                    .trace(trace_future_args!("Volume::GetInfo"))
                    .await?;
                }
            }
        }
        Ok(())
    }
782
    /// Returns the memory pressure configuration this directory was constructed with.
    pub fn memory_pressure_config(&self) -> &MemoryPressureConfig {
        &self.memory_pressure_config
    }
786
787    fn is_flush_required_to_dirty(&self, byte_count: u64) -> bool {
788        let mem_pressure = self
789            .mem_monitor
790            .as_ref()
791            .map(|mem_monitor| mem_monitor.level())
792            .unwrap_or(MemoryPressureLevel::Normal);
793        if !matches!(mem_pressure, MemoryPressureLevel::Critical) {
794            return false;
795        }
796
797        let total_dirty = self.pager_dirty_bytes_count.load();
798        total_dirty + byte_count >= self.max_dirty_bytes_when_critical.load(Ordering::Relaxed)
799    }
800
    /// Reports that a certain number of bytes will be dirtied in a pager-backed VMO. If the memory
    /// pressure level is critical and fxfs has lots of dirty pages then a new task will be spawned
    /// in `volume` to flush the dirty pages before `mark_dirty` is called. If the memory pressure
    /// level is not critical then `mark_dirty` will be synchronously called.
    pub fn report_pager_dirty(
        self: Arc<Self>,
        byte_count: u64,
        volume: Arc<FxVolume>,
        mark_dirty: impl FnOnce() + Send + 'static,
    ) {
        if !self.is_flush_required_to_dirty(byte_count) {
            // Fast path: account for the bytes and dirty them immediately on this thread.
            self.pager_dirty_bytes_count.fetch_add(byte_count);
            mark_dirty();
        } else {
            // Slow path: flush all mounted volumes first, then dirty the pages.  This runs as a
            // task on `volume` because flushing requires awaiting.
            volume.spawn(
                async move {
                    let volumes = self.mounted_volumes.lock().await;

                    // Re-check the number of outstanding pager dirty bytes because another thread
                    // could have raced and flushed the volumes first.
                    if self.is_flush_required_to_dirty(byte_count) {
                        debug!(
                            "Flushing all volumes. Memory pressure is critical & dirty pager bytes \
                            ({} MiB) >= limit ({} MiB)",
                            self.pager_dirty_bytes_count.load() / MEBIBYTE,
                            self.max_dirty_bytes_when_critical.load(Ordering::Relaxed) / MEBIBYTE
                        );

                        // Flush every mounted volume concurrently and wait for all of them.
                        let flushes = FuturesUnordered::new();
                        for MountedVolume { volume, .. } in volumes.values() {
                            let vol = volume.volume().clone();
                            flushes.push(async move {
                                vol.minimize_memory().await;
                            });
                        }

                        flushes.collect::<()>().await;
                    }
                    // The bytes are counted and dirtied whether or not a flush happened.
                    self.pager_dirty_bytes_count.fetch_add(byte_count);
                    mark_dirty();
                }
                .trace(trace_future_args!("flush-before-mark-dirty")),
            )
        }
    }
846
    /// Reports that a certain number of bytes were cleaned in a pager-backed VMO.
    pub fn report_pager_clean(&self, byte_count: u64) {
        // `fetch_sub` wraps on underflow; the returned previous value lets us detect that below.
        let prev_dirty = self.pager_dirty_bytes_count.fetch_sub(byte_count);
        // TODO(https://fxbug.dev/452935329): Turn this into a real assert.
        debug_assert!(prev_dirty >= byte_count, "Underflowed dirty bytes.");

        if prev_dirty < byte_count {
            // An unlikely scenario, but if there was an underflow, reset the pager dirty bytes to
            // zero.
            // NOTE(review): this reset is not atomic with the subtraction above, so a concurrent
            // dirty report between the two could be clobbered — presumably acceptable since
            // underflow already indicates a bug; confirm.
            self.pager_dirty_bytes_count.store(0);
        }
    }
859
860    async fn handle_check(
861        self: &Arc<Self>,
862        store_id: u64,
863        options: CheckOptions,
864    ) -> Result<(), Error> {
865        let fs = self.root_volume.volume_directory().store().filesystem();
866        let crypt = if let Some(crypt) = options.crypt {
867            Some(Arc::new(RemoteCrypt::new(crypt)) as Arc<dyn Crypt>)
868        } else {
869            None
870        };
871        let result = fsck::fsck_volume(fs.as_ref(), store_id, crypt).await?;
872        // TODO(b/311550633): Stash result in inspect.
873        info!(store_id:%; "{result:?}");
874        Ok(())
875    }
876
877    async fn handle_set_limit(self: &Arc<Self>, store_id: u64, bytes: u64) -> Result<(), Error> {
878        let fs = self.root_volume.volume_directory().store().filesystem();
879        let mut transaction = fs.clone().new_transaction(lock_keys![], Options::default()).await?;
880        fs.allocator().set_bytes_limit(&mut transaction, store_id, bytes)?;
881        transaction.commit().await?;
882        Ok(())
883    }
884
885    fn handle_get_limit(self: &Arc<Self>, store_id: u64) -> u64 {
886        let fs = self.root_volume.volume_directory().store().filesystem();
887        fs.allocator().get_owner_bytes_limit(store_id).unwrap_or_default()
888    }
889
890    async fn handle_get_info(self: &Arc<Self>, store_id: u64) -> Result<[u8; 16], Error> {
891        let fs = self.root_volume.volume_directory().store().filesystem();
892        let store =
893            fs.object_manager().store(store_id).ok_or_else(|| anyhow!("Store not found"))?;
894        Ok(store.guid())
895    }
896
897    async fn handle_mount(
898        self: &Arc<Self>,
899        name: &str,
900        store_id: u64,
901        outgoing_directory_server_end: ServerEnd<fio::DirectoryMarker>,
902        options: MountOptions,
903    ) -> Result<(), Error> {
904        info!(name:%, store_id:%, options:?; "Received mount request");
905        let crypt = options.crypt.map(|crypt| Arc::new(RemoteCrypt::new(crypt)) as Arc<dyn Crypt>);
906        let as_blob = options.as_blob.unwrap_or(false);
907        let mut guard = self.lock().await;
908        let volume = guard
909            .create_or_mount_volume(name, crypt, Mode::Mount, as_blob)
910            .await
911            .context("failed to mount volume")?;
912        self.serve_volume(&volume, outgoing_directory_server_end, as_blob)
913            .context("failed to serve volume")?;
914        guard.auto_unmount(volume.volume().store().store_object_id());
915        Ok(())
916    }
917
    /// Services the fuchsia.fs/Admin protocol for the volume backed by `store_id`.  Currently the
    /// only method is Shutdown, which unmounts the volume and shuts down its admin scope.
    async fn handle_admin_requests(
        self: &Arc<Self>,
        mut stream: AdminRequestStream,
        store_id: u64,
    ) -> Result<(), Error> {
        // If the Admin protocol ever supports more methods, this should change to a while.
        if let Some(request) = stream.try_next().await.context("Reading request")? {
            match request {
                AdminRequest::Shutdown { responder } => {
                    info!(store_id; "Received shutdown request for volume");

                    // Take the transaction lock on the volume directory object.  Presumably this
                    // serializes the unmount with other operations that lock the volume
                    // directory (e.g. mounts) — confirm against the other `txn_lock` sites.
                    let root_store = self.root_volume.volume_directory().store();
                    let fs = root_store.filesystem();
                    let _guard = fs
                        .lock_manager()
                        .txn_lock(lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            self.root_volume.volume_directory().object_id(),
                        )])
                        .await;

                    // Unmount first, then respond; a failure to send the response is only logged
                    // because the unmount has already taken effect.
                    let maybe_volume = self.lock().await.unmount(store_id).await;
                    responder
                        .send()
                        .unwrap_or_else(|e| warn!("Failed to send shutdown response: {}", e));

                    if let Ok(volume) = maybe_volume {
                        // NOTE: After calling this, this task might be dropped at the next await
                        // point.
                        volume.admin_scope().shutdown();
                    }

                    return Ok(());
                }
            }
        }
        Ok(())
    }
956
957    /// Sets a callback which is invoked when a volume is added.  When the volume is removed, this
958    /// is called again with `None` as the second parameter.
959    /// Note that this can only be set once per VolumesDirectory; repeated calls will panic.
960    pub fn set_on_mount_callback<F: Fn(&str, Option<Arc<FxVolume>>) + Send + Sync + 'static>(
961        &self,
962        callback: F,
963    ) {
964        self.on_volume_added.set(Box::new(callback)).ok().unwrap();
965    }
966
967    pub async fn install_volume(
968        self: &Arc<Self>,
969        src: &str,
970        image_file: &str,
971        dst: &str,
972    ) -> Result<(), Error> {
973        let guard = self.lock().await;
974        info!("installing {src}/{image_file} -> {dst}");
975        for MountedVolume { volume, .. } in guard.mounted_volumes.values() {
976            if volume.volume().name() == src {
977                return Err(zx::Status::ALREADY_BOUND)
978                    .with_context(|| format!("volume {src} is already mounted"));
979            }
980            if volume.volume().name() == dst {
981                return Err(zx::Status::ALREADY_BOUND)
982                    .with_context(|| format!("volume {dst} is already mounted"));
983            }
984        }
985        guard.volumes_directory.root_volume.install_volume(&src, &image_file, &dst).await?;
986
987        // The above function ensures that we've deleted `src` and `dst` now exists. Before we
988        // release `guard`, we need to update the entries in the volumes directory accordingly.
989        guard
990            .volumes_directory
991            .directory_node()
992            .remove_entry(src, /* must_be_directory: */ false)
993            .unwrap();
994        guard
995            .volumes_directory
996            .directory_node()
997            .remove_entry(dst, /* must_be_directory: */ false)
998            .unwrap();
999        let new_dst_object_id =
1000            match guard.volumes_directory.root_volume.volume_directory().lookup(dst).await? {
1001                Some((object_id, ObjectDescriptor::Volume, _)) => Ok(object_id),
1002                Some(_) => Err(FxfsError::Inconsistent),
1003                None => Err(FxfsError::NotFound),
1004            }?;
1005        self.add_directory_entry(dst, new_dst_object_id);
1006
1007        info!("install complete");
1008        Ok(())
1009    }
1010}
1011
#[async_trait]
impl StoreOwner for VolumesDirectory {
    /// Force-locks the volume backing `store` by delegating to the mounted-volumes state.
    async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), Error> {
        self.lock().await.force_lock(store.store_object_id()).await
    }
}
1018
/// Test helper: connects a fuchsia.fs.startup/Volume proxy to the service node registered for
/// `volume_name` in `volumes_directory`, returning the proxy and the scope serving it.
#[cfg(test)]
pub(crate) fn serve_startup_volume_proxy(
    volumes_directory: &Arc<VolumesDirectory>,
    volume_name: &str,
) -> (fidl_fuchsia_fs_startup::VolumeProxy, vfs::ExecutionScope) {
    use vfs::ToObjectRequest;
    use vfs::service::ServiceLike;
    let scope = vfs::ExecutionScope::new();
    let (proxy, server) = fidl::endpoints::create_proxy::<fidl_fuchsia_fs_startup::VolumeMarker>();
    // Pull the Volume service node out of the volumes directory and connect the server end to it.
    let node = volumes_directory.directory_node().get_entry(volume_name).unwrap();
    let service = node.into_any().downcast::<vfs::service::Service>().unwrap();
    let mut object_request = fio::Flags::PROTOCOL_SERVICE.to_object_request(server);
    service.connect(scope.clone(), Default::default(), &mut object_request).unwrap();
    (proxy, scope)
}
1039
/// Count of dirty pager-backed bytes, mirrored to the "dirty-bytes" trace counter on every
/// update.
struct PagerDirtyByteCount(AtomicU64);

impl PagerDirtyByteCount {
    /// Creates a counter starting at zero.
    pub fn new() -> Self {
        Self(AtomicU64::new(0))
    }

    /// Atomically adds `value` and returns the previous count.
    /// NOTE(review): the traced total saturates while the underlying counter would wrap on
    /// overflow, so the two can diverge in the (pathological) overflow case — confirm intended.
    pub fn fetch_add(&self, value: u64) -> u64 {
        let prev = self.0.fetch_add(value, Ordering::Relaxed);
        fxfs_trace::counter!("dirty-bytes", 0, "total" => prev.saturating_add(value));
        prev
    }

    /// Atomically subtracts `value` and returns the previous count (wraps on underflow; callers
    /// use the returned previous value to detect that — see `report_pager_clean`).
    pub fn fetch_sub(&self, value: u64) -> u64 {
        let prev = self.0.fetch_sub(value, Ordering::Relaxed);
        fxfs_trace::counter!("dirty-bytes", 0, "total" => prev.saturating_sub(value));
        prev
    }

    /// Returns the current count.
    pub fn load(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }

    /// Overwrites the count with `value` and reports it to the trace counter.
    pub fn store(&self, value: u64) {
        self.0.store(value, Ordering::Relaxed);
        fxfs_trace::counter!("dirty-bytes", 0, "total" => value);
    }
}
1068
1069#[cfg(test)]
1070mod tests {
1071    use super::{Mode, serve_startup_volume_proxy};
1072    use crate::fuchsia::RemoteCrypt;
1073    use crate::fuchsia::memory_pressure::MemoryPressureLevel;
1074    use crate::fuchsia::testing::{self, TestFixture, open_dir_checked, open_file_checked};
1075    use crate::fuchsia::volume::MemoryPressureConfig;
1076    use crate::fuchsia::volumes_directory::VolumesDirectory;
1077    use crate::testing::TestFixtureOptions;
1078    use fidl::endpoints::{DiscoverableProtocolMarker, create_proxy, create_request_stream};
1079    use fidl_fuchsia_fs::AdminMarker;
1080    use fidl_fuchsia_fs_startup::{MountOptions, VolumeProxy};
1081    use fidl_fuchsia_fxfs::{CryptRequest, FxfsKey, KeyPurpose, WrappedKey};
1082    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
1083    use fuchsia_fs::file;
1084    use futures::{TryStreamExt, join};
1085    use fxfs::errors::FxfsError;
1086    use fxfs::filesystem::FxFilesystem;
1087    use fxfs::fsck::{FsckOptions, fsck, fsck_volume_with_options, fsck_with_options};
1088    use fxfs::lock_keys;
1089    use fxfs::object_handle::ObjectHandle;
1090    use fxfs::object_store::allocator::Allocator;
1091    use fxfs::object_store::transaction::{LockKey, Options};
1092    use fxfs::object_store::volume::root_volume;
1093    use fxfs_crypto::Crypt;
1094    use fxfs_insecure_crypto::new_insecure_crypt;
1095    use refaults_vmo::PageRefaultCounter;
1096    use std::sync::atomic::Ordering;
1097    use std::sync::{Arc, Weak};
1098    use std::time::Duration;
1099    use storage_device::DeviceHolder;
1100    use storage_device::fake_device::FakeDevice;
1101    use vfs::temp_clone::{TempClonable, unblock};
1102    use zx::Status;
1103    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
1104    async fn write_image_to_file(image: DeviceHolder, file: fio::FileProxy) {
1105        file.resize(image.size()).await.unwrap().expect("resize failed");
1106        let vmo = TempClonable::new(
1107            file.get_backing_memory(fio::VmoFlags::SHARED_BUFFER | fio::VmoFlags::WRITE)
1108                .await
1109                .unwrap()
1110                .expect("get backing memory failed"),
1111        );
1112
1113        const CHUNK_READ_SIZE: usize = 131_072; /* 128 KiB */
1114        let mut buff = image.allocate_buffer(CHUNK_READ_SIZE).await;
1115        let total = image.size();
1116        let mut offset = 0;
1117        while offset < total {
1118            let amount = std::cmp::min(total - offset, CHUNK_READ_SIZE as u64);
1119            image.read(offset, buff.as_mut()).await.expect("image read failed");
1120            {
1121                // *NOTE*: We have to unblock our write to the VMO since it's pager backed and could
1122                // be running on the same thread as the filesystem is.
1123                let vmo = vmo.temp_clone();
1124                let data = buff.as_slice()[0..amount as usize].to_vec();
1125                let offset = offset;
1126                unblock(move || vmo.write(&data, offset)).await.expect("vmo write failed");
1127            }
1128            offset += amount;
1129        }
1130        assert_eq!(offset, total);
1131        file.sync().await.unwrap().expect("sync failed");
1132        file.close().await.unwrap().expect("close failed");
1133    }
1134
1135    #[fuchsia::test]
1136    async fn test_volume_creation() {
1137        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1138        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1139        let blob_resupplied_count =
1140            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1141        let volumes_directory = VolumesDirectory::new(
1142            root_volume(filesystem.clone()).await.unwrap(),
1143            Weak::new(),
1144            None,
1145            blob_resupplied_count,
1146            MemoryPressureConfig::default(),
1147        )
1148        .await
1149        .unwrap();
1150
1151        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1152        {
1153            let vol = volumes_directory
1154                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1155                .await
1156                .expect("create encrypted volume failed");
1157            vol.volume().store().store_object_id()
1158        };
1159
1160        volumes_directory.terminate().await;
1161        std::mem::drop(volumes_directory);
1162        filesystem.close().await.expect("close filesystem failed");
1163        let device = filesystem.take_device().await;
1164        device.reopen(false);
1165        let filesystem = FxFilesystem::open(device).await.unwrap();
1166        let blob_resupplied_count =
1167            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1168        let volumes_directory = VolumesDirectory::new(
1169            root_volume(filesystem.clone()).await.unwrap(),
1170            Weak::new(),
1171            None,
1172            blob_resupplied_count,
1173            MemoryPressureConfig::default(),
1174        )
1175        .await
1176        .unwrap();
1177
1178        let error = volumes_directory
1179            .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1180            .await
1181            .err()
1182            .expect("Creating existing encrypted volume should fail");
1183        assert!(FxfsError::AlreadyExists.matches(&error));
1184    }
1185
1186    #[fuchsia::test]
1187    async fn test_dirty_pages_accumulate_in_parent() {
1188        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1189        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1190        let blob_resupplied_count =
1191            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1192        let volumes_directory = VolumesDirectory::new(
1193            root_volume(filesystem.clone()).await.unwrap(),
1194            Weak::new(),
1195            None,
1196            blob_resupplied_count,
1197            MemoryPressureConfig::default(),
1198        )
1199        .await
1200        .unwrap();
1201
1202        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1203        let vol = volumes_directory
1204            .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1205            .await
1206            .expect("create encrypted volume failed");
1207        let old_dirty = volumes_directory.pager_dirty_bytes_count.load();
1208
1209        let new_dirty = {
1210            let (root, server_end) = create_proxy::<fio::DirectoryMarker>();
1211            vol.root().clone().serve(fio::PERM_READABLE | fio::PERM_WRITABLE, server_end);
1212            let f = open_file_checked(
1213                &root,
1214                "foo",
1215                fio::Flags::FLAG_MAYBE_CREATE
1216                    | fio::PERM_READABLE
1217                    | fio::PERM_WRITABLE
1218                    | fio::Flags::PROTOCOL_FILE,
1219                &Default::default(),
1220            )
1221            .await;
1222            let buf = vec![0xaa as u8; 8192];
1223            file::write(&f, buf.as_slice()).await.expect("Write");
1224            // It's important to check the dirty bytes before closing the file, as closing can
1225            // trigger a flush.
1226            volumes_directory.pager_dirty_bytes_count.load()
1227        };
1228        assert_ne!(old_dirty, new_dirty);
1229
1230        volumes_directory.terminate().await;
1231        std::mem::drop(volumes_directory);
1232        filesystem.close().await.expect("close filesystem failed");
1233    }
1234
1235    #[fuchsia::test]
1236    async fn test_volume_reopen() {
1237        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1238        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1239        let blob_resupplied_count =
1240            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1241        let volumes_directory = VolumesDirectory::new(
1242            root_volume(filesystem.clone()).await.unwrap(),
1243            Weak::new(),
1244            None,
1245            blob_resupplied_count,
1246            MemoryPressureConfig::default(),
1247        )
1248        .await
1249        .unwrap();
1250
1251        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1252        let volume_id = {
1253            let vol = volumes_directory
1254                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1255                .await
1256                .expect("create encrypted volume failed");
1257            vol.volume().store().store_object_id()
1258        };
1259
1260        volumes_directory.terminate().await;
1261        std::mem::drop(volumes_directory);
1262        filesystem.close().await.expect("close filesystem failed");
1263        let device = filesystem.take_device().await;
1264        device.reopen(false);
1265        let filesystem = FxFilesystem::open(device).await.unwrap();
1266        let blob_resupplied_count =
1267            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1268        let volumes_directory = VolumesDirectory::new(
1269            root_volume(filesystem.clone()).await.unwrap(),
1270            Weak::new(),
1271            None,
1272            blob_resupplied_count,
1273            MemoryPressureConfig::default(),
1274        )
1275        .await
1276        .unwrap();
1277
1278        {
1279            let vol = volumes_directory
1280                .mount_volume("encrypted", Some(crypt.clone()), false)
1281                .await
1282                .expect("open existing encrypted volume failed");
1283            assert_eq!(vol.volume().store().store_object_id(), volume_id);
1284        }
1285
1286        volumes_directory.terminate().await;
1287        std::mem::drop(volumes_directory);
1288        filesystem.close().await.expect("close filesystem failed");
1289    }
1290
1291    #[fuchsia::test]
1292    async fn test_volume_creation_unencrypted() {
1293        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1294        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1295        let blob_resupplied_count =
1296            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1297        let volumes_directory = VolumesDirectory::new(
1298            root_volume(filesystem.clone()).await.unwrap(),
1299            Weak::new(),
1300            None,
1301            blob_resupplied_count,
1302            MemoryPressureConfig::default(),
1303        )
1304        .await
1305        .unwrap();
1306
1307        {
1308            let vol = volumes_directory
1309                .create_and_mount_volume("unencrypted", None, false, None)
1310                .await
1311                .expect("create unencrypted volume failed");
1312            vol.volume().store().store_object_id()
1313        };
1314
1315        volumes_directory.terminate().await;
1316        std::mem::drop(volumes_directory);
1317        filesystem.close().await.expect("close filesystem failed");
1318        let device = filesystem.take_device().await;
1319        device.reopen(false);
1320        let filesystem = FxFilesystem::open(device).await.unwrap();
1321        let blob_resupplied_count =
1322            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1323        let volumes_directory = VolumesDirectory::new(
1324            root_volume(filesystem.clone()).await.unwrap(),
1325            Weak::new(),
1326            None,
1327            blob_resupplied_count,
1328            MemoryPressureConfig::default(),
1329        )
1330        .await
1331        .unwrap();
1332
1333        let error = volumes_directory
1334            .create_and_mount_volume("unencrypted", None, false, None)
1335            .await
1336            .err()
1337            .expect("Creating existing unencrypted volume should fail");
1338        assert!(FxfsError::AlreadyExists.matches(&error));
1339
1340        volumes_directory.terminate().await;
1341        std::mem::drop(volumes_directory);
1342        filesystem.close().await.expect("close filesystem failed");
1343    }
1344
1345    #[fuchsia::test]
1346    async fn test_volume_reopen_unencrypted() {
1347        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1348        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1349        let blob_resupplied_count =
1350            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1351        let volumes_directory = VolumesDirectory::new(
1352            root_volume(filesystem.clone()).await.unwrap(),
1353            Weak::new(),
1354            None,
1355            blob_resupplied_count,
1356            MemoryPressureConfig::default(),
1357        )
1358        .await
1359        .unwrap();
1360
1361        let volume_id = {
1362            let vol = volumes_directory
1363                .create_and_mount_volume("unencrypted", None, false, None)
1364                .await
1365                .expect("create unencrypted volume failed");
1366            vol.volume().store().store_object_id()
1367        };
1368
1369        volumes_directory.terminate().await;
1370        std::mem::drop(volumes_directory);
1371        filesystem.close().await.expect("close filesystem failed");
1372        let device = filesystem.take_device().await;
1373        device.reopen(false);
1374        let filesystem = FxFilesystem::open(device).await.unwrap();
1375        let blob_resupplied_count =
1376            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1377        let volumes_directory = VolumesDirectory::new(
1378            root_volume(filesystem.clone()).await.unwrap(),
1379            Weak::new(),
1380            None,
1381            blob_resupplied_count,
1382            MemoryPressureConfig::default(),
1383        )
1384        .await
1385        .unwrap();
1386
1387        {
1388            let vol = volumes_directory
1389                .mount_volume("unencrypted", None, false)
1390                .await
1391                .expect("open existing unencrypted volume failed");
1392            assert_eq!(vol.volume().store().store_object_id(), volume_id);
1393        }
1394
1395        volumes_directory.terminate().await;
1396        std::mem::drop(volumes_directory);
1397        filesystem.close().await.expect("close filesystem failed");
1398    }
1399
    // Verifies that the volumes directory enumerates every volume present on disk,
    // including volumes that have never been mounted since the filesystem was
    // reopened, and that mounting one of them does not change the listing.
    #[fuchsia::test]
    async fn test_volume_enumeration() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        // Add an encrypted volume...
        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
        {
            volumes_directory
                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
                .await
                .expect("create encrypted volume failed");
        };
        // And an unencrypted volume.
        {
            volumes_directory
                .create_and_mount_volume("unencrypted", None, false, None)
                .await
                .expect("create unencrypted volume failed");
        };

        // Restart, so that we can test enumeration of unopened volumes.
        // Note the ordering: the filesystem must be closed before the device can be
        // taken back and reopened.
        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
        let device = filesystem.take_device().await;
        device.reopen(false);
        let filesystem = FxFilesystem::open(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        // Reads all entry names from `dir`, starting from the beginning.  A single
        // read_dirents call is made, so all entries are assumed to fit in
        // fio::MAX_BUF.
        let readdir = |dir: Arc<fio::DirectoryProxy>| async move {
            let status = dir.rewind().await.expect("FIDL call failed");
            Status::ok(status).expect("rewind failed");
            let (status, buf) = dir.read_dirents(fio::MAX_BUF).await.expect("FIDL call failed");
            Status::ok(status).expect("read_dirents failed");
            let mut entries = vec![];
            for res in fuchsia_fs::directory::parse_dir_entries(&buf) {
                entries.push(res.expect("Failed to parse entry").name);
            }
            entries
        };

        // Neither volume has been mounted since the restart; both must be listed.
        let dir_proxy =
            Arc::new(vfs::directory::serve_read_only(volumes_directory.directory_node().clone()));
        let entries = readdir(dir_proxy.clone()).await;
        assert_eq!(entries, [".", "encrypted", "unencrypted"]);

        let _vol = volumes_directory
            .mount_volume("encrypted", Some(crypt.clone()), false)
            .await
            .expect("Open encrypted volume failed");

        // Ensure that the behaviour is the same after we've opened a volume.
        let entries = readdir(dir_proxy).await;
        assert_eq!(entries, [".", "encrypted", "unencrypted"]);

        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
    }
1481
1482    #[fuchsia::test]
1483    async fn test_get_info() {
1484        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1485        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1486        let root_volume = root_volume(filesystem.clone()).await.unwrap();
1487        let blob_resupplied_count =
1488            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1489        let volumes_directory = VolumesDirectory::new(
1490            root_volume,
1491            Weak::new(),
1492            None,
1493            blob_resupplied_count,
1494            MemoryPressureConfig::default(),
1495        )
1496        .await
1497        .unwrap();
1498
1499        let vol = volumes_directory
1500            .create_and_mount_volume("vol", None, false, None)
1501            .await
1502            .expect("create_and_mount_volume failed");
1503        let guid = vol.volume().store().guid();
1504
1505        let (volume_proxy, _scope) = serve_startup_volume_proxy(&volumes_directory, "vol");
1506
1507        let info: fidl_fuchsia_fs_startup::VolumeInfo = volume_proxy
1508            .get_info()
1509            .await
1510            .expect("get_info failed")
1511            .expect("get_info returned error");
1512        assert_eq!(info.guid, Some(guid));
1513
1514        volumes_directory.terminate().await;
1515    }
1516
1517    #[fuchsia::test]
1518    async fn test_deleted_encrypted_volume_while_mounted() {
1519        const VOLUME_NAME: &str = "encrypted";
1520
1521        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1522        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1523        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1524        let blob_resupplied_count =
1525            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1526        let volumes_directory = VolumesDirectory::new(
1527            root_volume(filesystem.clone()).await.unwrap(),
1528            Weak::new(),
1529            None,
1530            blob_resupplied_count,
1531            MemoryPressureConfig::default(),
1532        )
1533        .await
1534        .unwrap();
1535        volumes_directory
1536            .create_and_mount_volume(VOLUME_NAME, Some(crypt.clone()), false, None)
1537            .await
1538            .expect("create encrypted volume failed");
1539        // We have the volume mounted so delete attempts should fail.
1540        assert!(
1541            FxfsError::AlreadyBound.matches(
1542                &volumes_directory
1543                    .remove_volume(VOLUME_NAME)
1544                    .await
1545                    .err()
1546                    .expect("Deleting volume should fail")
1547            )
1548        );
1549        volumes_directory.terminate().await;
1550        std::mem::drop(volumes_directory);
1551        filesystem.close().await.expect("close filesystem failed");
1552    }
1553
    // Verifies mounting over the fuchsia.fs.startup/Volume protocol: a successful
    // mount serves the volume's root, a concurrent second mount fails with
    // ALREADY_BOUND, and dropping the served directory connection causes the volume
    // to be unmounted shortly afterwards.
    #[fuchsia::test]
    async fn test_mount_volume_using_volume_protocol() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        // Create the volume, then unmount it so the Volume protocol mount below
        // starts from an unmounted state.
        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
        let store_id = {
            let vol = volumes_directory
                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
                .await
                .expect("create encrypted volume failed");
            vol.volume().store().store_object_id()
        };
        volumes_directory.lock().await.unmount(store_id).await.expect("unmount failed");

        let (volume_proxy, _scope) = serve_startup_volume_proxy(&volumes_directory, "encrypted");

        let (dir_proxy, dir_server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();

        // Set up a crypt service holding the insecure data/metadata keys.  Each of
        // the two mount attempts below is handed its own connection (client1/client2).
        let crypt_service = fxfs_crypt::CryptService::new();
        crypt_service
            .add_wrapping_key(0, fxfs_insecure_crypto::DATA_KEY.to_vec())
            .expect("add_wrapping_key failed");
        crypt_service
            .add_wrapping_key(1, fxfs_insecure_crypto::METADATA_KEY.to_vec())
            .expect("add_wrapping_key failed");
        crypt_service.set_active_key(KeyPurpose::Data, 0).expect("set_active_key failed");
        crypt_service.set_active_key(KeyPurpose::Metadata, 1).expect("set_active_key failed");
        let (client1, stream1) = create_request_stream();
        let (client2, stream2) = create_request_stream();

        // The crypt service must be polled concurrently with the mount attempts so
        // it can answer their key requests — hence the join!.
        join!(
            async {
                volume_proxy
                    .mount(
                        dir_server_end,
                        MountOptions { crypt: Some(client1), ..MountOptions::default() },
                    )
                    .await
                    .expect("mount (fidl) failed")
                    .expect("mount failed");

                open_file_checked(
                    &dir_proxy,
                    "root/test",
                    fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::FLAG_MAYBE_CREATE,
                    &Default::default(),
                )
                .await;

                // Attempting to mount again should fail with ALREADY_BOUND.
                let (_dir_proxy, dir_server_end) =
                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();

                assert_eq!(
                    Status::from_raw(
                        volume_proxy
                            .mount(
                                dir_server_end,
                                MountOptions { crypt: Some(client2), ..MountOptions::default() },
                            )
                            .await
                            .expect("mount (fidl) failed")
                            .expect_err("mount succeeded")
                    ),
                    Status::ALREADY_BOUND
                );

                std::mem::drop(dir_proxy);

                // The volume should get unmounted a short time later.  Poll for up
                // to 100 iterations of 100ms before declaring failure.
                let mut count = 0;
                loop {
                    if volumes_directory.mounted_volumes.lock().await.is_empty() {
                        break;
                    }
                    count += 1;
                    assert!(count <= 100);
                    fasync::Timer::new(Duration::from_millis(100)).await;
                }
            },
            async {
                // Serve the two crypt connections, one per mount attempt.
                crypt_service
                    .handle_request(fxfs_crypt::Services::Crypt(stream1))
                    .await
                    .expect("handle_request failed");
                crypt_service
                    .handle_request(fxfs_crypt::Services::Crypt(stream2))
                    .await
                    .expect("handle_request failed");
            }
        );
        // Make sure the background thread that actually calls terminate() on the volume finishes
        // before exiting the test. terminate() should be a no-op since we already verified
        // mounted_directories is empty, but the volume's terminate() future in the background task
        // may still be outstanding. As both the background task and VolumesDirectory::terminate()
        // hold the write lock, we use that to block until the background task has completed.
        volumes_directory.terminate().await;
    }
1664
1665    #[fuchsia::test]
1666    #[ignore] // TODO(b/293917849) re-enable this test when de-flaked
1667
1668    async fn test_volume_dir_races() {
1669        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1670        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1671        let blob_resupplied_count =
1672            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1673        let volumes_directory = VolumesDirectory::new(
1674            root_volume(filesystem.clone()).await.unwrap(),
1675            Weak::new(),
1676            None,
1677            blob_resupplied_count,
1678            MemoryPressureConfig::default(),
1679        )
1680        .await
1681        .unwrap();
1682
1683        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1684        let store_id = {
1685            let vol = volumes_directory
1686                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1687                .await
1688                .expect("create encrypted volume failed");
1689            vol.volume().store().store_object_id()
1690        };
1691        volumes_directory.lock().await.unmount(store_id).await.expect("unmount failed");
1692
1693        let (volume_proxy, _scope) = serve_startup_volume_proxy(&volumes_directory, "encrypted");
1694
1695        let crypt_service = Arc::new(fxfs_crypt::CryptService::new());
1696        crypt_service
1697            .add_wrapping_key(0, fxfs_insecure_crypto::DATA_KEY.to_vec())
1698            .expect("add_wrapping_key failed");
1699        crypt_service
1700            .add_wrapping_key(1, fxfs_insecure_crypto::METADATA_KEY.to_vec())
1701            .expect("add_wrapping_key failed");
1702        crypt_service.set_active_key(KeyPurpose::Data, 0).expect("set_active_key failed");
1703        crypt_service.set_active_key(KeyPurpose::Metadata, 1).expect("set_active_key failed");
1704        let (client1, stream1) = create_request_stream();
1705        let (client2, stream2) = create_request_stream();
1706        let crypt_service_clone = crypt_service.clone();
1707        let crypt_task1 = fasync::Task::spawn(async move {
1708            crypt_service_clone
1709                .handle_request(fxfs_crypt::Services::Crypt(stream1))
1710                .await
1711                .expect("handle_request failed");
1712        });
1713        let crypt_task2 = fasync::Task::spawn(async move {
1714            crypt_service
1715                .handle_request(fxfs_crypt::Services::Crypt(stream2))
1716                .await
1717                .expect("handle_request failed");
1718        });
1719
1720        // Create two tasks each of mount and remove, and one to recreate the volume, so that we get
1721        // to exercise a wide variety of concurrent actions.
1722        // Delay remove and create a bit, since mount is slower due to FIDL.
1723        join!(
1724            async {
1725                let (_dir_proxy, dir_server_end) =
1726                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1727                if let Err(status) = volume_proxy
1728                    .mount(
1729                        dir_server_end,
1730                        MountOptions { crypt: Some(client1), ..MountOptions::default() },
1731                    )
1732                    .await
1733                    .expect("mount (fidl) failed")
1734                {
1735                    let status = Status::from_raw(status);
1736                    if status != Status::NOT_FOUND && status != Status::ALREADY_BOUND {
1737                        assert!(false, "Unexpected status {:}", status);
1738                    }
1739                }
1740            },
1741            async {
1742                let (_dir_proxy, dir_server_end) =
1743                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1744                if let Err(status) = volume_proxy
1745                    .mount(
1746                        dir_server_end,
1747                        MountOptions { crypt: Some(client2), ..MountOptions::default() },
1748                    )
1749                    .await
1750                    .expect("mount (fidl) failed")
1751                {
1752                    let status = Status::from_raw(status);
1753                    if status != Status::NOT_FOUND && status != Status::ALREADY_BOUND {
1754                        assert!(false, "Unexpected status {:}", status);
1755                    }
1756                }
1757            },
1758            async {
1759                let volumes_directory = volumes_directory.clone();
1760                let wait_time = rand::random_range(0..5);
1761                fasync::Timer::new(Duration::from_millis(wait_time)).await;
1762                if let Err(err) = volumes_directory.remove_volume("encrypted").await {
1763                    assert!(
1764                        FxfsError::NotFound.matches(&err) || FxfsError::AlreadyBound.matches(&err),
1765                        "Unexpected error {:?}",
1766                        err
1767                    );
1768                }
1769            },
1770            async {
1771                let volumes_directory = volumes_directory.clone();
1772                let wait_time = rand::random_range(0..5);
1773                fasync::Timer::new(Duration::from_millis(wait_time)).await;
1774                if let Err(err) = volumes_directory.remove_volume("encrypted").await {
1775                    assert!(
1776                        FxfsError::NotFound.matches(&err) || FxfsError::AlreadyBound.matches(&err),
1777                        "Unexpected error {:?}",
1778                        err
1779                    );
1780                }
1781            },
1782            async {
1783                let volumes_directory = volumes_directory.clone();
1784                let wait_time = rand::random_range(0..5);
1785                fasync::Timer::new(Duration::from_millis(wait_time)).await;
1786                let mut guard = volumes_directory.lock().await;
1787                match guard
1788                    .create_or_mount_volume(
1789                        "encrypted",
1790                        Some(crypt.clone()),
1791                        Mode::Create { guid: None, low_32_bit_object_ids: false },
1792                        false,
1793                    )
1794                    .await
1795                {
1796                    Ok(vol) => {
1797                        let store_id = vol.volume().store().store_object_id();
1798                        std::mem::drop(vol);
1799                        guard.unmount(store_id).await.expect("unmount failed");
1800                    }
1801                    Err(err) => {
1802                        assert!(
1803                            FxfsError::AlreadyExists.matches(&err)
1804                                || FxfsError::AlreadyBound.matches(&err),
1805                            "Unexpected error {:?}",
1806                            err
1807                        );
1808                    }
1809                }
1810            }
1811        );
1812        std::mem::drop(crypt_task1);
1813        std::mem::drop(crypt_task2);
1814        // Make sure the background thread that actually calls terminate() on the volume finishes
1815        // before exiting the test. terminate() should be a no-op since we already verified
1816        // mounted_directories is empty, but the volume's terminate() future in the background task
1817        // may still be outstanding. As both the background task and VolumesDirectory::terminate()
1818        // hold the write lock, we use that to block until the background task has completed.
1819        volumes_directory.terminate().await;
1820    }
1821
1822    #[fuchsia::test]
1823    async fn test_shutdown_volume() {
1824        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1825        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1826        let blob_resupplied_count =
1827            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1828        let volumes_directory = VolumesDirectory::new(
1829            root_volume(filesystem.clone()).await.unwrap(),
1830            Weak::new(),
1831            None,
1832            blob_resupplied_count,
1833            MemoryPressureConfig::default(),
1834        )
1835        .await
1836        .unwrap();
1837
1838        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1839        let vol = volumes_directory
1840            .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1841            .await
1842            .expect("create encrypted volume failed");
1843
1844        let (dir_proxy, dir_server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1845
1846        volumes_directory.serve_volume(&vol, dir_server_end, false).expect("serve_volume failed");
1847
1848        let admin_proxy = connect_to_protocol_at_dir_svc::<AdminMarker>(&dir_proxy)
1849            .expect("Unable to connect to admin service");
1850
1851        admin_proxy.shutdown().await.expect("shutdown failed");
1852
1853        assert!(volumes_directory.mounted_volumes.lock().await.is_empty());
1854    }
1855
    // Verifies that per-volume byte limits set over fuchsia.fs.startup/Volume are
    // persisted: the most recently set limit survives a filesystem remount, and the
    // limit is released when the volume is removed.
    #[fuchsia::test]
    async fn test_byte_limit_persistence() {
        const BYTES_LIMIT_1: u64 = 123456;
        const BYTES_LIMIT_2: u64 = 456789;
        const VOLUME_NAME: &str = "A";
        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
        {
            let filesystem = FxFilesystem::new_empty(device).await.unwrap();
            let blob_resupplied_count =
                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
            let volumes_directory = VolumesDirectory::new(
                root_volume(filesystem.clone()).await.unwrap(),
                Weak::new(),
                None,
                blob_resupplied_count,
                MemoryPressureConfig::default(),
            )
            .await
            .unwrap();

            volumes_directory
                .create_and_mount_volume(VOLUME_NAME, None, false, None)
                .await
                .expect("create unencrypted volume failed");

            let (volume_proxy, _scope) =
                serve_startup_volume_proxy(&volumes_directory, VOLUME_NAME);

            // Setting a limit is reflected immediately in the allocator.
            volume_proxy.set_limit(BYTES_LIMIT_1).await.unwrap().expect("To set limits");
            {
                let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
                assert_eq!(limits.len(), 1);
                assert_eq!(limits[0].1, BYTES_LIMIT_1);
            }

            // Setting a new limit replaces the previous one rather than adding a
            // second entry.
            volume_proxy.set_limit(BYTES_LIMIT_2).await.unwrap().expect("To set limits");
            {
                let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
                assert_eq!(limits.len(), 1);
                assert_eq!(limits[0].1, BYTES_LIMIT_2);
            }
            std::mem::drop(volume_proxy);
            volumes_directory.terminate().await;
            std::mem::drop(volumes_directory);
            filesystem.close().await.expect("close filesystem failed");
            // Reclaim the device so it can be reopened in the next scope.
            device = filesystem.take_device().await;
        }
        device.ensure_unique();
        device.reopen(false);
        {
            // Remount: the most recently set limit must have been persisted.
            let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
            fsck(filesystem.clone()).await.expect("Fsck");
            let blob_resupplied_count =
                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
            let volumes_directory = VolumesDirectory::new(
                root_volume(filesystem.clone()).await.unwrap(),
                Weak::new(),
                None,
                blob_resupplied_count,
                MemoryPressureConfig::default(),
            )
            .await
            .unwrap();
            {
                let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
                assert_eq!(limits.len(), 1);
                assert_eq!(limits[0].1, BYTES_LIMIT_2);
            }
            // Removing the volume must also drop its byte limit.
            volumes_directory.remove_volume(VOLUME_NAME).await.expect("Volume deletion failed");
            {
                let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
                assert_eq!(limits.len(), 0);
            }
            volumes_directory.terminate().await;
            std::mem::drop(volumes_directory);
            filesystem.close().await.expect("close filesystem failed");
            device = filesystem.take_device().await;
        }
        device.ensure_unique();
        device.reopen(false);
        // After a final remount the removed volume's limit must still be gone.
        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
        fsck(filesystem.clone()).await.expect("Fsck");
        let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
        assert_eq!(limits.len(), 0);
    }
1941
    /// Test helper bundling the handles needed by the byte-limit tests: a
    /// fuchsia.fs.startup/Volume proxy for the volume and an open file ("foo") in
    /// the volume's root to write through.
    struct VolumeInfo {
        /// Keeps the scope serving `volume_proxy` alive for the life of this struct.
        _scope: vfs::ExecutionScope,
        volume_proxy: VolumeProxy,
        file_proxy: fio::FileProxy,
    }
1947
1948    impl VolumeInfo {
1949        async fn new(volumes_directory: &Arc<VolumesDirectory>, name: &'static str) -> Self {
1950            let volume = volumes_directory
1951                .create_and_mount_volume(name, None, false, None)
1952                .await
1953                .expect("create unencrypted volume failed");
1954
1955            let (volume_dir_proxy, dir_server_end) =
1956                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1957            volumes_directory
1958                .serve_volume(&volume, dir_server_end, false)
1959                .expect("serve_volume failed");
1960
1961            let (volume_proxy, _scope) = serve_startup_volume_proxy(&volumes_directory, name);
1962
1963            let (root_proxy, root_server_end) =
1964                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1965            volume_dir_proxy
1966                .open(
1967                    "root",
1968                    fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
1969                    &Default::default(),
1970                    root_server_end.into_channel(),
1971                )
1972                .expect("Failed to open volume root");
1973
1974            let file_proxy = open_file_checked(
1975                &root_proxy,
1976                "foo",
1977                fio::Flags::FLAG_MAYBE_CREATE
1978                    | fio::PERM_READABLE
1979                    | fio::PERM_WRITABLE
1980                    | fio::Flags::PROTOCOL_FILE,
1981                &Default::default(),
1982            )
1983            .await;
1984            VolumeInfo { _scope, volume_proxy, file_proxy }
1985        }
1986    }
1987
1988    #[fuchsia::test]
1989    async fn test_limit_bytes() {
1990        const BYTES_LIMIT: u64 = 262_144; // 256KiB
1991        const BLOCK_SIZE: usize = 8192; // 8KiB
1992        let device = DeviceHolder::new(FakeDevice::new(BLOCK_SIZE.try_into().unwrap(), 512));
1993        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1994        let blob_resupplied_count =
1995            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1996        let volumes_directory = VolumesDirectory::new(
1997            root_volume(filesystem.clone()).await.unwrap(),
1998            Weak::new(),
1999            None,
2000            blob_resupplied_count,
2001            MemoryPressureConfig::default(),
2002        )
2003        .await
2004        .unwrap();
2005
2006        let vol = VolumeInfo::new(&volumes_directory, "foo").await;
2007        let old_info = {
2008            let (status, info) = vol.file_proxy.query_filesystem().await.expect("Getting fs info");
2009            assert_eq!(status, zx::Status::OK.into_raw());
2010            let info = info.unwrap();
2011            // With no limit set, the total filesystem size should be returned.
2012            assert!(info.total_bytes > BYTES_LIMIT);
2013            info
2014        };
2015
2016        vol.volume_proxy.set_limit(BYTES_LIMIT).await.unwrap().expect("To set limits");
2017        {
2018            let (status, info) = vol.file_proxy.query_filesystem().await.expect("Getting fs info");
2019            assert!(status == zx::Status::OK.into_raw());
2020            let new_info = info.unwrap();
2021            assert_eq!(new_info.total_bytes, BYTES_LIMIT);
2022            // Now since the limit is the volume limit, the space used should be the volume usage,
2023            // which should be strictly less than the filesystem.
2024            assert!(new_info.used_bytes < old_info.used_bytes);
2025        }
2026
2027        let zeros = vec![0u8; BLOCK_SIZE];
2028        // First write should succeed.
2029        assert_eq!(
2030            <u64 as TryInto<usize>>::try_into(
2031                vol.file_proxy
2032                    .write(&zeros)
2033                    .await
2034                    .expect("Failed Write message")
2035                    .expect("Failed write")
2036            )
2037            .unwrap(),
2038            BLOCK_SIZE
2039        );
2040        // Likely to run out of space before writing the full limit due to overheads.
2041        for _ in (BLOCK_SIZE..BYTES_LIMIT as usize).step_by(BLOCK_SIZE) {
2042            match vol.file_proxy.write(&zeros).await.expect("Failed Write message") {
2043                Err(_) => break,
2044                Ok(b) if b < BLOCK_SIZE.try_into().unwrap() => break,
2045                _ => (),
2046            };
2047        }
2048
2049        // Any further writes should fail with out of space.
2050        assert_eq!(
2051            vol.file_proxy
2052                .write(&zeros)
2053                .await
2054                .expect("Failed write message")
2055                .expect_err("Write should have been limited"),
2056            Status::NO_SPACE.into_raw()
2057        );
2058
2059        // Double the limit and try again. We should have write space again.
2060        vol.volume_proxy.set_limit(BYTES_LIMIT * 2).await.unwrap().expect("To set limits");
2061        assert_eq!(
2062            <u64 as TryInto<usize>>::try_into(
2063                vol.file_proxy
2064                    .write(&zeros)
2065                    .await
2066                    .expect("Failed Write message")
2067                    .expect("Failed write")
2068            )
2069            .unwrap(),
2070            BLOCK_SIZE
2071        );
2072
2073        vol.file_proxy.close().await.unwrap().expect("Failed to close file");
2074        volumes_directory.terminate().await;
2075        std::mem::drop(volumes_directory);
2076        filesystem.close().await.expect("close filesystem failed");
2077    }
2078
2079    #[fuchsia::test]
2080    async fn test_limit_bytes_two_hit_device_limit() {
2081        const BYTES_LIMIT: u64 = 3_145_728; // 3MiB
2082        const BLOCK_SIZE: usize = 8192; // 8KiB
2083        const BLOCK_COUNT: u32 = 512;
2084        let device =
2085            DeviceHolder::new(FakeDevice::new(BLOCK_SIZE.try_into().unwrap(), BLOCK_COUNT));
2086        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2087        let blob_resupplied_count =
2088            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2089        let volumes_directory = VolumesDirectory::new(
2090            root_volume(filesystem.clone()).await.unwrap(),
2091            Weak::new(),
2092            None,
2093            blob_resupplied_count,
2094            MemoryPressureConfig::default(),
2095        )
2096        .await
2097        .unwrap();
2098
2099        let a = VolumeInfo::new(&volumes_directory, "foo").await;
2100        let b = VolumeInfo::new(&volumes_directory, "bar").await;
2101        a.volume_proxy.set_limit(BYTES_LIMIT).await.unwrap().expect("To set limits");
2102        b.volume_proxy.set_limit(BYTES_LIMIT).await.unwrap().expect("To set limits");
2103        let mut a_written: u64 = 0;
2104        let mut b_written: u64 = 0;
2105
2106        // Write chunks of BLOCK_SIZE.
2107        let zeros = vec![0u8; BLOCK_SIZE];
2108
2109        // First write should succeed for both.
2110        assert_eq!(
2111            <u64 as TryInto<usize>>::try_into(
2112                a.file_proxy
2113                    .write(&zeros)
2114                    .await
2115                    .expect("Failed Write message")
2116                    .expect("Failed write")
2117            )
2118            .unwrap(),
2119            BLOCK_SIZE
2120        );
2121        a_written += BLOCK_SIZE as u64;
2122        assert_eq!(
2123            <u64 as TryInto<usize>>::try_into(
2124                b.file_proxy
2125                    .write(&zeros)
2126                    .await
2127                    .expect("Failed Write message")
2128                    .expect("Failed write")
2129            )
2130            .unwrap(),
2131            BLOCK_SIZE
2132        );
2133        b_written += BLOCK_SIZE as u64;
2134
2135        // Likely to run out of space before writing the full limit due to overheads.
2136        for _ in (BLOCK_SIZE..BYTES_LIMIT as usize).step_by(BLOCK_SIZE) {
2137            match a.file_proxy.write(&zeros).await.expect("Failed Write message") {
2138                Err(_) => break,
2139                Ok(bytes) => {
2140                    a_written += bytes;
2141                    if bytes < BLOCK_SIZE.try_into().unwrap() {
2142                        break;
2143                    }
2144                }
2145            };
2146        }
2147        // Any further writes should fail with out of space.
2148        assert_eq!(
2149            a.file_proxy
2150                .write(&zeros)
2151                .await
2152                .expect("Failed write message")
2153                .expect_err("Write should have been limited"),
2154            Status::NO_SPACE.into_raw()
2155        );
2156
2157        // Now write to the second volume. Likely to run out of space before writing the full limit
2158        // due to overheads.
2159        for _ in (BLOCK_SIZE..BYTES_LIMIT as usize).step_by(BLOCK_SIZE) {
2160            match b.file_proxy.write(&zeros).await.expect("Failed Write message") {
2161                Err(_) => break,
2162                Ok(bytes) => {
2163                    b_written += bytes;
2164                    if bytes < BLOCK_SIZE.try_into().unwrap() {
2165                        break;
2166                    }
2167                }
2168            };
2169        }
2170        // Any further writes should fail with out of space.
2171        assert_eq!(
2172            b.file_proxy
2173                .write(&zeros)
2174                .await
2175                .expect("Failed write message")
2176                .expect_err("Write should have been limited"),
2177            Status::NO_SPACE.into_raw()
2178        );
2179
2180        // Second volume should have failed very early.
2181        assert!(BLOCK_SIZE as u64 * BLOCK_COUNT as u64 - BYTES_LIMIT >= b_written);
2182        // First volume should have gotten further.
2183        assert!(BLOCK_SIZE as u64 * BLOCK_COUNT as u64 - BYTES_LIMIT <= a_written);
2184
2185        a.file_proxy.close().await.unwrap().expect("Failed to close file");
2186        b.file_proxy.close().await.unwrap().expect("Failed to close file");
2187        volumes_directory.terminate().await;
2188        std::mem::drop(volumes_directory);
2189        filesystem.close().await.expect("close filesystem failed");
2190    }
2191
    // End-to-end check of profile recording across remounts:
    //  Pass 1: format a fresh filesystem and create four volumes
    //          (blob/non-blob x premounted/live-mounted).
    //  Pass 2: remount, premount two volumes, start a recording with no target volume,
    //          then mount the other two while the recording is active.
    //  Pass 3: remount again and verify each volume saved a recording — `delete_profile`
    //          only succeeds for a recording that exists, so a successful delete on each
    //          volume proves all four participated.
    #[fuchsia::test(threads = 10)]
    async fn test_profile_start() {
        const PREMOUNT_BLOB: &str = "premount_blob";
        const PREMOUNT_NOBLOB: &str = "premount_noblob";
        const LIVE_BLOB: &str = "live_blob";
        const LIVE_NOBLOB: &str = "live_noblob";

        const RECORDING_NAME: &str = "foo";

        // Pass 1: create the four test volumes, then tear everything down so the
        // device can be reopened.
        let device = {
            let device = DeviceHolder::new(FakeDevice::new(8192, 512));
            let filesystem = FxFilesystem::new_empty(device).await.unwrap();
            let blob_resupplied_count =
                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
            let volumes_directory = VolumesDirectory::new(
                root_volume(filesystem.clone()).await.unwrap(),
                Weak::new(),
                None,
                blob_resupplied_count,
                MemoryPressureConfig::default(),
            )
            .await
            .unwrap();
            volumes_directory
                .create_and_mount_volume(PREMOUNT_BLOB, None, true, None)
                .await
                .unwrap();
            volumes_directory
                .create_and_mount_volume(PREMOUNT_NOBLOB, None, false, None)
                .await
                .unwrap();
            volumes_directory.create_and_mount_volume(LIVE_BLOB, None, true, None).await.unwrap();
            volumes_directory
                .create_and_mount_volume(LIVE_NOBLOB, None, false, None)
                .await
                .unwrap();

            volumes_directory.terminate().await;
            std::mem::drop(volumes_directory);
            filesystem.close().await.expect("Filesystem close");
            filesystem.take_device().await
        };

        device.ensure_unique();
        device.reopen(false);
        // Pass 2: remount and run the recording.
        let device = {
            let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
            let blob_resupplied_count =
                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
            let volumes_directory = VolumesDirectory::new(
                root_volume(filesystem.clone()).await.unwrap(),
                Weak::new(),
                None,
                blob_resupplied_count,
                MemoryPressureConfig::default(),
            )
            .await
            .unwrap();

            // Premount two volumes (mounted before the recording starts).
            let _premount_blob = volumes_directory
                .mount_volume(PREMOUNT_BLOB, None, true)
                .await
                .expect("Reopen volume");
            let _premount_noblob = volumes_directory
                .mount_volume(PREMOUNT_NOBLOB, None, false)
                .await
                .expect("Reopen volume");

            // Start the recording, let it run a really long time, it doesn't need to end for this
            // test. If it does wait this long then it should trigger test timeouts.
            volumes_directory
                .clone()
                .record_or_replay_profile(None, RECORDING_NAME.to_owned(), 600)
                .await
                .expect("Recording");

            // Live mount two volumes (mounted while the recording is in flight).
            let _live_blob =
                volumes_directory.mount_volume(LIVE_BLOB, None, true).await.expect("Reopen volume");
            let _live_noblob = volumes_directory
                .mount_volume(LIVE_NOBLOB, None, false)
                .await
                .expect("Reopen volume");

            // Wait for the recordings to finish.
            volumes_directory.stop_profile_tasks().await;

            volumes_directory.terminate().await;
            std::mem::drop(volumes_directory);
            filesystem.close().await.expect("Filesystem close");
            filesystem.take_device().await
        };

        device.ensure_unique();
        device.reopen(false);
        // Pass 3: remount once more and confirm every volume saved a recording.
        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
        {
            let blob_resupplied_count =
                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
            let volumes_directory = VolumesDirectory::new(
                root_volume(filesystem.clone()).await.unwrap(),
                Weak::new(),
                None,
                blob_resupplied_count,
                MemoryPressureConfig::default(),
            )
            .await
            .unwrap();

            let _premount_blob = volumes_directory
                .mount_volume(PREMOUNT_BLOB, None, true)
                .await
                .expect("Reopen volume");
            let _premount_noblob = volumes_directory
                .mount_volume(PREMOUNT_NOBLOB, None, false)
                .await
                .expect("Reopen volume");
            let _live_blob =
                volumes_directory.mount_volume(LIVE_BLOB, None, true).await.expect("Reopen volume");
            let _live_noblob = volumes_directory
                .mount_volume(LIVE_NOBLOB, None, false)
                .await
                .expect("Reopen volume");

            // Verify which recordings ran based on the saved recordings.
            volumes_directory
                .delete_profile(PREMOUNT_BLOB, RECORDING_NAME)
                .await
                .expect("Finding profile to delete.");
            volumes_directory
                .delete_profile(PREMOUNT_NOBLOB, RECORDING_NAME)
                .await
                .expect("Finding profile to delete.");
            volumes_directory
                .delete_profile(LIVE_BLOB, RECORDING_NAME)
                .await
                .expect("Finding profile to delete.");
            volumes_directory
                .delete_profile(LIVE_NOBLOB, RECORDING_NAME)
                .await
                .expect("Finding profile to delete.");

            volumes_directory.terminate().await;
        }

        filesystem.close().await.expect("Filesystem close");
    }
2340
2341    #[fuchsia::test(threads = 10)]
2342    async fn test_profile_stop() {
2343        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2344        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2345        let blob_resupplied_count =
2346            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2347        let volumes_directory = VolumesDirectory::new(
2348            root_volume(filesystem.clone()).await.unwrap(),
2349            Weak::new(),
2350            None,
2351            blob_resupplied_count,
2352            MemoryPressureConfig::default(),
2353        )
2354        .await
2355        .unwrap();
2356        let volume =
2357            volumes_directory.create_and_mount_volume("foo", None, true, None).await.unwrap();
2358
2359        // Run the recording with no time at all and ensure that it still shuts down properly.
2360        volumes_directory
2361            .clone()
2362            .record_or_replay_profile(None, "foo".to_owned(), 0)
2363            .await
2364            .expect("Recording");
2365
2366        // Delete will succeed once the profile is completed.
2367        while volumes_directory.delete_profile("foo", "foo").await.is_err() {
2368            fasync::Timer::new(Duration::from_millis(10)).await;
2369        }
2370
2371        std::mem::drop(volume);
2372        volumes_directory.terminate().await;
2373        std::mem::drop(volumes_directory);
2374        filesystem.close().await.expect("Filesystem close");
2375    }
2376
2377    #[fuchsia::test(threads = 10)]
2378    async fn test_delete_profile() {
2379        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2380        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2381        let blob_resupplied_count =
2382            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2383        let volumes_directory = VolumesDirectory::new(
2384            root_volume(filesystem.clone()).await.unwrap(),
2385            Weak::new(),
2386            None,
2387            blob_resupplied_count,
2388            MemoryPressureConfig::default(),
2389        )
2390        .await
2391        .unwrap();
2392        let volume =
2393            volumes_directory.create_and_mount_volume("foo", None, true, None).await.unwrap();
2394
2395        volumes_directory
2396            .clone()
2397            .record_or_replay_profile(None, "foo".to_owned(), 600)
2398            .await
2399            .expect("Recording");
2400
2401        // Deletion fails during in-flight recording.
2402        assert_eq!(
2403            volumes_directory.delete_profile("foo", "foo").await.expect_err("File shouldn't exist"),
2404            Status::SHOULD_WAIT
2405        );
2406
2407        volumes_directory.stop_profile_tasks().await;
2408
2409        // Missing volume name.
2410        assert_eq!(
2411            volumes_directory.delete_profile("bar", "foo").await.expect_err("File shouldn't exist"),
2412            Status::NOT_FOUND
2413        );
2414
2415        // Missing Profile name.
2416        assert_eq!(
2417            volumes_directory.delete_profile("foo", "bar").await.expect_err("File shouldn't exist"),
2418            Status::NOT_FOUND
2419        );
2420
2421        // Deletion should now succeed as the profile will be placed as part of `finish_profiling()`
2422        volumes_directory.delete_profile("foo", "foo").await.expect("Deleting");
2423
2424        // Deletion fails as the file shouldn't exist anymore.
2425        assert_eq!(
2426            volumes_directory.delete_profile("foo", "foo").await.expect_err("File shouldn't exist"),
2427            Status::NOT_FOUND
2428        );
2429
2430        std::mem::drop(volume);
2431        volumes_directory.terminate().await;
2432        std::mem::drop(volumes_directory);
2433        filesystem.close().await.expect("Filesystem close");
2434    }
2435
    // Profile recording targeted at a single, named volume: starting a recording
    // returns NOT_FOUND for a nonexistent volume and UNAVAILABLE for an unmounted
    // one; once the volume is mounted the recording succeeds and is saved only on
    // the targeted volume, not on the fixture's default volume.
    #[fuchsia::test(threads = 10)]
    async fn test_profile_start_single_volume() {
        const TEST_VOLUME: &str = "test_1234";
        const TEST_RECORDING: &str = "test_5678";
        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;

        let fixture = TestFixture::new().await;
        {
            let volumes_directory = fixture.volumes_directory();
            // Start recording for volume that doesn't exist.
            assert_eq!(
                volumes_directory
                    .record_or_replay_profile(
                        Some(TEST_VOLUME.to_owned()),
                        TEST_RECORDING.to_owned(),
                        1
                    )
                    .await
                    .expect_err("Volumes doesn't exist yet"),
                Status::NOT_FOUND
            );

            // Create the volume unmounted.  (Mount it just to create it, then
            // immediately unmount by store object id.)
            {
                let volume = volumes_directory
                    .create_and_mount_volume(TEST_VOLUME, Some(crypt.clone()), false, None)
                    .await
                    .unwrap();
                volumes_directory
                    .lock()
                    .await
                    .unmount(volume.volume().store().store_object_id())
                    .await
                    .expect("unmount failed");
            }

            // Start recording for volume that is not mounted.
            assert_eq!(
                volumes_directory
                    .record_or_replay_profile(
                        Some(TEST_VOLUME.to_owned()),
                        TEST_RECORDING.to_owned(),
                        1
                    )
                    .await
                    .expect_err("Volumes doesn't exist yet"),
                Status::UNAVAILABLE
            );

            // Remount the volume and try again.
            let volume = volumes_directory
                .mount_volume(TEST_VOLUME, Some(crypt.clone()), false)
                .await
                .expect("Remount volume");
            volumes_directory
                .record_or_replay_profile(
                    Some(TEST_VOLUME.to_owned()),
                    TEST_RECORDING.to_owned(),
                    1,
                )
                .await
                .expect("Starting recording");

            // Stop the recording and check that it was created on the new volume.
            volumes_directory.stop_profile_tasks().await;
            {
                let profile_dir = volume.volume().get_profile_directory().await.unwrap();
                assert!(profile_dir.lookup(TEST_RECORDING).await.unwrap().is_some());
            }

            // Should be no recording for the other volume.
            {
                let profile_dir = fixture.volume().volume().get_profile_directory().await.unwrap();
                assert!(profile_dir.lookup(TEST_RECORDING).await.unwrap().is_none());
            }
        }
        fixture.close().await;
    }
2514
    // Races a journal compaction (which flushes the volume's store) against two
    // concurrent removals of the same volume, while a read lock on the store-info
    // object is briefly held to widen the window between the flush's StartFlush and
    // EndFlush phases.  Either remover may observe NotFound if the other wins the
    // race; any other error is a test failure.
    #[fuchsia::test(threads = 10)]
    async fn test_delete_volume_while_flushing() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();
        let name = "vol";
        let volume =
            volumes_directory.create_and_mount_volume(name, None, false, None).await.unwrap();
        // Create a file so the volume has journaled mutations for the compaction to flush.
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    volume.volume().store().store_object_id(),
                    volume.root_dir().directory().object_id()
                )],
                Options::default(),
            )
            .await
            .unwrap();
        volume
            .root_dir()
            .directory()
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");
        volumes_directory
            .lock()
            .await
            .unmount(volume.volume().store().store_object_id())
            .await
            .expect("unmount failed");

        let filesystem_clone = filesystem.clone();
        let filesystem_clone2 = filesystem.clone();
        let volumes_directory_clone1 = volumes_directory.clone();
        let volumes_directory_clone2 = volumes_directory.clone();
        let root_store_object_id = filesystem.root_store().store_object_id();
        let store_info_object_id = volume.volume().store().store_info_handle_object_id().unwrap();
        join!(
            async move {
                // Take a lock that the EndFlush transaction requires, so we interleave removing
                // the volume between StartFlush and EndFlush.  Release it on a timer since once the
                // volume is deleted, `remove_volume` needs to take this lock as well to tombstone
                // the store info.
                let _guard = filesystem_clone2
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object(
                        root_store_object_id,
                        store_info_object_id,
                    )])
                    .await;
                fasync::Timer::new(Duration::from_millis(200)).await;
            },
            async move {
                filesystem_clone.journal().force_compact().await.expect("Compact failed");
            },
            async move {
                // NotFound is tolerated: the other remover may have deleted the volume first.
                if let Err(e) = volumes_directory_clone1.remove_volume(name).await {
                    if !FxfsError::NotFound.matches(&e) {
                        panic!("remove_volume failed: {e:?}");
                    }
                }
            },
            async move {
                // NotFound is tolerated: the other remover may have deleted the volume first.
                if let Err(e) = volumes_directory_clone2.remove_volume(name).await {
                    if !FxfsError::NotFound.matches(&e) {
                        panic!("remove_volume failed: {e:?}");
                    }
                }
            },
        );
        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("Filesystem close");
    }
2601
    // This mostly just ensures that we exercise the code path. It was added because the path at
    // one point contained a deadlock.
    #[fuchsia::test(threads = 10)]
    async fn test_flush_before_mark_dirty_under_critical_memory_pressure() {
        let fixture = TestFixture::new().await;
        // Memory is critical, and it's always our fault.
        let _ = fixture
            .memory_pressure_proxy()
            .on_level_changed(MemoryPressureLevel::Critical)
            .await
            .expect("memory pressure FIDL");
        // With a one-byte budget, any nonzero dirty-byte count exceeds the
        // critical-pressure limit, so the flush-during-mark_dirty path can trigger below.
        fixture.volumes_directory().max_dirty_bytes_when_critical.store(1, Ordering::Relaxed);

        let root = fixture.root();
        let file = open_file_checked(
            &root,
            "foo",
            fio::Flags::FLAG_MAYBE_CREATE
                | fio::PERM_READABLE
                | fio::PERM_WRITABLE
                | fio::Flags::PROTOCOL_FILE,
            &Default::default(),
        )
        .await;

        file.resize((zx::system_get_page_size() * 2).into())
            .await
            .expect("resize (FIDL)")
            .expect("resize failed");
        // The above resize creates zero pages which aren't dirty pages and don't contribute to the
        // dirty bytes count but still need to be flushed. The first write below will cross the
        // `max_dirty_bytes_when_critical` threshold but `minimize_memory` won't flush the file
        // because the zero pages aren't dirty pages. The second write below will also cross the
        // `max_dirty_bytes_when_critical` threshold and since the first write dirtied a page, a
        // flush will occur. The flush cleans the 2nd page which is a zero page and the kernel is
        // actively trying to dirty. The kernel doesn't end up dirtying the 2nd page because it's
        // concerned that the clean and dirty raced so it issues another dirty request for the same
        // page. The duplicate dirty request again crosses the `max_dirty_bytes_when_critical`
        // threshold. The file thinks that it has a dirty page so `minimize_memory` flushes the
        // file. `zx_pager_query_dirty_ranges` doesn't return any pages because the kernel didn't
        // actually dirty the page that fxfs thought it did. This causes only the file metadata to
        // be flushed and the dirty page count to not be reduced. The 2nd page finally gets dirtied
        // and now fxfs thinks it has 2 dirty pages instead of 1. `PagedObjectHandle` knows that
        // this can happen and will fix up the counts on the next flush. This test doesn't want to
        // deal with all of that so it syncs here to clean the zero pages that would cause that
        // mess.
        file.sync().await.expect("Failed to make sync call").expect("sync failed");

        // Obtain the file's backing VMO so pages can be dirtied directly through the pager.
        let vmo = file
            .get_backing_memory(fio::VmoFlags::READ | fio::VmoFlags::WRITE)
            .await
            .expect("get_backing_memory (FIDL)")
            .expect("get_backing_memory");

        let buf = [0xAAu8];
        // One call to get dirty bytes over 0, the second to force a flush during mark_dirty.
        vmo.write(&buf, 0).expect("Writing to create dirty bytes");
        let before = fixture.volumes_directory().pager_dirty_bytes_count.load();
        vmo.write(&buf, zx::system_get_page_size().into())
            .expect("Writing to force a flush during mark_dirty");
        // This is still the page size because we forced a flush of the first write during the
        // second write.
        assert_eq!(fixture.volumes_directory().pager_dirty_bytes_count.load(), before,);

        fixture.close().await;
    }
2668
2669    #[fuchsia::test(threads = 10)]
2670    async fn test_delete_crypt_for_volume() {
2671        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2672        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2673        let store_id;
2674        {
2675            let blob_resupplied_count =
2676                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2677            let volumes_directory = VolumesDirectory::new(
2678                root_volume(filesystem.clone()).await.unwrap(),
2679                Weak::new(),
2680                None,
2681                blob_resupplied_count,
2682                MemoryPressureConfig::default(),
2683            )
2684            .await
2685            .unwrap();
2686            let name = "vol";
2687            let crypt = Arc::new(new_insecure_crypt());
2688            let volume = volumes_directory
2689                .create_and_mount_volume(name, Some(crypt.clone()), false, None)
2690                .await
2691                .unwrap();
2692            store_id = volume.volume().store().store_object_id();
2693            // Make sure the volume has some journaled mutations.
2694            let mut transaction = filesystem
2695                .clone()
2696                .new_transaction(
2697                    lock_keys![LockKey::object(
2698                        volume.volume().store().store_object_id(),
2699                        volume.root_dir().directory().object_id()
2700                    )],
2701                    Options::default(),
2702                )
2703                .await
2704                .unwrap();
2705            volume
2706                .root_dir()
2707                .directory()
2708                .create_child_file(&mut transaction, "foo")
2709                .await
2710                .expect("create_child_file failed");
2711            transaction.commit().await.expect("commit failed");
2712
2713            let (volume_dir_proxy, dir_server_end) =
2714                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2715            volumes_directory
2716                .serve_volume(&volume, dir_server_end, false)
2717                .expect("serve_volume failed");
2718            let (root_dir, root_server_end) =
2719                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2720            volume_dir_proxy
2721                .open(
2722                    "root",
2723                    fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
2724                    &Default::default(),
2725                    root_server_end.into_channel(),
2726                )
2727                .expect("Failed to open volume root");
2728
2729            let filesystem_clone = filesystem.clone();
2730            join!(
2731                async move {
2732                    filesystem_clone.journal().force_compact().await.expect("Compact failed");
2733                },
2734                async move {
2735                    let mut i = 0;
2736                    while let Ok(_) = fuchsia_fs::directory::open_file(
2737                        &root_dir,
2738                        &format!("foo{i}"),
2739                        fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE,
2740                    )
2741                    .await
2742                    {
2743                        i += 1;
2744                    }
2745                },
2746                async move {
2747                    crypt.shutdown();
2748                },
2749            );
2750
2751            // Make sure we can still ask the volume to shutdown.
2752            let (admin_proxy, server_end) =
2753                fidl::endpoints::create_proxy::<fidl_fuchsia_fs::AdminMarker>();
2754            volume_dir_proxy
2755                .open(
2756                    &format!("svc/{}", fidl_fuchsia_fs::AdminMarker::PROTOCOL_NAME),
2757                    fio::Flags::PROTOCOL_SERVICE,
2758                    &Default::default(),
2759                    server_end.into(),
2760                )
2761                .expect("Failed to open Admin connection");
2762            admin_proxy.shutdown().await.expect("shutdown failed");
2763
2764            volumes_directory.terminate().await;
2765        }
2766        filesystem.close().await.expect("Filesystem close");
2767        let device = filesystem.take_device().await;
2768        device.reopen(false);
2769        let filesystem = FxFilesystem::open(device).await.expect("open failed");
2770        let options = FsckOptions { fail_on_warning: true, ..Default::default() };
2771        fsck_with_options(filesystem.clone(), &options).await.expect("fsck failed");
2772        fsck_volume_with_options(
2773            filesystem.as_ref(),
2774            &options,
2775            store_id,
2776            Some(Arc::new(new_insecure_crypt())),
2777        )
2778        .await
2779        .expect("fsck_volume failed");
2780        filesystem.close().await.expect("Filesystem close");
2781    }
2782
2783    /// Tests writing a partition image and installing a volume contained within.
2784    #[fuchsia::test(threads = 10)]
2785    async fn test_volume_installation() {
2786        let fixture = TestFixture::open(
2787            DeviceHolder::new(FakeDevice::new(1024, 4096)),
2788            TestFixtureOptions { format: true, encrypted: false, ..Default::default() },
2789        )
2790        .await;
2791
2792        // Create a file "foo" in the existing volume "vol". This should be gone after installation.
2793        {
2794            let file = open_file_checked(
2795                fixture.root(),
2796                "foo",
2797                fio::Flags::PROTOCOL_FILE | fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2798                &Default::default(),
2799            )
2800            .await;
2801            file.write("Hello, world!".as_bytes()).await.unwrap().expect("write failed");
2802        };
2803
2804        // Create another in-memory partition image with a different set of files.
2805        let image = {
2806            let inner_fixture = TestFixture::open(
2807                DeviceHolder::new(FakeDevice::new(512, 4096)),
2808                TestFixtureOptions { format: true, encrypted: false, ..Default::default() },
2809            )
2810            .await;
2811            let file = open_file_checked(
2812                inner_fixture.root(),
2813                "bar",
2814                fio::Flags::PROTOCOL_FILE | fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2815                &Default::default(),
2816            )
2817            .await;
2818            file.write("Well, this is new...".as_bytes()).await.unwrap().expect("write failed");
2819            file.close().await.unwrap().expect("close error");
2820            inner_fixture.close().await
2821        };
2822
2823        // Write the partition image to a file "install" in a new volume called "src".
2824        {
2825            let (src_out_dir, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2826            fixture
2827                .volumes_directory()
2828                .create_and_serve_volume("src", server_end, Default::default(), Default::default())
2829                .await
2830                .unwrap();
2831            let src_root = open_dir_checked(
2832                &src_out_dir,
2833                "root",
2834                fio::PERM_READABLE | fio::PERM_WRITABLE,
2835                Default::default(),
2836            )
2837            .await;
2838            let file = open_file_checked(
2839                &src_root,
2840                "image",
2841                fio::Flags::PROTOCOL_FILE | fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2842                &Default::default(),
2843            )
2844            .await;
2845            write_image_to_file(image, file).await;
2846        };
2847
2848        // Installation should not be possible yet since both volumes are still mounted.
2849        assert!(
2850            fixture.volumes_directory().install_volume("src", "image", "vol").await.is_err(),
2851            "volume installation should fail while either src/dst is still mounted"
2852        );
2853
2854        // Now let's re-mount the filesystem manually without a fixture, install the volumes, and
2855        // spin up a new fixture to verify the result.
2856        let device = fixture.close().await;
2857        let fs = FxFilesystem::open(device).await.unwrap();
2858        {
2859            let root = root_volume(fs.clone()).await.unwrap();
2860            root.install_volume("src", "image", "vol").await.unwrap();
2861        }
2862        fs.close().await.unwrap();
2863        let device = fs.take_device().await;
2864        device.reopen(/*read_only*/ true);
2865        let fixture = TestFixture::open(
2866            device,
2867            TestFixtureOptions { encrypted: false, format: false, ..Default::default() },
2868        )
2869        .await;
2870
2871        // Ensure that the "src" volume is now gone, and the old file "foo" is gone from "vol".
2872        assert!(
2873            fixture.volumes_directory().mount_volume("src", None, false).await.is_err(),
2874            "src volume should be deleted after installation"
2875        );
2876        assert!(
2877            testing::open_file(
2878                fixture.volume_out_dir(),
2879                "foo",
2880                fio::PERM_READABLE,
2881                &Default::default()
2882            )
2883            .await
2884            .is_err(),
2885            "foo should be deleted after installation"
2886        );
2887
2888        // Check that we can find the contents of "vol" that we installed from the image.
2889        let file =
2890            open_file_checked(fixture.root(), "bar", fio::PERM_READABLE, &Default::default()).await;
2891        let data = file.read(fio::MAX_TRANSFER_SIZE).await.unwrap().expect("read failed");
2892        assert_eq!(String::from_utf8(data).unwrap(), "Well, this is new...");
2893        file.close().await.unwrap().unwrap();
2894
2895        fixture.close().await;
2896    }
2897
2898    #[fuchsia::test]
2899    async fn test_create_with_low_32_bit_ids() {
2900        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2901        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2902        let blob_resupplied_count =
2903            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2904
2905        {
2906            let volumes_directory = VolumesDirectory::new(
2907                root_volume(filesystem.clone()).await.unwrap(),
2908                Weak::new(),
2909                None,
2910                blob_resupplied_count,
2911                MemoryPressureConfig::default(),
2912            )
2913            .await
2914            .unwrap();
2915
2916            let mut guard = volumes_directory.lock().await;
2917
2918            let vol = guard
2919                .create_or_mount_volume(
2920                    "low_32",
2921                    None,
2922                    Mode::Create { guid: None, low_32_bit_object_ids: true },
2923                    false,
2924                )
2925                .await
2926                .expect("create volume failed");
2927
2928            let root_dir = vol.volume().store().root_directory_object_id();
2929            let root_dir = fxfs::object_store::Directory::open(vol.volume().store(), root_dir)
2930                .await
2931                .expect("open failed");
2932
2933            let mut transaction = filesystem
2934                .clone()
2935                .new_transaction(
2936                    lock_keys![LockKey::object(
2937                        vol.volume().store().store_object_id(),
2938                        root_dir.object_id()
2939                    )],
2940                    Options::default(),
2941                )
2942                .await
2943                .expect("new_transaction failed");
2944
2945            let object = root_dir
2946                .create_child_file(&mut transaction, "test")
2947                .await
2948                .expect("create_child_file failed");
2949
2950            // We can't check LastObjectIdInfo as it is private, but we can verify behavior.
2951            assert!(object.object_id() < 1 << 32);
2952            transaction.commit().await.expect("commit failed");
2953        };
2954
2955        filesystem.close().await.expect("close filesystem failed");
2956
2957        // Reopen and verify persistence
2958        let device = filesystem.take_device().await;
2959        device.reopen(false);
2960        let filesystem = FxFilesystem::open(device).await.unwrap();
2961        let blob_resupplied_count =
2962            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2963        let volumes_directory = VolumesDirectory::new(
2964            root_volume(filesystem.clone()).await.unwrap(),
2965            Weak::new(),
2966            None,
2967            blob_resupplied_count,
2968            MemoryPressureConfig::default(),
2969        )
2970        .await
2971        .unwrap();
2972
2973        // Mount the volume again, and check that new files are still created with expected IDs.
2974        {
2975            let mut guard = volumes_directory.lock().await;
2976
2977            let vol = guard
2978                .create_or_mount_volume("low_32", None, Mode::Mount, false)
2979                .await
2980                .expect("mount volume failed");
2981
2982            let root_dir = vol.volume().store().root_directory_object_id();
2983            let root_dir = fxfs::object_store::Directory::open(vol.volume().store(), root_dir)
2984                .await
2985                .expect("open failed");
2986
2987            let mut transaction = filesystem
2988                .clone()
2989                .new_transaction(
2990                    lock_keys![LockKey::object(
2991                        vol.volume().store().store_object_id(),
2992                        root_dir.object_id()
2993                    )],
2994                    Options::default(),
2995                )
2996                .await
2997                .expect("new_transaction failed");
2998
2999            let object = root_dir
3000                .create_child_file(&mut transaction, "test2")
3001                .await
3002                .expect("create_child_file failed");
3003
3004            assert!(object.object_id() < 1 << 32);
3005            transaction.commit().await.expect("commit failed");
3006        }
3007
3008        filesystem.close().await.expect("close filesystem failed");
3009    }
3010
    // Regression-style stress test: repeatedly races a journal flush against an auto-unmount of an
    // encrypted volume whose crypt service has just died.  Flushing must never fail even though
    // the volume's crypt connection is gone; instead the volume is expected to be force-locked
    // and flushed without crypt (see the comment on `compact_task` below).  Runs with multiple
    // executor threads so the flush and unmount genuinely interleave.
    #[fuchsia::test(threads = 10)]
    async fn test_race_unmount_and_flush_with_crypt_error() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let blob_resupplied_count = Arc::new(PageRefaultCounter::new().unwrap());
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        // Set up an in-process crypt service with well-known insecure test keys for data and
        // metadata.
        let crypt_service = Arc::new(fxfs_crypt::CryptService::new());
        crypt_service
            .add_wrapping_key(0, fxfs_insecure_crypto::DATA_KEY.to_vec())
            .expect("add_wrapping_key failed");
        crypt_service
            .add_wrapping_key(1, fxfs_insecure_crypto::METADATA_KEY.to_vec())
            .expect("add_wrapping_key failed");
        crypt_service.set_active_key(KeyPurpose::Data, 0).expect("set_active_key failed");
        crypt_service.set_active_key(KeyPurpose::Metadata, 1).expect("set_active_key failed");

        // Run many iterations to give the race a chance to manifest under different timings.
        for _ in 0..20 {
            let (client, mut stream) = create_request_stream::<fidl_fuchsia_fxfs::CryptMarker>();
            let (close_tx, mut close_rx) = futures::channel::oneshot::channel::<()>();

            // Serve a fake Crypt protocol that hands out all-zero keys.  The oneshot lets the
            // test kill the service at a precise point mid-iteration, simulating a crypt crash.
            let crypt_task = fasync::Task::spawn(async move {
                loop {
                    futures::select! {
                        _ = close_rx => return, // Close signal
                        request = stream.try_next() => {
                            match request {
                                Ok(Some(CryptRequest::CreateKey { responder, .. })) => {
                                    responder.send(Ok((&[0; 16], &[0; 48], &[0; 32]))).unwrap();
                                }
                                Ok(Some(CryptRequest::CreateKeyWithId {
                                        wrapping_key_id, responder, .. })) => {
                                    let key = WrappedKey::Fxfs(FxfsKey {
                                        wrapping_key_id,
                                        wrapped_key: [0u8; 48],
                                    });
                                    responder.send(Ok((&key, &[0; 32]))).unwrap();
                                }
                                Ok(Some(CryptRequest::UnwrapKey { responder, .. })) => {
                                    responder.send(Ok(&vec![0; 32])).unwrap();
                                }
                                // Stream closed or unexpected request: stop serving.
                                _ => return,
                            }
                        }
                    }
                }
            });

            let volume = volumes_directory
                .create_and_mount_volume(
                    "encrypted",
                    Some(Arc::new(RemoteCrypt::new(client))),
                    false,
                    None,
                )
                .await
                .unwrap();

            // Write some data to dirty the journal.
            {
                let mut transaction = filesystem
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            volume.volume().store().store_object_id(),
                            volume.root_dir().directory().object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .unwrap();
                volume
                    .root_dir()
                    .directory()
                    .create_child_file(&mut transaction, "foo")
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }

            // Serve the volume and arm auto-unmount: dropping the last connection (dir_proxy,
            // below) will trigger the unmount path.
            let (dir_proxy, dir_server_end) =
                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
            volumes_directory.serve_volume(&volume, dir_server_end, false).unwrap();
            volumes_directory.lock().await.auto_unmount(volume.volume().store().store_object_id());

            // Kill crypt.  The next usage should be in flush.
            let _ = close_tx.send(());

            let filesystem_clone = filesystem.clone();
            let compact_task = fasync::Task::spawn(async move {
                // Flush should not fail, because that would close the journal for the rest of the
                // filesystem.  Instead, the volume should be force-locked and flushed (which
                // doesn't depend on crypt).
                filesystem_clone.object_manager().flush().await.expect("flush failed");
            });

            // Trigger unmount.
            std::mem::drop(dir_proxy);

            // Wait for volume to be unmounted, so we can clean up for the next iteration.
            let store_id = volume.volume().store().store_object_id();
            loop {
                {
                    // Scope the lock so it isn't held across the timer await below.
                    let guard = volumes_directory.lock().await;
                    if !guard.mounted_volumes.contains_key(&store_id) {
                        break;
                    }
                }
                fasync::Timer::new(Duration::from_millis(10)).await;
            }

            // Both background tasks must finish before tearing down this iteration's volume.
            join!(compact_task, crypt_task);
            volumes_directory.remove_volume("encrypted").await.expect("remove_volume failed");
        }
        volumes_directory.terminate().await;
        filesystem.close().await.expect("close filesystem failed");
    }
3136}