Skip to main content

fxfs_platform/fuchsia/
volumes_directory.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::RemoteCrypt;
6use crate::fuchsia::component::map_to_raw_status;
7use crate::fuchsia::directory::FxDirectory;
8use crate::fuchsia::errors::map_to_status;
9use crate::fuchsia::fxblob::BlobDirectory;
10use crate::fuchsia::memory_pressure::{MemoryPressureLevel, MemoryPressureMonitor};
11use crate::fuchsia::profile::new_profile_state;
12use crate::fuchsia::volume::{FxVolume, FxVolumeAndRoot, MemoryPressureConfig, RootDir};
13use anyhow::{Context, Error, anyhow, ensure};
14use async_trait::async_trait;
15use fidl::endpoints::{DiscoverableProtocolMarker, ServerEnd};
16use fidl_fuchsia_fs::{AdminMarker, AdminRequest, AdminRequestStream};
17use fidl_fuchsia_fs_startup::{
18    CheckOptions, CreateOptions, MountOptions, VolumeRequest, VolumeRequestStream,
19};
20use fidl_fuchsia_fxfs::{FileBackedVolumeProviderMarker, ProjectIdMarker};
21use fidl_fuchsia_io as fio;
22use fs_inspect::{FsInspectTree, FsInspectVolume};
23use fuchsia_async as fasync;
24use futures::stream::FuturesUnordered;
25use futures::{StreamExt, TryStreamExt};
26use fxfs::errors::FxfsError;
27use fxfs::fsck;
28use fxfs::log::*;
29use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
30use fxfs::object_store::volume::RootVolume;
31use fxfs::object_store::{
32    Directory, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions, StoreOwner,
33};
34use fxfs_crypto::Crypt;
35use fxfs_trace::{TraceFutureExt, trace_future_args};
36use refaults_vmo::PageRefaultCounter;
37use rustc_hash::FxHashMap as HashMap;
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::sync::{Arc, OnceLock, Weak};
40use vfs::directory::entry_container::MutableDirectory;
41use vfs::directory::helper::DirectlyMutable;
42const MEBIBYTE: u64 = 1024 * 1024;
43
44struct ProfileState {
45    task: fasync::Task<()>,
46    profile_name: String,
47    all_volumes: bool,
48}
49
50/// VolumesDirectory is a special pseudo-directory used to enumerate and operate on volumes.
51/// Volume creation happens via fuchsia.fs.startup.Volumes.Create, rather than open.
52///
53/// Note that VolumesDirectory assumes exclusive access to |root_volume| and if volumes are
54/// manipulated from elsewhere, strange things will happen.
55pub struct VolumesDirectory {
56    root_volume: RootVolume,
57    directory_node: Arc<vfs::directory::immutable::Simple>,
58    mounted_volumes: futures::lock::Mutex<HashMap<u64, MountedVolume>>,
59    inspect_tree: Weak<FsInspectTree>,
60    mem_monitor: Option<MemoryPressureMonitor>,
61    blob_resupplied_count: Arc<PageRefaultCounter>,
62    // The state of profile recordings. Should be locked *after* mounted_volumes.
63    profiling_state: futures::lock::Mutex<Option<ProfileState>>,
64
65    /// A running estimate of the number of dirty bytes outstanding in all pager-backed VMOs across
66    /// all volumes.
67    pager_dirty_bytes_count: PagerDirtyByteCount,
68
69    /// Max outstanding dirty bytes under critical memory pressure. This could be hardcoded, but is
70    /// broken out for testing.
71    max_dirty_bytes_when_critical: AtomicU64,
72
73    // A callback to invoke when a volume is added.  When the volume is removed, this is called
74    // again with `None` as the second parameter.
75    on_volume_added:
76        OnceLock<Box<dyn Fn(&str, Option<(Arc<FxVolume>, Arc<ObjectStore>)>) + Send + Sync>>,
77
78    /// The cache configuration to use under different memory pressure levels.
79    memory_pressure_config: MemoryPressureConfig,
80}
81
82/// Operations on VolumesDirectory that cannot be performed concurrently (i.e. most
83/// volume creation/removal ops) should exist on this guard instead of VolumesDirectory.
84pub struct MountedVolumesGuard<'a> {
85    volumes_directory: Arc<VolumesDirectory>,
86    mounted_volumes: futures::lock::MutexGuard<'a, HashMap<u64, MountedVolume>>,
87}
88
89struct MountedVolume {
90    sequence: u64,
91    volume: FxVolumeAndRoot,
92
93    // True if the volume was forcibly locked.
94    locked: bool,
95}
96
97pub(crate) enum Mode {
98    Mount,
99    Create { guid: Option<[u8; 16]>, low_32_bit_object_ids: bool },
100}
101
102impl MountedVolumesGuard<'_> {
103    /// Creates or mounts a volume. If |crypt| is set, the volume will be created or mounted as
104    /// encrypted. The volume is mounted according to |as_blob|.
105    async fn create_or_mount_volume(
106        &mut self,
107        name: &str,
108        crypt: Option<Arc<dyn Crypt>>,
109        mode: Mode,
110        as_blob: bool,
111    ) -> Result<FxVolumeAndRoot, Error> {
112        let owner = Arc::downgrade(&self.volumes_directory) as Weak<dyn StoreOwner>;
113        let store = match mode {
114            Mode::Create { guid, low_32_bit_object_ids } => self
115                .volumes_directory
116                .root_volume
117                .new_volume(
118                    name,
119                    NewChildStoreOptions {
120                        options: StoreOptions { owner, crypt },
121                        guid,
122                        low_32_bit_object_ids,
123                        ..Default::default()
124                    },
125                )
126                .await
127                .context("failed to create new volume")?,
128            Mode::Mount => {
129                self.volumes_directory
130                    .root_volume
131                    .volume(name, StoreOptions { owner, crypt })
132                    .await?
133            }
134        };
135        ensure!(
136            !self.mounted_volumes.contains_key(&store.store_object_id()),
137            FxfsError::AlreadyBound
138        );
139
140        let volume = if as_blob {
141            self.mount_store::<BlobDirectory>(
142                name,
143                store,
144                self.volumes_directory.memory_pressure_config,
145            )
146            .await?
147        } else {
148            self.mount_store::<FxDirectory>(
149                name,
150                store,
151                self.volumes_directory.memory_pressure_config,
152            )
153            .await?
154        };
155        // If there is an ongoing profile activity, we should apply it to the mounted volume.
156        if let Some(ProfileState { profile_name, all_volumes: true, .. }) =
157            &(*self.volumes_directory.profiling_state.lock().await)
158        {
159            if let Err(e) = volume
160                .volume()
161                .record_and_replay_profile(new_profile_state(as_blob), profile_name)
162                .await
163            {
164                error!(
165                    "Failed to record or replay profile '{}' for volume {}: {:?}",
166                    profile_name, name, e
167                );
168            }
169        }
170
171        if let Mode::Create { .. } = mode {
172            let store_object_id = volume.volume().store().store_object_id();
173            self.volumes_directory.add_directory_entry(name, store_object_id);
174        }
175        Ok(volume)
176    }
177
178    /// Returns the volume if it is found and unlocked, along with a bool to indicate that it is or
179    /// isn't a blob volume.
180    async fn get_unlocked_volume_by_name(
181        &self,
182        volume_name: &str,
183    ) -> Result<(FxVolumeAndRoot, bool), zx::Status> {
184        let (store_object_id, _, _) = self
185            .volumes_directory
186            .root_volume
187            .volume_directory()
188            .lookup(volume_name)
189            .await
190            .map_err(map_to_status)?
191            .ok_or(zx::Status::NOT_FOUND)?;
192        if let Some(MountedVolume { volume, .. }) = self.mounted_volumes.get(&store_object_id) {
193            let is_blob = volume.root().clone().into_any().downcast::<BlobDirectory>().is_ok();
194            Ok((volume.clone(), is_blob))
195        } else {
196            Err(zx::Status::UNAVAILABLE)
197        }
198    }
199
200    // Mounts the given store.  A lock *must* be held on the volume directory.
201    async fn mount_store<T: From<Directory<FxVolume>> + RootDir>(
202        &mut self,
203        name: &str,
204        store: Arc<ObjectStore>,
205        flush_task_config: MemoryPressureConfig,
206    ) -> Result<FxVolumeAndRoot, Error> {
207        let unique_id = zx::Event::create();
208        let volume = FxVolumeAndRoot::new::<T>(
209            Arc::downgrade(&self.volumes_directory),
210            store,
211            unique_id.koid().unwrap().raw_koid(),
212            name.to_owned(),
213            self.volumes_directory.blob_resupplied_count.clone(),
214            self.volumes_directory.memory_pressure_config,
215        )
216        .await?;
217        volume
218            .volume()
219            .start_background_task(flush_task_config, self.volumes_directory.mem_monitor.as_ref());
220        self.add_mount(name, &volume);
221        Ok(volume)
222    }
223
224    /// Adds a volume (`FxVolumeAndRoot`) into the mount list.
225    pub fn add_mount(&mut self, name: &str, volume: &FxVolumeAndRoot) {
226        static SEQUENCE: AtomicU64 = AtomicU64::new(0);
227        let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);
228        self.mounted_volumes.insert(
229            volume.volume().store().store_object_id(),
230            MountedVolume { sequence, volume: volume.clone(), locked: false },
231        );
232        if let Some(inspect) = self.volumes_directory.inspect_tree.upgrade() {
233            inspect.register_volume(
234                name.to_string(),
235                Arc::downgrade(volume.volume()) as Weak<dyn FsInspectVolume + Send + Sync>,
236            )
237        }
238        if let Some(callback) = self.volumes_directory.on_volume_added.get() {
239            callback(
240                name,
241                Some((
242                    volume.volume().clone(),
243                    self.volumes_directory
244                        .root_volume
245                        .volume_directory()
246                        .store()
247                        .filesystem()
248                        .root_store(),
249                )),
250            );
251        }
252    }
253
254    async fn lock_mount(&self, mounted_volume: &mut MountedVolume) {
255        let MountedVolume { volume, locked, .. } = mounted_volume;
256
257        if let Some(callback) = self.volumes_directory.on_volume_added.get() {
258            callback(&volume.volume().name(), None);
259        }
260        if !*locked {
261            if let Some(inspect) = self.volumes_directory.inspect_tree.upgrade() {
262                inspect.unregister_volume(volume.volume().name());
263            }
264            // We must make sure to remove the root entry which holds strong references to the
265            // volume since otherwise `Volume::try_unwrap` might fail.
266            let _ = volume.outgoing_dir().remove_entry("root", true);
267            volume.volume().terminate().await;
268            *locked = true;
269        }
270    }
271
272    async fn remove_volume(&mut self, name: &str) -> Result<(), Error> {
273        let (object_id, transaction) = self
274            .volumes_directory
275            .root_volume
276            .acquire_transaction_for_remove_volume(name, [], false)
277            .await?;
278
279        // Cowardly refuse to delete a mounted volume.
280        ensure!(!self.mounted_volumes.contains_key(&object_id), FxfsError::AlreadyBound);
281        let directory_node = self.volumes_directory.directory_node.clone();
282        self.volumes_directory
283            .root_volume
284            .delete_volume(name, transaction, || {
285                // This shouldn't fail because the entry should exist.
286                directory_node.remove_entry(name, /* must_be_directory: */ false).unwrap();
287            })
288            .await?;
289        Ok(())
290    }
291
292    async fn terminate(&mut self) {
293        let mut volumes = std::mem::take(&mut *self.mounted_volumes);
294        for mounted_volume in volumes.values_mut() {
295            let admin_scope = mounted_volume.volume.admin_scope();
296            admin_scope.shutdown();
297            admin_scope.wait().await;
298
299            self.lock_mount(mounted_volume).await;
300        }
301    }
302
303    // Unmounts the volume identified by `store_id`.  The caller should take locks to avoid races if
304    // necessary.
305    //
306    // NOTE: This will not terminate any connections on the admin scope.
307    pub async fn unmount(&mut self, store_id: u64) -> Result<FxVolumeAndRoot, Error> {
308        let mut mounted_volume =
309            self.mounted_volumes.remove(&store_id).ok_or(FxfsError::NotFound)?;
310        self.lock_mount(&mut mounted_volume).await;
311        Ok(mounted_volume.volume)
312    }
313
314    async fn force_lock(&mut self, store_id: u64) -> Result<(), Error> {
315        if let Some(mut mounted_volume) = self.mounted_volumes.remove(&store_id) {
316            self.lock_mount(&mut mounted_volume).await;
317            // Reinsert the volume as locked. This looks racy but it isn't, we held the mutable
318            // reference to `self` for this whole duration. `get_mut()` would be cleaner but that
319            // would hold a mutable reference while this call would take another reference.
320            self.mounted_volumes.insert(store_id, mounted_volume);
321        }
322
323        Ok(())
324    }
325
326    // Auto-unmount the volume when the last connection to the volume is closed.
327    fn auto_unmount(&self, store_id: u64) {
328        let volumes_directory = self.volumes_directory.clone();
329        let mounted_volume = self.mounted_volumes.get(&store_id).unwrap();
330        let sequence = mounted_volume.sequence;
331        let admin_scope = mounted_volume.volume.admin_scope().clone();
332        let scope = mounted_volume.volume.volume().scope().clone();
333        fasync::Task::spawn(
334            async move {
335                // Check the admin_scope first because once that has finished, there can never be
336                // any more connections to it.
337                admin_scope.wait().await;
338                scope.wait().await;
339
340                // Check that the same volume is still mounted i.e. there wasn't an explicit
341                // unmount.
342                let mut mounted_volumes = volumes_directory.lock().await;
343                match mounted_volumes.mounted_volumes.get(&store_id) {
344                    Some(m) if m.sequence == sequence => {}
345                    _ => return,
346                }
347
348                warn!(store_id; "Last connection to volume closed without unmount, shutting down");
349                let root_store = volumes_directory.root_volume.volume_directory().store();
350                let fs = root_store.filesystem();
351                let _guard = fs
352                    .lock_manager()
353                    .txn_lock(lock_keys![LockKey::object(
354                        root_store.store_object_id(),
355                        volumes_directory.root_volume.volume_directory().object_id(),
356                    )])
357                    .await;
358
359                if let Err(e) = mounted_volumes.unmount(store_id).await {
360                    warn!(e:?, store_id; "Failed to unmount volume");
361                }
362            }
363            .trace(trace_future_args!("Volume::auto_unmount")),
364        )
365        .detach();
366    }
367}
368
369impl VolumesDirectory {
370    /// Fills the VolumesDirectory with all volumes in |root_volume|.  No volume is opened during
371    /// this.
372    pub async fn new(
373        root_volume: RootVolume,
374        inspect_tree: Weak<FsInspectTree>,
375        mem_monitor: Option<MemoryPressureMonitor>,
376        blob_resupplied_count: Arc<PageRefaultCounter>,
377        memory_pressure_config: MemoryPressureConfig,
378    ) -> Result<Arc<Self>, Error> {
379        let layer_set = root_volume.volume_directory().store().tree().layer_set();
380        let mut merger = layer_set.merger();
381        let me = Arc::new(Self {
382            root_volume,
383            directory_node: vfs::directory::immutable::simple(),
384            mounted_volumes: futures::lock::Mutex::new(HashMap::default()),
385            inspect_tree,
386            mem_monitor,
387            blob_resupplied_count,
388            profiling_state: futures::lock::Mutex::new(None),
389            pager_dirty_bytes_count: PagerDirtyByteCount::new(),
390            max_dirty_bytes_when_critical: AtomicU64::new(zx::system_get_physmem() / 100),
391            on_volume_added: OnceLock::new(),
392            memory_pressure_config,
393        });
394        let mut iter = me.root_volume.volume_directory().iter(&mut merger).await?;
395        while let Some((name, store_id, object_descriptor)) = iter.get() {
396            ensure!(*object_descriptor == ObjectDescriptor::Volume, FxfsError::Inconsistent);
397
398            me.add_directory_entry(&name, store_id);
399
400            iter.advance().await?;
401        }
402        Ok(me)
403    }
404
405    /// Delete a profile for a given volume. Fails if that volume isn't mounted or if there is
406    /// active profile recording or replay.
407    pub async fn delete_profile(
408        self: &Arc<Self>,
409        volume_name: &str,
410        profile_name: &str,
411    ) -> Result<(), zx::Status> {
412        // Volumes lock is taken first to provide consistent lock ordering with mounting a volume.
413        let volumes = self.mounted_volumes.lock().await;
414        let state = self.profiling_state.lock().await;
415
416        // Only allow deletion when no operations are in flight. This removes confusion around
417        // deleting a profile while one is recording with the same name, as the profile will not be
418        // available for deletion until the recording completes. This would also mean that deleting
419        // during a recording may succeed for deleting an older version but will be confusingly
420        // replaced moments later.
421        if state.is_some() {
422            warn!("Failing profile deletion while profile operations are in flight.");
423            return Err(zx::Status::SHOULD_WAIT);
424        }
425        for MountedVolume { volume, .. } in volumes.values() {
426            if volume.volume().name() == volume_name {
427                let dir = Arc::new(FxDirectory::new(
428                    None,
429                    volume.volume().get_profile_directory().await.map_err(map_to_status)?,
430                ));
431                return dir.unlink(profile_name, false).await;
432            }
433        }
434        warn!(volume_name, profile_name; "Volume not found while deleting profile");
435        Err(zx::Status::NOT_FOUND)
436    }
437
438    pub fn memory_pressure_monitor(&self) -> Option<&MemoryPressureMonitor> {
439        self.mem_monitor.as_ref()
440    }
441
442    /// Stop all ongoing replays, and complete and persist ongoing recordings.
443    pub async fn stop_profile_tasks(self: &Arc<Self>) {
444        let mut state;
445        let volumes;
446        // Take the mounted_volumes lock first to keep consistent lock ordering with other
447        // operations, but don't need to hold it for the entire operation. We need to take the
448        // profiling_state lock before the mounted_volumes lock is dropped to ensure that another
449        // thread doesn't mount a volume and start a profile task on it in between.
450        {
451            volumes = self
452                .mounted_volumes
453                .lock()
454                .await
455                .values()
456                .map(|v| v.volume.volume().clone()) // Clones of each FxVolume.
457                .collect::<Vec<Arc<FxVolume>>>();
458            state = self.profiling_state.lock().await;
459        }
460        for volume in volumes {
461            volume.stop_profile_tasks().await;
462        }
463        *state = None;
464    }
465
466    /// Record a named profile for a number of seconds, fails if there is an in flight recording or
467    /// replay. The given volume must be unlocked, if no volume is given then all volumes will be
468    /// affected and all volumes mounted during the process will also be affected.
469    pub async fn record_and_replay_profile(
470        self: &Arc<Self>,
471        volume_name: Option<String>,
472        profile_name: String,
473        duration_secs: u32,
474    ) -> Result<(), zx::Status> {
475        // Volumes lock is taken first to provide consistent lock ordering with mounting a volume.
476        let volumes = self.lock().await;
477        let mut state = self.profiling_state.lock().await;
478        if state.is_some() {
479            // Consistency in the recording and replaying cannot be ensured at the volume level
480            // if more than one operation can be in flight at a time.
481            return Err(zx::Status::SHOULD_WAIT);
482        }
483        match volume_name.as_ref() {
484            Some(volume_name) => {
485                let (volume, is_blob) = volumes.get_unlocked_volume_by_name(&volume_name).await?;
486                if let Err(error) = volume
487                    .volume()
488                    .record_and_replay_profile(new_profile_state(is_blob), &profile_name)
489                    .await
490                {
491                    error!(
492                        error:?,
493                        profile_name = profile_name.as_str(),
494                        volume_name = volume_name.as_str();
495                        "Failed to record or replay profile",
496                    );
497                    return Err(map_to_status(error));
498                }
499            }
500            None => {
501                for MountedVolume { volume, .. } in volumes.mounted_volumes.values() {
502                    let is_blob =
503                        volume.root().clone().into_any().downcast::<BlobDirectory>().is_ok();
504                    // Just log the errors, don't stop half-way.
505                    if let Err(error) = volume
506                        .volume()
507                        .record_and_replay_profile(new_profile_state(is_blob), &profile_name)
508                        .await
509                    {
510                        error!(
511                            error:?,
512                            profile_name = profile_name.as_str(),
513                            volume_name = volume.volume().name();
514                            "Failed to record or replay profile",
515                        );
516                    }
517                }
518            }
519        }
520
521        let this = self.clone();
522        let task = fasync::Task::spawn(async move {
523            fasync::Timer::new(fasync::MonotonicDuration::from_seconds(duration_secs.into())).await;
524            this.stop_profile_tasks().await;
525        });
526        *state = Some(ProfileState { task, profile_name, all_volumes: volume_name.is_none() });
527        Ok(())
528    }
529
530    /// Replays a profile if one exists, and only records if one does not exist.
531    /// The given volume must be unlocked.
532    pub async fn replay_xor_record_profile(
533        self: &Arc<Self>,
534        volume_name: String,
535        profile_name: String,
536        duration_secs: u32,
537    ) -> Result<(), zx::Status> {
538        // Volumes lock is taken first to provide consistent lock ordering with mounting a volume.
539        let volumes = self.lock().await;
540        let mut state = self.profiling_state.lock().await;
541        if state.is_some() {
542            return Err(zx::Status::SHOULD_WAIT);
543        }
544        let (volume, is_blob) = volumes.get_unlocked_volume_by_name(&volume_name).await?;
545        if let Err(error) = volume
546            .volume()
547            .replay_xor_record_profile(new_profile_state(is_blob), &profile_name)
548            .await
549        {
550            error!(
551                error:?,
552                profile_name = profile_name.as_str(),
553                volume_name = volume_name.as_str();
554                "Failed to replay or record profile",
555            );
556            return Err(map_to_status(error));
557        }
558
559        let this = self.clone();
560        let task = fasync::Task::spawn(async move {
561            fasync::Timer::new(fasync::MonotonicDuration::from_seconds(duration_secs.into())).await;
562            this.stop_profile_tasks().await;
563        });
564        *state = Some(ProfileState { task, profile_name, all_volumes: false });
565        Ok(())
566    }
567
568    /// Returns the directory node which can be used to provide connections for e.g. enumerating
569    /// entries in the VolumesDirectory.
570    /// Directly manipulating the entries in this node will result in strange behaviour.
571    pub fn directory_node(&self) -> &Arc<vfs::directory::immutable::Simple> {
572        &self.directory_node
573    }
574
575    // This serves as an exclusive lock for operations that manipulate the set of mounted volumes.
576    pub async fn lock<'a>(self: &'a Arc<Self>) -> MountedVolumesGuard<'a> {
577        MountedVolumesGuard {
578            volumes_directory: self.clone(),
579            mounted_volumes: self.mounted_volumes.lock().await,
580        }
581    }
582
583    fn add_directory_entry(self: &Arc<Self>, name: &str, store_id: u64) {
584        let weak = Arc::downgrade(self);
585        let name_owned = Arc::new(name.to_string());
586        self.directory_node
587            .add_entry(
588                name,
589                vfs::service::host(move |requests| {
590                    let weak = weak.clone();
591                    let name = name_owned.clone();
592                    async move {
593                        if let Some(me) = weak.upgrade() {
594                            let _ =
595                                me.handle_volume_requests(name.as_ref(), requests, store_id).await;
596                        }
597                    }
598                }),
599            )
600            .unwrap();
601    }
602
603    /// Creates and mounts a new volume. If |crypt| is set, the volume will be encrypted. The
604    /// volume is mounted according to |as_blob|.
605    pub async fn create_and_mount_volume(
606        self: &Arc<Self>,
607        name: &str,
608        crypt: Option<Arc<dyn Crypt>>,
609        as_blob: bool,
610        guid: Option<[u8; 16]>,
611    ) -> Result<FxVolumeAndRoot, Error> {
612        self.lock()
613            .await
614            .create_or_mount_volume(
615                name,
616                crypt,
617                Mode::Create { guid, low_32_bit_object_ids: false },
618                as_blob,
619            )
620            .await
621    }
622
623    /// Mounts an existing volume. `crypt` will be used to unlock the volume if provided.
624    /// If `as_blob` is `true`, the volume will be mounted as a blob filesystem, otherwise
625    /// it will be treated as a regular fxfs volume.
626    pub async fn mount_volume(
627        self: &Arc<Self>,
628        name: &str,
629        crypt: Option<Arc<dyn Crypt>>,
630        as_blob: bool,
631    ) -> Result<FxVolumeAndRoot, Error> {
632        self.lock().await.create_or_mount_volume(name, crypt, Mode::Mount, as_blob).await
633    }
634
635    /// Removes a volume. The volume must exist but encrypted volume keys are not required.
636    pub async fn remove_volume(self: &Arc<Self>, name: &str) -> Result<(), Error> {
637        self.lock().await.remove_volume(name).await
638    }
639
640    /// Terminates all opened volumes.  This will not cancel any profiling that might be taking
641    /// place.
642    pub async fn terminate(self: &Arc<Self>) {
643        // Abort the profiling timer task.
644        let profiling_state = self.profiling_state.lock().await.take();
645        if let Some(state) = profiling_state {
646            state.task.abort().await;
647        }
648        self.lock().await.terminate().await;
649        // TODO(https://fxbug.dev/452935329): Turn this into a real assert.
650        debug_assert!(
651            self.pager_dirty_bytes_count.load() == 0,
652            "Leaked {} dirty bytes.",
653            self.pager_dirty_bytes_count.load()
654        );
655    }
656
657    /// Serves the given volume on `outgoing_dir_server_end`.
658    pub fn serve_volume(
659        self: &Arc<Self>,
660        volume: &FxVolumeAndRoot,
661        outgoing_dir_server_end: ServerEnd<fio::DirectoryMarker>,
662        as_blob: bool,
663    ) -> Result<(), Error> {
664        // A note regarding strong references to `FxVolume`: connections to services here are all on
665        // the admin scope.  When we force lock a volume, we want to keep the admin scope running
666        // but terminate the volume.  When a volume is terminated in this way, we want to ensure
667        // that there are no outstanding strong references to the volume.  With that in mind, the
668        // services here should not hold strong references.  They can hold a strong reference to
669        // VolumesDirectory and find the volume via the mount list, or they can hold a weak
670        // reference to the FxVolume.  Before upgrading the weak reference, an active guard must be
671        // acquired on the volume's scope first.  This ensures that no new strong references are
672        // taken once the volume has commenced termination.  The "root" entry just below is an
673        // exception that does hold a strong reference.  We handle that by making sure we remove
674        // that entry before we call `FxVolume::terminate`.
675
676        let outgoing_dir = volume.outgoing_dir();
677        outgoing_dir.add_entry("root", volume.root().clone().as_directory_entry())?;
678        let svc_dir = vfs::directory::immutable::simple();
679        outgoing_dir.add_entry("svc", svc_dir.clone())?;
680
681        let store_id = volume.volume().store().store_object_id();
682        let me = self.clone();
683        svc_dir.add_entry(
684            AdminMarker::PROTOCOL_NAME,
685            vfs::service::host(move |requests| {
686                let me = me.clone();
687                async move {
688                    let _ = me.handle_admin_requests(requests, store_id).await;
689                }
690            }),
691        )?;
692        let vol_scope = volume.volume().scope().clone();
693        let weak_vol = Arc::downgrade(volume.volume());
694        {
695            let vol_scope = vol_scope.clone();
696            let weak_vol = weak_vol.clone();
697            svc_dir.add_entry(
698                ProjectIdMarker::PROTOCOL_NAME,
699                vfs::service::host(move |requests| {
700                    let weak_vol = weak_vol.clone();
701                    let scope = vol_scope.clone();
702                    async move {
703                        let _ =
704                            FxVolume::handle_project_id_requests(weak_vol, scope, requests).await;
705                    }
706                }),
707            )?;
708        }
709        svc_dir.add_entry(
710            FileBackedVolumeProviderMarker::PROTOCOL_NAME,
711            vfs::service::host(move |requests| {
712                let weak_vol = weak_vol.clone();
713                let scope = vol_scope.clone();
714                async move {
715                    let _ = FxVolume::handle_file_backed_volume_provider_requests(
716                        weak_vol, scope, requests,
717                    )
718                    .await;
719                }
720            }),
721        )?;
722        volume.root().clone().register_additional_volume_services(&svc_dir)?;
723
724        let scope = volume.admin_scope().clone();
725        let mut flags = fio::PERM_READABLE | fio::PERM_WRITABLE;
726        if as_blob {
727            flags |= fio::PERM_EXECUTABLE;
728        }
729        vfs::directory::serve_on(Arc::clone(outgoing_dir), flags, scope, outgoing_dir_server_end);
730
731        info!(
732            store_id;
733            "Serving volume, pager port koid={}",
734            fasync::EHandle::local().port().koid().unwrap().raw_koid()
735        );
736        Ok(())
737    }
738
739    /// Creates and serves the volume with the given name.
740    pub async fn create_and_serve_volume(
741        self: &Arc<Self>,
742        name: &str,
743        outgoing_directory_server_end: ServerEnd<fio::DirectoryMarker>,
744        mount_options: MountOptions,
745        create_options: CreateOptions,
746    ) -> Result<(), Error> {
747        let mut guard = self.lock().await;
748        let crypt =
749            mount_options.crypt.map(|crypt| Arc::new(RemoteCrypt::new(crypt)) as Arc<dyn Crypt>);
750        let as_blob = mount_options.as_blob.unwrap_or(false);
751        let guid = create_options.guid;
752        let low_32_bit_object_ids = create_options.restrict_inode_ids_to_32_bit.unwrap_or(false);
753        let volume = guard
754            .create_or_mount_volume(
755                name,
756                crypt,
757                Mode::Create { guid, low_32_bit_object_ids },
758                as_blob,
759            )
760            .await?;
761        self.serve_volume(&volume, outgoing_directory_server_end, as_blob)
762            .context("failed to serve volume")?;
763        guard.auto_unmount(volume.volume().store().store_object_id());
764        Ok(())
765    }
766
767    async fn handle_volume_requests(
768        self: &Arc<Self>,
769        name: &str,
770        mut requests: VolumeRequestStream,
771        store_id: u64,
772    ) -> Result<(), Error> {
773        while let Some(request) = requests.try_next().await? {
774            match request {
775                VolumeRequest::Check { responder, options } => {
776                    async move {
777                        responder.send(self.handle_check(store_id, options).await.map_err(
778                            |error| {
779                                error!(error:?, store_id; "Failed to check volume");
780                                map_to_raw_status(error)
781                            },
782                        ))
783                    }
784                    .trace(trace_future_args!("Volume::Check"))
785                    .await?;
786                }
787                VolumeRequest::Mount { responder, outgoing_directory, options } => {
788                    async move {
789                        responder.send(
790                            self.handle_mount(name, store_id, outgoing_directory, options)
791                                .await
792                                .map_err(|error| {
793                                    error!(error:?, name, store_id; "Failed to mount volume");
794                                    map_to_raw_status(error)
795                                }),
796                        )
797                    }
798                    .trace(trace_future_args!("Volume::Mount"))
799                    .await?;
800                }
801                VolumeRequest::SetLimit { responder, bytes } => {
802                    async move {
803                        responder.send(self.handle_set_limit(store_id, bytes).await.map_err(
804                            |error| {
805                                error!(error:?, store_id; "Failed to set volume limit");
806                                map_to_raw_status(error)
807                            },
808                        ))
809                    }
810                    .trace(trace_future_args!("Volume::SetLimit"))
811                    .await?;
812                }
813                VolumeRequest::GetLimit { responder } => {
814                    fxfs_trace::duration!("Volume::GetLimit");
815                    responder.send(Ok(self.handle_get_limit(store_id)))?
816                }
817                VolumeRequest::GetInfo { responder } => {
818                    async move {
819                        let result = self.handle_get_info(store_id).await.map(|guid| {
820                            fidl_fuchsia_fs_startup::VolumeInfo {
821                                guid: Some(guid),
822                                ..Default::default()
823                            }
824                        });
825                        match result {
826                            Ok(response) => responder.send(Ok(&response)),
827                            Err(error) => {
828                                error!(error:?, store_id; "Failed to get volume info");
829                                responder.send(Err(map_to_raw_status(error)))
830                            }
831                        }
832                    }
833                    .trace(trace_future_args!("Volume::GetInfo"))
834                    .await?;
835                }
836            }
837        }
838        Ok(())
839    }
840
841    pub fn memory_pressure_config(&self) -> &MemoryPressureConfig {
842        &self.memory_pressure_config
843    }
844
845    fn is_flush_required_to_dirty(&self, byte_count: u64) -> bool {
846        let mem_pressure = self
847            .mem_monitor
848            .as_ref()
849            .map(|mem_monitor| mem_monitor.level())
850            .unwrap_or(MemoryPressureLevel::Normal);
851        if !matches!(mem_pressure, MemoryPressureLevel::Critical) {
852            return false;
853        }
854
855        let total_dirty = self.pager_dirty_bytes_count.load();
856        total_dirty + byte_count >= self.max_dirty_bytes_when_critical.load(Ordering::Relaxed)
857    }
858
859    /// Reports that a certain number of bytes will be dirtied in a pager-backed VMO. If the memory
860    /// pressure level is critical and fxfs has lots of dirty pages then a new task will be spawned
861    /// in `volume` to flush the dirty pages before `mark_dirty` is called. If the memory pressure
862    /// level is not critical then `mark_dirty` will be synchronously called.
863    pub fn report_pager_dirty(
864        self: Arc<Self>,
865        byte_count: u64,
866        volume: Arc<FxVolume>,
867        mark_dirty: impl FnOnce() + Send + 'static,
868    ) {
869        if !self.is_flush_required_to_dirty(byte_count) {
870            self.pager_dirty_bytes_count.fetch_add(byte_count);
871            mark_dirty();
872        } else {
873            volume.spawn(
874                async move {
875                    let volumes = self.mounted_volumes.lock().await;
876
877                    // Re-check the number of outstanding pager dirty bytes because another thread
878                    // could have raced and flushed the volumes first.
879                    if self.is_flush_required_to_dirty(byte_count) {
880                        debug!(
881                            "Flushing all volumes. Memory pressure is critical & dirty pager bytes \
882                            ({} MiB) >= limit ({} MiB)",
883                            self.pager_dirty_bytes_count.load() / MEBIBYTE,
884                            self.max_dirty_bytes_when_critical.load(Ordering::Relaxed) / MEBIBYTE
885                        );
886
887                        let flushes = FuturesUnordered::new();
888                        for MountedVolume { volume, .. } in volumes.values() {
889                            let vol = volume.volume().clone();
890                            flushes.push(async move {
891                                vol.minimize_memory().await;
892                            });
893                        }
894
895                        flushes.collect::<()>().await;
896                    }
897                    self.pager_dirty_bytes_count.fetch_add(byte_count);
898                    mark_dirty();
899                }
900                .trace(trace_future_args!("flush-before-mark-dirty")),
901            )
902        }
903    }
904
905    /// Reports that a certain number of bytes were cleaned in a pager-backed VMO.
906    pub fn report_pager_clean(&self, byte_count: u64) {
907        let prev_dirty = self.pager_dirty_bytes_count.fetch_sub(byte_count);
908        // TODO(https://fxbug.dev/452935329): Turn this into a real assert.
909        debug_assert!(prev_dirty >= byte_count, "Underflowed dirty bytes.");
910
911        if prev_dirty < byte_count {
912            // An unlikely scenario, but if there was an underflow, reset the pager dirty bytes to
913            // zero.
914            self.pager_dirty_bytes_count.store(0);
915        }
916    }
917
918    async fn handle_check(
919        self: &Arc<Self>,
920        store_id: u64,
921        options: CheckOptions,
922    ) -> Result<(), Error> {
923        let fs = self.root_volume.volume_directory().store().filesystem();
924        let crypt = if let Some(crypt) = options.crypt {
925            Some(Arc::new(RemoteCrypt::new(crypt)) as Arc<dyn Crypt>)
926        } else {
927            None
928        };
929        let result = fsck::fsck_volume(fs.as_ref(), store_id, crypt).await?;
930        // TODO(b/311550633): Stash result in inspect.
931        info!(store_id:%; "{result:?}");
932        Ok(())
933    }
934
935    async fn handle_set_limit(self: &Arc<Self>, store_id: u64, bytes: u64) -> Result<(), Error> {
936        let store = self.root_volume.volume_directory().store();
937        let mut transaction = store.new_transaction(lock_keys![], Options::default()).await?;
938        store.filesystem().allocator().set_bytes_limit(&mut transaction, store_id, bytes)?;
939        transaction.commit().await?;
940        Ok(())
941    }
942
943    fn handle_get_limit(self: &Arc<Self>, store_id: u64) -> u64 {
944        let fs = self.root_volume.volume_directory().store().filesystem();
945        fs.allocator().get_owner_bytes_limit(store_id).unwrap_or_default()
946    }
947
948    async fn handle_get_info(self: &Arc<Self>, store_id: u64) -> Result<[u8; 16], Error> {
949        let fs = self.root_volume.volume_directory().store().filesystem();
950        let store =
951            fs.object_manager().store(store_id).ok_or_else(|| anyhow!("Store not found"))?;
952        Ok(store.guid())
953    }
954
955    async fn handle_mount(
956        self: &Arc<Self>,
957        name: &str,
958        store_id: u64,
959        outgoing_directory_server_end: ServerEnd<fio::DirectoryMarker>,
960        options: MountOptions,
961    ) -> Result<(), Error> {
962        info!(name:%, store_id:%, options:?; "Received mount request");
963        let crypt = options.crypt.map(|crypt| Arc::new(RemoteCrypt::new(crypt)) as Arc<dyn Crypt>);
964        let as_blob = options.as_blob.unwrap_or(false);
965        let mut guard = self.lock().await;
966        let volume = guard
967            .create_or_mount_volume(name, crypt, Mode::Mount, as_blob)
968            .await
969            .context("failed to mount volume")?;
970        self.serve_volume(&volume, outgoing_directory_server_end, as_blob)
971            .context("failed to serve volume")?;
972        guard.auto_unmount(volume.volume().store().store_object_id());
973        Ok(())
974    }
975
976    async fn handle_admin_requests(
977        self: &Arc<Self>,
978        mut stream: AdminRequestStream,
979        store_id: u64,
980    ) -> Result<(), Error> {
981        // If the Admin protocol ever supports more methods, this should change to a while.
982        if let Some(request) = stream.try_next().await.context("Reading request")? {
983            match request {
984                AdminRequest::Shutdown { responder } => {
985                    info!(store_id; "Received shutdown request for volume");
986
987                    let root_store = self.root_volume.volume_directory().store();
988                    let fs = root_store.filesystem();
989                    let _guard = fs
990                        .lock_manager()
991                        .txn_lock(lock_keys![LockKey::object(
992                            root_store.store_object_id(),
993                            self.root_volume.volume_directory().object_id(),
994                        )])
995                        .await;
996
997                    let maybe_volume = self.lock().await.unmount(store_id).await;
998                    responder
999                        .send()
1000                        .unwrap_or_else(|e| warn!("Failed to send shutdown response: {}", e));
1001
1002                    if let Ok(volume) = maybe_volume {
1003                        // NOTE: After calling this, this task might be dropped at the next await
1004                        // point.
1005                        volume.admin_scope().shutdown();
1006                    }
1007
1008                    return Ok(());
1009                }
1010            }
1011        }
1012        Ok(())
1013    }
1014
1015    /// Sets a callback which is invoked when a volume is added.  When the volume is removed, this
1016    /// is called again with `None` as the second parameter. The root store is included along with
1017    /// volume so the volume's layers can be accessed. Note that this can only be set once per
1018    /// VolumesDirectory; repeated calls will panic.
1019    pub fn set_on_mount_callback<
1020        F: Fn(&str, Option<(Arc<FxVolume>, Arc<ObjectStore>)>) + Send + Sync + 'static,
1021    >(
1022        &self,
1023        callback: F,
1024    ) {
1025        self.on_volume_added.set(Box::new(callback)).ok().unwrap();
1026    }
1027
1028    pub async fn install_volume(
1029        self: &Arc<Self>,
1030        src: &str,
1031        image_file: &str,
1032        dst: &str,
1033    ) -> Result<(), Error> {
1034        let guard = self.lock().await;
1035        info!("installing {src}/{image_file} -> {dst}");
1036        for MountedVolume { volume, .. } in guard.mounted_volumes.values() {
1037            if volume.volume().name() == src {
1038                return Err(zx::Status::ALREADY_BOUND)
1039                    .with_context(|| format!("volume {src} is already mounted"));
1040            }
1041            if volume.volume().name() == dst {
1042                return Err(zx::Status::ALREADY_BOUND)
1043                    .with_context(|| format!("volume {dst} is already mounted"));
1044            }
1045        }
1046        guard.volumes_directory.root_volume.install_volume(&src, &image_file, &dst).await?;
1047
1048        // The above function ensures that we've deleted `src` and `dst` now exists. Before we
1049        // release `guard`, we need to update the entries in the volumes directory accordingly.
1050        guard
1051            .volumes_directory
1052            .directory_node()
1053            .remove_entry(src, /* must_be_directory: */ false)
1054            .unwrap();
1055        guard
1056            .volumes_directory
1057            .directory_node()
1058            .remove_entry(dst, /* must_be_directory: */ false)
1059            .unwrap();
1060        let new_dst_object_id =
1061            match guard.volumes_directory.root_volume.volume_directory().lookup(dst).await? {
1062                Some((object_id, ObjectDescriptor::Volume, _)) => Ok(object_id),
1063                Some(_) => Err(FxfsError::Inconsistent),
1064                None => Err(FxfsError::NotFound),
1065            }?;
1066        self.add_directory_entry(dst, new_dst_object_id);
1067
1068        info!("install complete");
1069        Ok(())
1070    }
1071}
1072
1073#[async_trait]
1074impl StoreOwner for VolumesDirectory {
1075    async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), Error> {
1076        self.lock().await.force_lock(store.store_object_id()).await
1077    }
1078}
1079
1080#[cfg(test)]
1081pub(crate) fn serve_startup_volume_proxy(
1082    volumes_directory: &Arc<VolumesDirectory>,
1083    volume_name: &str,
1084) -> (fidl_fuchsia_fs_startup::VolumeProxy, vfs::ExecutionScope) {
1085    use vfs::ToObjectRequest;
1086    use vfs::service::ServiceLike;
1087    let scope = vfs::ExecutionScope::new();
1088    let entry = volumes_directory.directory_node().get_entry(volume_name).unwrap();
1089    let service = entry.into_any().downcast::<vfs::service::Service>().unwrap();
1090    let (proxy, server) = fidl::endpoints::create_proxy::<fidl_fuchsia_fs_startup::VolumeMarker>();
1091    service
1092        .connect(
1093            scope.clone(),
1094            Default::default(),
1095            &mut fio::Flags::PROTOCOL_SERVICE.to_object_request(server),
1096        )
1097        .unwrap();
1098    (proxy, scope)
1099}
1100
1101struct PagerDirtyByteCount(AtomicU64);
1102
1103impl PagerDirtyByteCount {
1104    pub fn new() -> Self {
1105        Self(AtomicU64::new(0))
1106    }
1107
1108    pub fn fetch_add(&self, value: u64) -> u64 {
1109        let prev = self.0.fetch_add(value, Ordering::Relaxed);
1110        fxfs_trace::counter!("dirty-bytes", 0, "total" => prev.saturating_add(value));
1111        prev
1112    }
1113
1114    pub fn fetch_sub(&self, value: u64) -> u64 {
1115        let prev = self.0.fetch_sub(value, Ordering::Relaxed);
1116        fxfs_trace::counter!("dirty-bytes", 0, "total" => prev.saturating_sub(value));
1117        prev
1118    }
1119
1120    pub fn load(&self) -> u64 {
1121        self.0.load(Ordering::Relaxed)
1122    }
1123
1124    pub fn store(&self, value: u64) {
1125        self.0.store(value, Ordering::Relaxed);
1126        fxfs_trace::counter!("dirty-bytes", 0, "total" => value);
1127    }
1128}
1129
1130#[cfg(test)]
1131mod tests {
1132    use super::{Mode, serve_startup_volume_proxy};
1133    use crate::fuchsia::RemoteCrypt;
1134    use crate::fuchsia::memory_pressure::MemoryPressureLevel;
1135    use crate::fuchsia::testing::{self, TestFixture, open_dir_checked, open_file_checked};
1136    use crate::fuchsia::volume::MemoryPressureConfig;
1137    use crate::fuchsia::volumes_directory::VolumesDirectory;
1138    use crate::testing::TestFixtureOptions;
1139    use fidl::endpoints::{DiscoverableProtocolMarker, create_proxy, create_request_stream};
1140    use fidl_fuchsia_fs::AdminMarker;
1141    use fidl_fuchsia_fs_startup::{MountOptions, VolumeProxy};
1142    use fidl_fuchsia_fxfs::{CryptRequest, FxfsKey, KeyPurpose, WrappedKey};
1143    use fidl_fuchsia_io as fio;
1144    use fuchsia_async as fasync;
1145    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
1146    use fuchsia_fs::file;
1147    use futures::{TryStreamExt, join};
1148    use fxfs::errors::FxfsError;
1149    use fxfs::filesystem::FxFilesystem;
1150    use fxfs::fsck::{FsckOptions, fsck, fsck_volume_with_options, fsck_with_options};
1151    use fxfs::lock_keys;
1152    use fxfs::object_handle::ObjectHandle;
1153    use fxfs::object_store::allocator::Allocator;
1154    use fxfs::object_store::transaction::{LockKey, Options};
1155    use fxfs::object_store::volume::root_volume;
1156    use fxfs_crypto::Crypt;
1157    use fxfs_insecure_crypto::new_insecure_crypt;
1158    use refaults_vmo::PageRefaultCounter;
1159    use std::sync::atomic::Ordering;
1160    use std::sync::{Arc, Weak};
1161    use std::time::Duration;
1162    use storage_device::DeviceHolder;
1163    use storage_device::fake_device::FakeDevice;
1164    use vfs::execution_scope::ExecutionScope;
1165    use vfs::temp_clone::{TempClonable, unblock};
1166    use zx::Status;
1167    async fn write_image_to_file(image: DeviceHolder, file: fio::FileProxy) {
1168        file.resize(image.size()).await.unwrap().expect("resize failed");
1169        let vmo = TempClonable::new(
1170            file.get_backing_memory(fio::VmoFlags::SHARED_BUFFER | fio::VmoFlags::WRITE)
1171                .await
1172                .unwrap()
1173                .expect("get backing memory failed"),
1174        );
1175
1176        const CHUNK_READ_SIZE: usize = 131_072; /* 128 KiB */
1177        let mut buff = image.allocate_buffer(CHUNK_READ_SIZE).await;
1178        let total = image.size();
1179        let mut offset = 0;
1180        while offset < total {
1181            let amount = std::cmp::min(total - offset, CHUNK_READ_SIZE as u64);
1182            image.read(offset, buff.as_mut()).await.expect("image read failed");
1183            {
1184                // *NOTE*: We have to unblock our write to the VMO since it's pager backed and could
1185                // be running on the same thread as the filesystem is.
1186                let vmo = vmo.temp_clone();
1187                let data = buff.as_slice()[0..amount as usize].to_vec();
1188                let offset = offset;
1189                unblock(move || vmo.write(&data, offset)).await.expect("vmo write failed");
1190            }
1191            offset += amount;
1192        }
1193        assert_eq!(offset, total);
1194        file.sync().await.unwrap().expect("sync failed");
1195        file.close().await.unwrap().expect("close failed");
1196    }
1197
1198    #[fuchsia::test]
1199    async fn test_volume_creation() {
1200        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1201        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1202        let blob_resupplied_count =
1203            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1204        let volumes_directory = VolumesDirectory::new(
1205            root_volume(filesystem.clone()).await.unwrap(),
1206            Weak::new(),
1207            None,
1208            blob_resupplied_count,
1209            MemoryPressureConfig::default(),
1210        )
1211        .await
1212        .unwrap();
1213
1214        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1215        {
1216            let vol = volumes_directory
1217                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1218                .await
1219                .expect("create encrypted volume failed");
1220            vol.volume().store().store_object_id()
1221        };
1222
1223        volumes_directory.terminate().await;
1224        std::mem::drop(volumes_directory);
1225        filesystem.close().await.expect("close filesystem failed");
1226        let device = filesystem.take_device().await;
1227        device.reopen(false);
1228        let filesystem = FxFilesystem::open(device).await.unwrap();
1229        let blob_resupplied_count =
1230            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1231        let volumes_directory = VolumesDirectory::new(
1232            root_volume(filesystem.clone()).await.unwrap(),
1233            Weak::new(),
1234            None,
1235            blob_resupplied_count,
1236            MemoryPressureConfig::default(),
1237        )
1238        .await
1239        .unwrap();
1240
1241        let error = volumes_directory
1242            .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1243            .await
1244            .err()
1245            .expect("Creating existing encrypted volume should fail");
1246        assert!(FxfsError::AlreadyExists.matches(&error));
1247    }
1248
1249    #[fuchsia::test]
1250    async fn test_dirty_pages_accumulate_in_parent() {
1251        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1252        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1253        let blob_resupplied_count =
1254            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1255        let volumes_directory = VolumesDirectory::new(
1256            root_volume(filesystem.clone()).await.unwrap(),
1257            Weak::new(),
1258            None,
1259            blob_resupplied_count,
1260            MemoryPressureConfig::default(),
1261        )
1262        .await
1263        .unwrap();
1264
1265        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1266        let vol = volumes_directory
1267            .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1268            .await
1269            .expect("create encrypted volume failed");
1270        let old_dirty = volumes_directory.pager_dirty_bytes_count.load();
1271
1272        let new_dirty = {
1273            let (root, server_end) = create_proxy::<fio::DirectoryMarker>();
1274            vol.root().clone().serve(fio::PERM_READABLE | fio::PERM_WRITABLE, server_end);
1275            let f = open_file_checked(
1276                &root,
1277                "foo",
1278                fio::Flags::FLAG_MAYBE_CREATE
1279                    | fio::PERM_READABLE
1280                    | fio::PERM_WRITABLE
1281                    | fio::Flags::PROTOCOL_FILE,
1282                &Default::default(),
1283            )
1284            .await;
1285            let buf = vec![0xaa as u8; 8192];
1286            file::write(&f, buf.as_slice()).await.expect("Write");
1287            // It's important to check the dirty bytes before closing the file, as closing can
1288            // trigger a flush.
1289            volumes_directory.pager_dirty_bytes_count.load()
1290        };
1291        assert_ne!(old_dirty, new_dirty);
1292
1293        volumes_directory.terminate().await;
1294        std::mem::drop(volumes_directory);
1295        filesystem.close().await.expect("close filesystem failed");
1296    }
1297
1298    #[fuchsia::test]
1299    async fn test_volume_reopen() {
1300        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1301        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1302        let blob_resupplied_count =
1303            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1304        let volumes_directory = VolumesDirectory::new(
1305            root_volume(filesystem.clone()).await.unwrap(),
1306            Weak::new(),
1307            None,
1308            blob_resupplied_count,
1309            MemoryPressureConfig::default(),
1310        )
1311        .await
1312        .unwrap();
1313
1314        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1315        let volume_id = {
1316            let vol = volumes_directory
1317                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1318                .await
1319                .expect("create encrypted volume failed");
1320            vol.volume().store().store_object_id()
1321        };
1322
1323        volumes_directory.terminate().await;
1324        std::mem::drop(volumes_directory);
1325        filesystem.close().await.expect("close filesystem failed");
1326        let device = filesystem.take_device().await;
1327        device.reopen(false);
1328        let filesystem = FxFilesystem::open(device).await.unwrap();
1329        let blob_resupplied_count =
1330            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1331        let volumes_directory = VolumesDirectory::new(
1332            root_volume(filesystem.clone()).await.unwrap(),
1333            Weak::new(),
1334            None,
1335            blob_resupplied_count,
1336            MemoryPressureConfig::default(),
1337        )
1338        .await
1339        .unwrap();
1340
1341        {
1342            let vol = volumes_directory
1343                .mount_volume("encrypted", Some(crypt.clone()), false)
1344                .await
1345                .expect("open existing encrypted volume failed");
1346            assert_eq!(vol.volume().store().store_object_id(), volume_id);
1347        }
1348
1349        volumes_directory.terminate().await;
1350        std::mem::drop(volumes_directory);
1351        filesystem.close().await.expect("close filesystem failed");
1352    }
1353
1354    #[fuchsia::test]
1355    async fn test_volume_creation_unencrypted() {
1356        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1357        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1358        let blob_resupplied_count =
1359            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1360        let volumes_directory = VolumesDirectory::new(
1361            root_volume(filesystem.clone()).await.unwrap(),
1362            Weak::new(),
1363            None,
1364            blob_resupplied_count,
1365            MemoryPressureConfig::default(),
1366        )
1367        .await
1368        .unwrap();
1369
1370        {
1371            let vol = volumes_directory
1372                .create_and_mount_volume("unencrypted", None, false, None)
1373                .await
1374                .expect("create unencrypted volume failed");
1375            vol.volume().store().store_object_id()
1376        };
1377
1378        volumes_directory.terminate().await;
1379        std::mem::drop(volumes_directory);
1380        filesystem.close().await.expect("close filesystem failed");
1381        let device = filesystem.take_device().await;
1382        device.reopen(false);
1383        let filesystem = FxFilesystem::open(device).await.unwrap();
1384        let blob_resupplied_count =
1385            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1386        let volumes_directory = VolumesDirectory::new(
1387            root_volume(filesystem.clone()).await.unwrap(),
1388            Weak::new(),
1389            None,
1390            blob_resupplied_count,
1391            MemoryPressureConfig::default(),
1392        )
1393        .await
1394        .unwrap();
1395
1396        let error = volumes_directory
1397            .create_and_mount_volume("unencrypted", None, false, None)
1398            .await
1399            .err()
1400            .expect("Creating existing unencrypted volume should fail");
1401        assert!(FxfsError::AlreadyExists.matches(&error));
1402
1403        volumes_directory.terminate().await;
1404        std::mem::drop(volumes_directory);
1405        filesystem.close().await.expect("close filesystem failed");
1406    }
1407
1408    #[fuchsia::test]
1409    async fn test_volume_reopen_unencrypted() {
1410        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1411        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1412        let blob_resupplied_count =
1413            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1414        let volumes_directory = VolumesDirectory::new(
1415            root_volume(filesystem.clone()).await.unwrap(),
1416            Weak::new(),
1417            None,
1418            blob_resupplied_count,
1419            MemoryPressureConfig::default(),
1420        )
1421        .await
1422        .unwrap();
1423
1424        let volume_id = {
1425            let vol = volumes_directory
1426                .create_and_mount_volume("unencrypted", None, false, None)
1427                .await
1428                .expect("create unencrypted volume failed");
1429            vol.volume().store().store_object_id()
1430        };
1431
1432        volumes_directory.terminate().await;
1433        std::mem::drop(volumes_directory);
1434        filesystem.close().await.expect("close filesystem failed");
1435        let device = filesystem.take_device().await;
1436        device.reopen(false);
1437        let filesystem = FxFilesystem::open(device).await.unwrap();
1438        let blob_resupplied_count =
1439            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1440        let volumes_directory = VolumesDirectory::new(
1441            root_volume(filesystem.clone()).await.unwrap(),
1442            Weak::new(),
1443            None,
1444            blob_resupplied_count,
1445            MemoryPressureConfig::default(),
1446        )
1447        .await
1448        .unwrap();
1449
1450        {
1451            let vol = volumes_directory
1452                .mount_volume("unencrypted", None, false)
1453                .await
1454                .expect("open existing unencrypted volume failed");
1455            assert_eq!(vol.volume().store().store_object_id(), volume_id);
1456        }
1457
1458        volumes_directory.terminate().await;
1459        std::mem::drop(volumes_directory);
1460        filesystem.close().await.expect("close filesystem failed");
1461    }
1462
1463    #[fuchsia::test]
1464    async fn test_volume_enumeration() {
1465        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1466        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1467        let blob_resupplied_count =
1468            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1469        let volumes_directory = VolumesDirectory::new(
1470            root_volume(filesystem.clone()).await.unwrap(),
1471            Weak::new(),
1472            None,
1473            blob_resupplied_count,
1474            MemoryPressureConfig::default(),
1475        )
1476        .await
1477        .unwrap();
1478
1479        // Add an encrypted volume...
1480        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1481        {
1482            volumes_directory
1483                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1484                .await
1485                .expect("create encrypted volume failed");
1486        };
1487        // And an unencrypted volume.
1488        {
1489            volumes_directory
1490                .create_and_mount_volume("unencrypted", None, false, None)
1491                .await
1492                .expect("create unencrypted volume failed");
1493        };
1494
1495        // Restart, so that we can test enumeration of unopened volumes.
1496        volumes_directory.terminate().await;
1497        std::mem::drop(volumes_directory);
1498        filesystem.close().await.expect("close filesystem failed");
1499        let device = filesystem.take_device().await;
1500        device.reopen(false);
1501        let filesystem = FxFilesystem::open(device).await.unwrap();
1502        let blob_resupplied_count =
1503            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1504        let volumes_directory = VolumesDirectory::new(
1505            root_volume(filesystem.clone()).await.unwrap(),
1506            Weak::new(),
1507            None,
1508            blob_resupplied_count,
1509            MemoryPressureConfig::default(),
1510        )
1511        .await
1512        .unwrap();
1513
1514        let readdir = |dir: Arc<fio::DirectoryProxy>| async move {
1515            let status = dir.rewind().await.expect("FIDL call failed");
1516            Status::ok(status).expect("rewind failed");
1517            let (status, buf) = dir.read_dirents(fio::MAX_BUF).await.expect("FIDL call failed");
1518            Status::ok(status).expect("read_dirents failed");
1519            let mut entries = vec![];
1520            for res in fuchsia_fs::directory::parse_dir_entries(&buf) {
1521                entries.push(res.expect("Failed to parse entry").name);
1522            }
1523            entries
1524        };
1525
1526        let dir_proxy = Arc::new(vfs::directory::serve_read_only(
1527            volumes_directory.directory_node().clone(),
1528            ExecutionScope::new(),
1529        ));
1530        let entries = readdir(dir_proxy.clone()).await;
1531        assert_eq!(entries, [".", "encrypted", "unencrypted"]);
1532
1533        let _vol = volumes_directory
1534            .mount_volume("encrypted", Some(crypt.clone()), false)
1535            .await
1536            .expect("Open encrypted volume failed");
1537
1538        // Ensure that the behaviour is the same after we've opened a volume.
1539        let entries = readdir(dir_proxy).await;
1540        assert_eq!(entries, [".", "encrypted", "unencrypted"]);
1541
1542        volumes_directory.terminate().await;
1543        std::mem::drop(volumes_directory);
1544        filesystem.close().await.expect("close filesystem failed");
1545    }
1546
1547    #[fuchsia::test]
1548    async fn test_get_info() {
1549        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1550        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1551        let root_volume = root_volume(filesystem.clone()).await.unwrap();
1552        let blob_resupplied_count =
1553            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1554        let volumes_directory = VolumesDirectory::new(
1555            root_volume,
1556            Weak::new(),
1557            None,
1558            blob_resupplied_count,
1559            MemoryPressureConfig::default(),
1560        )
1561        .await
1562        .unwrap();
1563
1564        let vol = volumes_directory
1565            .create_and_mount_volume("vol", None, false, None)
1566            .await
1567            .expect("create_and_mount_volume failed");
1568        let guid = vol.volume().store().guid();
1569
1570        let (volume_proxy, _scope) = serve_startup_volume_proxy(&volumes_directory, "vol");
1571
1572        let info: fidl_fuchsia_fs_startup::VolumeInfo = volume_proxy
1573            .get_info()
1574            .await
1575            .expect("get_info failed")
1576            .expect("get_info returned error");
1577        assert_eq!(info.guid, Some(guid));
1578
1579        volumes_directory.terminate().await;
1580    }
1581
1582    #[fuchsia::test]
1583    async fn test_deleted_encrypted_volume_while_mounted() {
1584        const VOLUME_NAME: &str = "encrypted";
1585
1586        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1587        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1588        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1589        let blob_resupplied_count =
1590            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1591        let volumes_directory = VolumesDirectory::new(
1592            root_volume(filesystem.clone()).await.unwrap(),
1593            Weak::new(),
1594            None,
1595            blob_resupplied_count,
1596            MemoryPressureConfig::default(),
1597        )
1598        .await
1599        .unwrap();
1600        volumes_directory
1601            .create_and_mount_volume(VOLUME_NAME, Some(crypt.clone()), false, None)
1602            .await
1603            .expect("create encrypted volume failed");
1604        // We have the volume mounted so delete attempts should fail.
1605        assert!(
1606            FxfsError::AlreadyBound.matches(
1607                &volumes_directory
1608                    .remove_volume(VOLUME_NAME)
1609                    .await
1610                    .err()
1611                    .expect("Deleting volume should fail")
1612            )
1613        );
1614        volumes_directory.terminate().await;
1615        std::mem::drop(volumes_directory);
1616        filesystem.close().await.expect("close filesystem failed");
1617    }
1618
1619    #[fuchsia::test]
1620    async fn test_mount_volume_using_volume_protocol() {
1621        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1622        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1623        let blob_resupplied_count =
1624            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1625        let volumes_directory = VolumesDirectory::new(
1626            root_volume(filesystem.clone()).await.unwrap(),
1627            Weak::new(),
1628            None,
1629            blob_resupplied_count,
1630            MemoryPressureConfig::default(),
1631        )
1632        .await
1633        .unwrap();
1634
1635        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1636        let store_id = {
1637            let vol = volumes_directory
1638                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1639                .await
1640                .expect("create encrypted volume failed");
1641            vol.volume().store().store_object_id()
1642        };
1643        volumes_directory.lock().await.unmount(store_id).await.expect("unmount failed");
1644
1645        let (volume_proxy, _scope) = serve_startup_volume_proxy(&volumes_directory, "encrypted");
1646
1647        let (dir_proxy, dir_server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1648
1649        let crypt_service = fxfs_crypt::CryptService::new();
1650        crypt_service
1651            .add_wrapping_key(0, fxfs_insecure_crypto::DATA_KEY.to_vec())
1652            .expect("add_wrapping_key failed");
1653        crypt_service
1654            .add_wrapping_key(1, fxfs_insecure_crypto::METADATA_KEY.to_vec())
1655            .expect("add_wrapping_key failed");
1656        crypt_service.set_active_key(KeyPurpose::Data, 0).expect("set_active_key failed");
1657        crypt_service.set_active_key(KeyPurpose::Metadata, 1).expect("set_active_key failed");
1658        let (client1, stream1) = create_request_stream();
1659        let (client2, stream2) = create_request_stream();
1660
1661        join!(
1662            async {
1663                volume_proxy
1664                    .mount(
1665                        dir_server_end,
1666                        MountOptions { crypt: Some(client1), ..MountOptions::default() },
1667                    )
1668                    .await
1669                    .expect("mount (fidl) failed")
1670                    .expect("mount failed");
1671
1672                open_file_checked(
1673                    &dir_proxy,
1674                    "root/test",
1675                    fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::FLAG_MAYBE_CREATE,
1676                    &Default::default(),
1677                )
1678                .await;
1679
1680                // Attempting to mount again should fail with ALREADY_BOUND.
1681                let (_dir_proxy, dir_server_end) =
1682                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1683
1684                assert_eq!(
1685                    Status::from_raw(
1686                        volume_proxy
1687                            .mount(
1688                                dir_server_end,
1689                                MountOptions { crypt: Some(client2), ..MountOptions::default() },
1690                            )
1691                            .await
1692                            .expect("mount (fidl) failed")
1693                            .expect_err("mount succeeded")
1694                    ),
1695                    Status::ALREADY_BOUND
1696                );
1697
1698                std::mem::drop(dir_proxy);
1699
1700                // The volume should get unmounted a short time later.
1701                let mut count = 0;
1702                loop {
1703                    if volumes_directory.mounted_volumes.lock().await.is_empty() {
1704                        break;
1705                    }
1706                    count += 1;
1707                    assert!(count <= 100);
1708                    fasync::Timer::new(Duration::from_millis(100)).await;
1709                }
1710            },
1711            async {
1712                crypt_service
1713                    .handle_request(fxfs_crypt::Services::Crypt(stream1))
1714                    .await
1715                    .expect("handle_request failed");
1716                crypt_service
1717                    .handle_request(fxfs_crypt::Services::Crypt(stream2))
1718                    .await
1719                    .expect("handle_request failed");
1720            }
1721        );
1722        // Make sure the background thread that actually calls terminate() on the volume finishes
1723        // before exiting the test. terminate() should be a no-op since we already verified
1724        // mounted_directories is empty, but the volume's terminate() future in the background task
1725        // may still be outstanding. As both the background task and VolumesDirectory::terminate()
1726        // hold the write lock, we use that to block until the background task has completed.
1727        volumes_directory.terminate().await;
1728    }
1729
1730    #[fuchsia::test]
1731    #[ignore] // TODO(b/293917849) re-enable this test when de-flaked
1732
1733    async fn test_volume_dir_races() {
1734        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1735        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1736        let blob_resupplied_count =
1737            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1738        let volumes_directory = VolumesDirectory::new(
1739            root_volume(filesystem.clone()).await.unwrap(),
1740            Weak::new(),
1741            None,
1742            blob_resupplied_count,
1743            MemoryPressureConfig::default(),
1744        )
1745        .await
1746        .unwrap();
1747
1748        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1749        let store_id = {
1750            let vol = volumes_directory
1751                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1752                .await
1753                .expect("create encrypted volume failed");
1754            vol.volume().store().store_object_id()
1755        };
1756        volumes_directory.lock().await.unmount(store_id).await.expect("unmount failed");
1757
1758        let (volume_proxy, _scope) = serve_startup_volume_proxy(&volumes_directory, "encrypted");
1759
1760        let crypt_service = Arc::new(fxfs_crypt::CryptService::new());
1761        crypt_service
1762            .add_wrapping_key(0, fxfs_insecure_crypto::DATA_KEY.to_vec())
1763            .expect("add_wrapping_key failed");
1764        crypt_service
1765            .add_wrapping_key(1, fxfs_insecure_crypto::METADATA_KEY.to_vec())
1766            .expect("add_wrapping_key failed");
1767        crypt_service.set_active_key(KeyPurpose::Data, 0).expect("set_active_key failed");
1768        crypt_service.set_active_key(KeyPurpose::Metadata, 1).expect("set_active_key failed");
1769        let (client1, stream1) = create_request_stream();
1770        let (client2, stream2) = create_request_stream();
1771        let crypt_service_clone = crypt_service.clone();
1772        let crypt_task1 = fasync::Task::spawn(async move {
1773            crypt_service_clone
1774                .handle_request(fxfs_crypt::Services::Crypt(stream1))
1775                .await
1776                .expect("handle_request failed");
1777        });
1778        let crypt_task2 = fasync::Task::spawn(async move {
1779            crypt_service
1780                .handle_request(fxfs_crypt::Services::Crypt(stream2))
1781                .await
1782                .expect("handle_request failed");
1783        });
1784
1785        // Create two tasks each of mount and remove, and one to recreate the volume, so that we get
1786        // to exercise a wide variety of concurrent actions.
1787        // Delay remove and create a bit, since mount is slower due to FIDL.
1788        join!(
1789            async {
1790                let (_dir_proxy, dir_server_end) =
1791                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1792                if let Err(status) = volume_proxy
1793                    .mount(
1794                        dir_server_end,
1795                        MountOptions { crypt: Some(client1), ..MountOptions::default() },
1796                    )
1797                    .await
1798                    .expect("mount (fidl) failed")
1799                {
1800                    let status = Status::from_raw(status);
1801                    if status != Status::NOT_FOUND && status != Status::ALREADY_BOUND {
1802                        assert!(false, "Unexpected status {:}", status);
1803                    }
1804                }
1805            },
1806            async {
1807                let (_dir_proxy, dir_server_end) =
1808                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1809                if let Err(status) = volume_proxy
1810                    .mount(
1811                        dir_server_end,
1812                        MountOptions { crypt: Some(client2), ..MountOptions::default() },
1813                    )
1814                    .await
1815                    .expect("mount (fidl) failed")
1816                {
1817                    let status = Status::from_raw(status);
1818                    if status != Status::NOT_FOUND && status != Status::ALREADY_BOUND {
1819                        assert!(false, "Unexpected status {:}", status);
1820                    }
1821                }
1822            },
1823            async {
1824                let volumes_directory = volumes_directory.clone();
1825                let wait_time = rand::random_range(0..5);
1826                fasync::Timer::new(Duration::from_millis(wait_time)).await;
1827                if let Err(err) = volumes_directory.remove_volume("encrypted").await {
1828                    assert!(
1829                        FxfsError::NotFound.matches(&err) || FxfsError::AlreadyBound.matches(&err),
1830                        "Unexpected error {:?}",
1831                        err
1832                    );
1833                }
1834            },
1835            async {
1836                let volumes_directory = volumes_directory.clone();
1837                let wait_time = rand::random_range(0..5);
1838                fasync::Timer::new(Duration::from_millis(wait_time)).await;
1839                if let Err(err) = volumes_directory.remove_volume("encrypted").await {
1840                    assert!(
1841                        FxfsError::NotFound.matches(&err) || FxfsError::AlreadyBound.matches(&err),
1842                        "Unexpected error {:?}",
1843                        err
1844                    );
1845                }
1846            },
1847            async {
1848                let volumes_directory = volumes_directory.clone();
1849                let wait_time = rand::random_range(0..5);
1850                fasync::Timer::new(Duration::from_millis(wait_time)).await;
1851                let mut guard = volumes_directory.lock().await;
1852                match guard
1853                    .create_or_mount_volume(
1854                        "encrypted",
1855                        Some(crypt.clone()),
1856                        Mode::Create { guid: None, low_32_bit_object_ids: false },
1857                        false,
1858                    )
1859                    .await
1860                {
1861                    Ok(vol) => {
1862                        let store_id = vol.volume().store().store_object_id();
1863                        std::mem::drop(vol);
1864                        guard.unmount(store_id).await.expect("unmount failed");
1865                    }
1866                    Err(err) => {
1867                        assert!(
1868                            FxfsError::AlreadyExists.matches(&err)
1869                                || FxfsError::AlreadyBound.matches(&err),
1870                            "Unexpected error {:?}",
1871                            err
1872                        );
1873                    }
1874                }
1875            }
1876        );
1877        std::mem::drop(crypt_task1);
1878        std::mem::drop(crypt_task2);
1879        // Make sure the background thread that actually calls terminate() on the volume finishes
1880        // before exiting the test. terminate() should be a no-op since we already verified
1881        // mounted_directories is empty, but the volume's terminate() future in the background task
1882        // may still be outstanding. As both the background task and VolumesDirectory::terminate()
1883        // hold the write lock, we use that to block until the background task has completed.
1884        volumes_directory.terminate().await;
1885    }
1886
1887    #[fuchsia::test]
1888    async fn test_shutdown_volume() {
1889        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1890        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1891        let blob_resupplied_count =
1892            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1893        let volumes_directory = VolumesDirectory::new(
1894            root_volume(filesystem.clone()).await.unwrap(),
1895            Weak::new(),
1896            None,
1897            blob_resupplied_count,
1898            MemoryPressureConfig::default(),
1899        )
1900        .await
1901        .unwrap();
1902
1903        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1904        let vol = volumes_directory
1905            .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1906            .await
1907            .expect("create encrypted volume failed");
1908
1909        let (dir_proxy, dir_server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1910
1911        volumes_directory.serve_volume(&vol, dir_server_end, false).expect("serve_volume failed");
1912
1913        let admin_proxy = connect_to_protocol_at_dir_svc::<AdminMarker>(&dir_proxy)
1914            .expect("Unable to connect to admin service");
1915
1916        admin_proxy.shutdown().await.expect("shutdown failed");
1917
1918        assert!(volumes_directory.mounted_volumes.lock().await.is_empty());
1919    }
1920
1921    #[fuchsia::test]
1922    async fn test_byte_limit_persistence() {
1923        const BYTES_LIMIT_1: u64 = 123456;
1924        const BYTES_LIMIT_2: u64 = 456789;
1925        const VOLUME_NAME: &str = "A";
1926        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
1927        {
1928            let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1929            let blob_resupplied_count =
1930                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1931            let volumes_directory = VolumesDirectory::new(
1932                root_volume(filesystem.clone()).await.unwrap(),
1933                Weak::new(),
1934                None,
1935                blob_resupplied_count,
1936                MemoryPressureConfig::default(),
1937            )
1938            .await
1939            .unwrap();
1940
1941            volumes_directory
1942                .create_and_mount_volume(VOLUME_NAME, None, false, None)
1943                .await
1944                .expect("create unencrypted volume failed");
1945
1946            let (volume_proxy, _scope) =
1947                serve_startup_volume_proxy(&volumes_directory, VOLUME_NAME);
1948
1949            volume_proxy.set_limit(BYTES_LIMIT_1).await.unwrap().expect("To set limits");
1950            {
1951                let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
1952                assert_eq!(limits.len(), 1);
1953                assert_eq!(limits[0].1, BYTES_LIMIT_1);
1954            }
1955
1956            volume_proxy.set_limit(BYTES_LIMIT_2).await.unwrap().expect("To set limits");
1957            {
1958                let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
1959                assert_eq!(limits.len(), 1);
1960                assert_eq!(limits[0].1, BYTES_LIMIT_2);
1961            }
1962            std::mem::drop(volume_proxy);
1963            volumes_directory.terminate().await;
1964            std::mem::drop(volumes_directory);
1965            filesystem.close().await.expect("close filesystem failed");
1966            device = filesystem.take_device().await;
1967        }
1968        device.ensure_unique();
1969        device.reopen(false);
1970        {
1971            let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
1972            fsck(filesystem.clone()).await.expect("Fsck");
1973            let blob_resupplied_count =
1974                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1975            let volumes_directory = VolumesDirectory::new(
1976                root_volume(filesystem.clone()).await.unwrap(),
1977                Weak::new(),
1978                None,
1979                blob_resupplied_count,
1980                MemoryPressureConfig::default(),
1981            )
1982            .await
1983            .unwrap();
1984            {
1985                let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
1986                assert_eq!(limits.len(), 1);
1987                assert_eq!(limits[0].1, BYTES_LIMIT_2);
1988            }
1989            volumes_directory.remove_volume(VOLUME_NAME).await.expect("Volume deletion failed");
1990            {
1991                let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
1992                assert_eq!(limits.len(), 0);
1993            }
1994            volumes_directory.terminate().await;
1995            std::mem::drop(volumes_directory);
1996            filesystem.close().await.expect("close filesystem failed");
1997            device = filesystem.take_device().await;
1998        }
1999        device.ensure_unique();
2000        device.reopen(false);
2001        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2002        fsck(filesystem.clone()).await.expect("Fsck");
2003        let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
2004        assert_eq!(limits.len(), 0);
2005    }
2006
2007    struct VolumeInfo {
2008        _scope: vfs::ExecutionScope,
2009        volume_proxy: VolumeProxy,
2010        file_proxy: fio::FileProxy,
2011    }
2012
2013    impl VolumeInfo {
2014        async fn new(volumes_directory: &Arc<VolumesDirectory>, name: &'static str) -> Self {
2015            let volume = volumes_directory
2016                .create_and_mount_volume(name, None, false, None)
2017                .await
2018                .expect("create unencrypted volume failed");
2019
2020            let (volume_dir_proxy, dir_server_end) =
2021                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2022            volumes_directory
2023                .serve_volume(&volume, dir_server_end, false)
2024                .expect("serve_volume failed");
2025
2026            let (volume_proxy, _scope) = serve_startup_volume_proxy(&volumes_directory, name);
2027
2028            let (root_proxy, root_server_end) =
2029                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2030            volume_dir_proxy
2031                .open(
2032                    "root",
2033                    fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
2034                    &Default::default(),
2035                    root_server_end.into_channel(),
2036                )
2037                .expect("Failed to open volume root");
2038
2039            let file_proxy = open_file_checked(
2040                &root_proxy,
2041                "foo",
2042                fio::Flags::FLAG_MAYBE_CREATE
2043                    | fio::PERM_READABLE
2044                    | fio::PERM_WRITABLE
2045                    | fio::Flags::PROTOCOL_FILE,
2046                &Default::default(),
2047            )
2048            .await;
2049            VolumeInfo { _scope, volume_proxy, file_proxy }
2050        }
2051    }
2052
2053    #[fuchsia::test]
2054    async fn test_limit_bytes() {
2055        const BYTES_LIMIT: u64 = 262_144; // 256KiB
2056        const BLOCK_SIZE: usize = 8192; // 8KiB
2057        let device = DeviceHolder::new(FakeDevice::new(BLOCK_SIZE.try_into().unwrap(), 512));
2058        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2059        let blob_resupplied_count =
2060            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2061        let volumes_directory = VolumesDirectory::new(
2062            root_volume(filesystem.clone()).await.unwrap(),
2063            Weak::new(),
2064            None,
2065            blob_resupplied_count,
2066            MemoryPressureConfig::default(),
2067        )
2068        .await
2069        .unwrap();
2070
2071        let vol = VolumeInfo::new(&volumes_directory, "foo").await;
2072        let old_info = {
2073            let (status, info) = vol.file_proxy.query_filesystem().await.expect("Getting fs info");
2074            assert_eq!(status, zx::Status::OK.into_raw());
2075            let info = info.unwrap();
2076            // With no limit set, the total filesystem size should be returned.
2077            assert!(info.total_bytes > BYTES_LIMIT);
2078            info
2079        };
2080
2081        vol.volume_proxy.set_limit(BYTES_LIMIT).await.unwrap().expect("To set limits");
2082        {
2083            let (status, info) = vol.file_proxy.query_filesystem().await.expect("Getting fs info");
2084            assert!(status == zx::Status::OK.into_raw());
2085            let new_info = info.unwrap();
2086            assert_eq!(new_info.total_bytes, BYTES_LIMIT);
2087            // Now since the limit is the volume limit, the space used should be the volume usage,
2088            // which should be strictly less than the filesystem.
2089            assert!(new_info.used_bytes < old_info.used_bytes);
2090        }
2091
2092        let zeros = vec![0u8; BLOCK_SIZE];
2093        // First write should succeed.
2094        assert_eq!(
2095            <u64 as TryInto<usize>>::try_into(
2096                vol.file_proxy
2097                    .write(&zeros)
2098                    .await
2099                    .expect("Failed Write message")
2100                    .expect("Failed write")
2101            )
2102            .unwrap(),
2103            BLOCK_SIZE
2104        );
2105        // Likely to run out of space before writing the full limit due to overheads.
2106        for _ in (BLOCK_SIZE..BYTES_LIMIT as usize).step_by(BLOCK_SIZE) {
2107            match vol.file_proxy.write(&zeros).await.expect("Failed Write message") {
2108                Err(_) => break,
2109                Ok(b) if b < BLOCK_SIZE.try_into().unwrap() => break,
2110                _ => (),
2111            };
2112        }
2113
2114        // Any further writes should fail with out of space.
2115        assert_eq!(
2116            vol.file_proxy
2117                .write(&zeros)
2118                .await
2119                .expect("Failed write message")
2120                .expect_err("Write should have been limited"),
2121            Status::NO_SPACE.into_raw()
2122        );
2123
2124        // Double the limit and try again. We should have write space again.
2125        vol.volume_proxy.set_limit(BYTES_LIMIT * 2).await.unwrap().expect("To set limits");
2126        assert_eq!(
2127            <u64 as TryInto<usize>>::try_into(
2128                vol.file_proxy
2129                    .write(&zeros)
2130                    .await
2131                    .expect("Failed Write message")
2132                    .expect("Failed write")
2133            )
2134            .unwrap(),
2135            BLOCK_SIZE
2136        );
2137
2138        vol.file_proxy.close().await.unwrap().expect("Failed to close file");
2139        volumes_directory.terminate().await;
2140        std::mem::drop(volumes_directory);
2141        filesystem.close().await.expect("close filesystem failed");
2142    }
2143
2144    #[fuchsia::test]
2145    async fn test_limit_bytes_two_hit_device_limit() {
2146        const BYTES_LIMIT: u64 = 3_145_728; // 3MiB
2147        const BLOCK_SIZE: usize = 8192; // 8KiB
2148        const BLOCK_COUNT: u32 = 512;
2149        let device =
2150            DeviceHolder::new(FakeDevice::new(BLOCK_SIZE.try_into().unwrap(), BLOCK_COUNT));
2151        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2152        let blob_resupplied_count =
2153            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2154        let volumes_directory = VolumesDirectory::new(
2155            root_volume(filesystem.clone()).await.unwrap(),
2156            Weak::new(),
2157            None,
2158            blob_resupplied_count,
2159            MemoryPressureConfig::default(),
2160        )
2161        .await
2162        .unwrap();
2163
2164        let a = VolumeInfo::new(&volumes_directory, "foo").await;
2165        let b = VolumeInfo::new(&volumes_directory, "bar").await;
2166        a.volume_proxy.set_limit(BYTES_LIMIT).await.unwrap().expect("To set limits");
2167        b.volume_proxy.set_limit(BYTES_LIMIT).await.unwrap().expect("To set limits");
2168        let mut a_written: u64 = 0;
2169        let mut b_written: u64 = 0;
2170
2171        // Write chunks of BLOCK_SIZE.
2172        let zeros = vec![0u8; BLOCK_SIZE];
2173
2174        // First write should succeed for both.
2175        assert_eq!(
2176            <u64 as TryInto<usize>>::try_into(
2177                a.file_proxy
2178                    .write(&zeros)
2179                    .await
2180                    .expect("Failed Write message")
2181                    .expect("Failed write")
2182            )
2183            .unwrap(),
2184            BLOCK_SIZE
2185        );
2186        a_written += BLOCK_SIZE as u64;
2187        assert_eq!(
2188            <u64 as TryInto<usize>>::try_into(
2189                b.file_proxy
2190                    .write(&zeros)
2191                    .await
2192                    .expect("Failed Write message")
2193                    .expect("Failed write")
2194            )
2195            .unwrap(),
2196            BLOCK_SIZE
2197        );
2198        b_written += BLOCK_SIZE as u64;
2199
2200        // Likely to run out of space before writing the full limit due to overheads.
2201        for _ in (BLOCK_SIZE..BYTES_LIMIT as usize).step_by(BLOCK_SIZE) {
2202            match a.file_proxy.write(&zeros).await.expect("Failed Write message") {
2203                Err(_) => break,
2204                Ok(bytes) => {
2205                    a_written += bytes;
2206                    if bytes < BLOCK_SIZE.try_into().unwrap() {
2207                        break;
2208                    }
2209                }
2210            };
2211        }
2212        // Any further writes should fail with out of space.
2213        assert_eq!(
2214            a.file_proxy
2215                .write(&zeros)
2216                .await
2217                .expect("Failed write message")
2218                .expect_err("Write should have been limited"),
2219            Status::NO_SPACE.into_raw()
2220        );
2221
2222        // Now write to the second volume. Likely to run out of space before writing the full limit
2223        // due to overheads.
2224        for _ in (BLOCK_SIZE..BYTES_LIMIT as usize).step_by(BLOCK_SIZE) {
2225            match b.file_proxy.write(&zeros).await.expect("Failed Write message") {
2226                Err(_) => break,
2227                Ok(bytes) => {
2228                    b_written += bytes;
2229                    if bytes < BLOCK_SIZE.try_into().unwrap() {
2230                        break;
2231                    }
2232                }
2233            };
2234        }
2235        // Any further writes should fail with out of space.
2236        assert_eq!(
2237            b.file_proxy
2238                .write(&zeros)
2239                .await
2240                .expect("Failed write message")
2241                .expect_err("Write should have been limited"),
2242            Status::NO_SPACE.into_raw()
2243        );
2244
2245        // Second volume should have failed very early.
2246        assert!(BLOCK_SIZE as u64 * BLOCK_COUNT as u64 - BYTES_LIMIT >= b_written);
2247        // First volume should have gotten further.
2248        assert!(BLOCK_SIZE as u64 * BLOCK_COUNT as u64 - BYTES_LIMIT <= a_written);
2249
2250        a.file_proxy.close().await.unwrap().expect("Failed to close file");
2251        b.file_proxy.close().await.unwrap().expect("Failed to close file");
2252        volumes_directory.terminate().await;
2253        std::mem::drop(volumes_directory);
2254        filesystem.close().await.expect("close filesystem failed");
2255    }
2256
2257    #[fuchsia::test(threads = 10)]
2258    async fn test_profile_start() {
2259        const PREMOUNT_BLOB: &str = "premount_blob";
2260        const PREMOUNT_NOBLOB: &str = "premount_noblob";
2261        const LIVE_BLOB: &str = "live_blob";
2262        const LIVE_NOBLOB: &str = "live_noblob";
2263
2264        const RECORDING_NAME: &str = "foo";
2265
2266        let device = {
2267            let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2268            let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2269            let blob_resupplied_count =
2270                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2271            let volumes_directory = VolumesDirectory::new(
2272                root_volume(filesystem.clone()).await.unwrap(),
2273                Weak::new(),
2274                None,
2275                blob_resupplied_count,
2276                MemoryPressureConfig::default(),
2277            )
2278            .await
2279            .unwrap();
2280            volumes_directory
2281                .create_and_mount_volume(PREMOUNT_BLOB, None, true, None)
2282                .await
2283                .unwrap();
2284            volumes_directory
2285                .create_and_mount_volume(PREMOUNT_NOBLOB, None, false, None)
2286                .await
2287                .unwrap();
2288            volumes_directory.create_and_mount_volume(LIVE_BLOB, None, true, None).await.unwrap();
2289            volumes_directory
2290                .create_and_mount_volume(LIVE_NOBLOB, None, false, None)
2291                .await
2292                .unwrap();
2293
2294            volumes_directory.terminate().await;
2295            std::mem::drop(volumes_directory);
2296            filesystem.close().await.expect("Filesystem close");
2297            filesystem.take_device().await
2298        };
2299
2300        device.ensure_unique();
2301        device.reopen(false);
2302        let device = {
2303            let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2304            let blob_resupplied_count =
2305                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2306            let volumes_directory = VolumesDirectory::new(
2307                root_volume(filesystem.clone()).await.unwrap(),
2308                Weak::new(),
2309                None,
2310                blob_resupplied_count,
2311                MemoryPressureConfig::default(),
2312            )
2313            .await
2314            .unwrap();
2315
2316            // Premount two volumes.
2317            let _premount_blob = volumes_directory
2318                .mount_volume(PREMOUNT_BLOB, None, true)
2319                .await
2320                .expect("Reopen volume");
2321            let _premount_noblob = volumes_directory
2322                .mount_volume(PREMOUNT_NOBLOB, None, false)
2323                .await
2324                .expect("Reopen volume");
2325
2326            // Start the recording, let it run a really long time, it doesn't need to end for this
2327            // test. If it does wait this long then it should trigger test timeouts.
2328            volumes_directory
2329                .clone()
2330                .record_and_replay_profile(None, RECORDING_NAME.to_owned(), 600)
2331                .await
2332                .expect("Recording");
2333
2334            // Live mount two volumes.
2335            let _live_blob =
2336                volumes_directory.mount_volume(LIVE_BLOB, None, true).await.expect("Reopen volume");
2337            let _live_noblob = volumes_directory
2338                .mount_volume(LIVE_NOBLOB, None, false)
2339                .await
2340                .expect("Reopen volume");
2341
2342            // Wait for the recordings to finish.
2343            volumes_directory.stop_profile_tasks().await;
2344
2345            volumes_directory.terminate().await;
2346            std::mem::drop(volumes_directory);
2347            filesystem.close().await.expect("Filesystem close");
2348            filesystem.take_device().await
2349        };
2350
2351        device.ensure_unique();
2352        device.reopen(false);
2353        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2354        {
2355            let blob_resupplied_count =
2356                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2357            let volumes_directory = VolumesDirectory::new(
2358                root_volume(filesystem.clone()).await.unwrap(),
2359                Weak::new(),
2360                None,
2361                blob_resupplied_count,
2362                MemoryPressureConfig::default(),
2363            )
2364            .await
2365            .unwrap();
2366
2367            let _premount_blob = volumes_directory
2368                .mount_volume(PREMOUNT_BLOB, None, true)
2369                .await
2370                .expect("Reopen volume");
2371            let _premount_noblob = volumes_directory
2372                .mount_volume(PREMOUNT_NOBLOB, None, false)
2373                .await
2374                .expect("Reopen volume");
2375            let _live_blob =
2376                volumes_directory.mount_volume(LIVE_BLOB, None, true).await.expect("Reopen volume");
2377            let _live_noblob = volumes_directory
2378                .mount_volume(LIVE_NOBLOB, None, false)
2379                .await
2380                .expect("Reopen volume");
2381
2382            // Verify which recordings ran based on the saved recordings.
2383            volumes_directory
2384                .delete_profile(PREMOUNT_BLOB, RECORDING_NAME)
2385                .await
2386                .expect("Finding profile to delete.");
2387            volumes_directory
2388                .delete_profile(PREMOUNT_NOBLOB, RECORDING_NAME)
2389                .await
2390                .expect("Finding profile to delete.");
2391            volumes_directory
2392                .delete_profile(LIVE_BLOB, RECORDING_NAME)
2393                .await
2394                .expect("Finding profile to delete.");
2395            volumes_directory
2396                .delete_profile(LIVE_NOBLOB, RECORDING_NAME)
2397                .await
2398                .expect("Finding profile to delete.");
2399
2400            volumes_directory.terminate().await;
2401        }
2402
2403        filesystem.close().await.expect("Filesystem close");
2404    }
2405
2406    #[fuchsia::test(threads = 10)]
2407    async fn test_profile_stop() {
2408        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2409        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2410        let blob_resupplied_count =
2411            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2412        let volumes_directory = VolumesDirectory::new(
2413            root_volume(filesystem.clone()).await.unwrap(),
2414            Weak::new(),
2415            None,
2416            blob_resupplied_count,
2417            MemoryPressureConfig::default(),
2418        )
2419        .await
2420        .unwrap();
2421        let volume =
2422            volumes_directory.create_and_mount_volume("foo", None, true, None).await.unwrap();
2423
2424        // Run the recording with no time at all and ensure that it still shuts down properly.
2425        volumes_directory
2426            .clone()
2427            .record_and_replay_profile(None, "foo".to_owned(), 0)
2428            .await
2429            .expect("Recording");
2430
2431        // Delete will succeed once the profile is completed.
2432        while volumes_directory.delete_profile("foo", "foo").await.is_err() {
2433            fasync::Timer::new(Duration::from_millis(10)).await;
2434        }
2435
2436        std::mem::drop(volume);
2437        volumes_directory.terminate().await;
2438        std::mem::drop(volumes_directory);
2439        filesystem.close().await.expect("Filesystem close");
2440    }
2441
2442    #[fuchsia::test(threads = 10)]
2443    async fn test_delete_profile() {
2444        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2445        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2446        let blob_resupplied_count =
2447            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2448        let volumes_directory = VolumesDirectory::new(
2449            root_volume(filesystem.clone()).await.unwrap(),
2450            Weak::new(),
2451            None,
2452            blob_resupplied_count,
2453            MemoryPressureConfig::default(),
2454        )
2455        .await
2456        .unwrap();
2457        let volume =
2458            volumes_directory.create_and_mount_volume("foo", None, true, None).await.unwrap();
2459
2460        volumes_directory
2461            .clone()
2462            .record_and_replay_profile(None, "foo".to_owned(), 600)
2463            .await
2464            .expect("Recording");
2465
2466        // Deletion fails during in-flight recording.
2467        assert_eq!(
2468            volumes_directory.delete_profile("foo", "foo").await.expect_err("File shouldn't exist"),
2469            Status::SHOULD_WAIT
2470        );
2471
2472        volumes_directory.stop_profile_tasks().await;
2473
2474        // Missing volume name.
2475        assert_eq!(
2476            volumes_directory.delete_profile("bar", "foo").await.expect_err("File shouldn't exist"),
2477            Status::NOT_FOUND
2478        );
2479
2480        // Missing Profile name.
2481        assert_eq!(
2482            volumes_directory.delete_profile("foo", "bar").await.expect_err("File shouldn't exist"),
2483            Status::NOT_FOUND
2484        );
2485
2486        // Deletion should now succeed as the profile will be placed as part of `finish_profiling()`
2487        volumes_directory.delete_profile("foo", "foo").await.expect("Deleting");
2488
2489        // Deletion fails as the file shouldn't exist anymore.
2490        assert_eq!(
2491            volumes_directory.delete_profile("foo", "foo").await.expect_err("File shouldn't exist"),
2492            Status::NOT_FOUND
2493        );
2494
2495        std::mem::drop(volume);
2496        volumes_directory.terminate().await;
2497        std::mem::drop(volumes_directory);
2498        filesystem.close().await.expect("Filesystem close");
2499    }
2500
2501    #[fuchsia::test(threads = 10)]
2502    async fn test_profile_start_single_volume() {
2503        const TEST_VOLUME: &str = "test_1234";
2504        const TEST_RECORDING: &str = "test_5678";
2505        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
2506
2507        let fixture = TestFixture::new().await;
2508        {
2509            let volumes_directory = fixture.volumes_directory();
2510            // Start recording for volume that doesn't exist.
2511            assert_eq!(
2512                volumes_directory
2513                    .record_and_replay_profile(
2514                        Some(TEST_VOLUME.to_owned()),
2515                        TEST_RECORDING.to_owned(),
2516                        1
2517                    )
2518                    .await
2519                    .expect_err("Volumes doesn't exist yet"),
2520                Status::NOT_FOUND
2521            );
2522
2523            // Create the volume unmounted.
2524            {
2525                let volume = volumes_directory
2526                    .create_and_mount_volume(TEST_VOLUME, Some(crypt.clone()), false, None)
2527                    .await
2528                    .unwrap();
2529                volumes_directory
2530                    .lock()
2531                    .await
2532                    .unmount(volume.volume().store().store_object_id())
2533                    .await
2534                    .expect("unmount failed");
2535            }
2536
2537            // Start recording for volume that is not mounted.
2538            assert_eq!(
2539                volumes_directory
2540                    .record_and_replay_profile(
2541                        Some(TEST_VOLUME.to_owned()),
2542                        TEST_RECORDING.to_owned(),
2543                        1
2544                    )
2545                    .await
2546                    .expect_err("Volumes doesn't exist yet"),
2547                Status::UNAVAILABLE
2548            );
2549
2550            // Remount the volume and try again.
2551            let volume = volumes_directory
2552                .mount_volume(TEST_VOLUME, Some(crypt.clone()), false)
2553                .await
2554                .expect("Remount volume");
2555            volumes_directory
2556                .record_and_replay_profile(
2557                    Some(TEST_VOLUME.to_owned()),
2558                    TEST_RECORDING.to_owned(),
2559                    1,
2560                )
2561                .await
2562                .expect("Starting recording");
2563
2564            // Stop the recording and check that it was created on the new volume.
2565            volumes_directory.stop_profile_tasks().await;
2566            {
2567                let profile_dir = volume.volume().get_profile_directory().await.unwrap();
2568                assert!(profile_dir.lookup(TEST_RECORDING).await.unwrap().is_some());
2569            }
2570
2571            // Should be no recording for the other volume.
2572            {
2573                let profile_dir = fixture.volume().volume().get_profile_directory().await.unwrap();
2574                assert!(profile_dir.lookup(TEST_RECORDING).await.unwrap().is_none());
2575            }
2576        }
2577        fixture.close().await;
2578    }
2579
2580    #[fuchsia::test(threads = 10)]
2581    async fn test_delete_volume_while_flushing() {
2582        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2583        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2584        let blob_resupplied_count =
2585            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2586        let volumes_directory = VolumesDirectory::new(
2587            root_volume(filesystem.clone()).await.unwrap(),
2588            Weak::new(),
2589            None,
2590            blob_resupplied_count,
2591            MemoryPressureConfig::default(),
2592        )
2593        .await
2594        .unwrap();
2595        let name = "vol";
2596        let volume =
2597            volumes_directory.create_and_mount_volume(name, None, false, None).await.unwrap();
2598        let mut transaction = filesystem
2599            .root_store()
2600            .new_transaction(
2601                lock_keys![LockKey::object(
2602                    volume.volume().store().store_object_id(),
2603                    volume.root_dir().directory().object_id()
2604                )],
2605                Options::default(),
2606            )
2607            .await
2608            .unwrap();
2609        volume
2610            .root_dir()
2611            .directory()
2612            .create_child_file(&mut transaction, "foo")
2613            .await
2614            .expect("create_child_file failed");
2615        transaction.commit().await.expect("commit failed");
2616        volumes_directory
2617            .lock()
2618            .await
2619            .unmount(volume.volume().store().store_object_id())
2620            .await
2621            .expect("unmount failed");
2622
2623        let filesystem_clone = filesystem.clone();
2624        let filesystem_clone2 = filesystem.clone();
2625        let volumes_directory_clone1 = volumes_directory.clone();
2626        let volumes_directory_clone2 = volumes_directory.clone();
2627        let root_store_object_id = filesystem.root_store().store_object_id();
2628        let store_info_object_id = volume.volume().store().store_info_handle_object_id().unwrap();
2629        join!(
2630            async move {
2631                // Take a lock that the EndFlush transaction requires, so we interleave removing
2632                // the volume between StartFlush and EndFlush.  Release it on a timer since once the
2633                // volume is deleted, `remove_volume` needs to take this lock as well to tombstone
2634                // the store info.
2635                let _guard = filesystem_clone2
2636                    .lock_manager()
2637                    .read_lock(lock_keys![LockKey::object(
2638                        root_store_object_id,
2639                        store_info_object_id,
2640                    )])
2641                    .await;
2642                fasync::Timer::new(Duration::from_millis(200)).await;
2643            },
2644            async move {
2645                filesystem_clone.journal().force_compact().await.expect("Compact failed");
2646            },
2647            async move {
2648                if let Err(e) = volumes_directory_clone1.remove_volume(name).await {
2649                    if !FxfsError::NotFound.matches(&e) {
2650                        panic!("remove_volume failed: {e:?}");
2651                    }
2652                }
2653            },
2654            async move {
2655                if let Err(e) = volumes_directory_clone2.remove_volume(name).await {
2656                    if !FxfsError::NotFound.matches(&e) {
2657                        panic!("remove_volume failed: {e:?}");
2658                    }
2659                }
2660            },
2661        );
2662        volumes_directory.terminate().await;
2663        std::mem::drop(volumes_directory);
2664        filesystem.close().await.expect("Filesystem close");
2665    }
2666
2667    // This mostly just ensures that we exercise the code path. It was added because the path at
2668    // one point contained a deadlock.
2669    #[fuchsia::test(threads = 10)]
2670    async fn test_flush_before_mark_dirty_under_critical_memory_pressure() {
2671        let fixture = TestFixture::new().await;
2672        // Memory is critical, and it's always our fault.
2673        let _ = fixture
2674            .memory_pressure_proxy()
2675            .on_level_changed(MemoryPressureLevel::Critical)
2676            .await
2677            .expect("memory pressure FIDL");
2678        fixture.volumes_directory().max_dirty_bytes_when_critical.store(1, Ordering::Relaxed);
2679
2680        let root = fixture.root();
2681        let file = open_file_checked(
2682            &root,
2683            "foo",
2684            fio::Flags::FLAG_MAYBE_CREATE
2685                | fio::PERM_READABLE
2686                | fio::PERM_WRITABLE
2687                | fio::Flags::PROTOCOL_FILE,
2688            &Default::default(),
2689        )
2690        .await;
2691
2692        file.resize((zx::system_get_page_size() * 2).into())
2693            .await
2694            .expect("resize (FIDL)")
2695            .expect("resize failed");
2696        // The above resize creates zero pages which aren't dirty pages and don't contribute to the
2697        // dirty bytes count but still need to be flushed. The first write below will cross the
2698        // `max_dirty_bytes_when_critical` threshold but `minimize_memory` won't flush the file
2699        // because the zero pages aren't dirty pages. The second write below will also cross the
2700        // `max_dirty_bytes_when_critical` threshold and since the first write dirtied a page, a
2701        // flush will occur. The flush cleans the 2nd page which is a zero page and the kernel is
2702        // actively trying to dirty. The kernel doesn't end up dirtying the 2nd page because it's
2703        // concerned that the clean and dirty raced so it issues another dirty request for the same
2704        // page. The duplicate dirty request again crosses the `max_dirty_bytes_when_critical`
2705        // threshold. The file thinks that it has a dirty page so `minimize_memory` flushes the
2706        // file. `zx_pager_query_dirty_ranges` doesn't return any pages because the kernel didn't
2707        // actually dirty the page that fxfs thought it did. This causes only the file metadata to
2708        // be flushed and the dirty page count to not be reduced. The 2nd page finally gets dirtied
2709        // and now fxfs thinks it has 2 dirty pages instead of 1. `PagedObjectHandle` knows that
2710        // this can happen and will fix up the counts on the next flush. This test doesn't want to
2711        // deal with all of that so it syncs here to clean the zero pages that would cause that
2712        // mess.
2713        file.sync().await.expect("Failed to make sync call").expect("sync failed");
2714
2715        let vmo = file
2716            .get_backing_memory(fio::VmoFlags::READ | fio::VmoFlags::WRITE)
2717            .await
2718            .expect("get_backing_memory (FIDL)")
2719            .expect("get_backing_memory");
2720
2721        let buf = [0xAAu8];
2722        // One call to get dirty bytes over 0, the second to force a flush during mark_dirty.
2723        vmo.write(&buf, 0).expect("Writing to create dirty bytes");
2724        let before = fixture.volumes_directory().pager_dirty_bytes_count.load();
2725        vmo.write(&buf, zx::system_get_page_size().into())
2726            .expect("Writing to force a flush during mark_dirty");
2727        // This is still the page size because we forced a flush of the first write during the
2728        // second write.
2729        assert_eq!(fixture.volumes_directory().pager_dirty_bytes_count.load(), before,);
2730
2731        fixture.close().await;
2732    }
2733
2734    #[fuchsia::test(threads = 10)]
2735    async fn test_delete_crypt_for_volume() {
2736        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2737        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2738        let store_id;
2739        {
2740            let blob_resupplied_count =
2741                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2742            let volumes_directory = VolumesDirectory::new(
2743                root_volume(filesystem.clone()).await.unwrap(),
2744                Weak::new(),
2745                None,
2746                blob_resupplied_count,
2747                MemoryPressureConfig::default(),
2748            )
2749            .await
2750            .unwrap();
2751            let name = "vol";
2752            let crypt = Arc::new(new_insecure_crypt());
2753            let volume = volumes_directory
2754                .create_and_mount_volume(name, Some(crypt.clone()), false, None)
2755                .await
2756                .unwrap();
2757            store_id = volume.volume().store().store_object_id();
2758            // Make sure the volume has some journaled mutations.
2759            let mut transaction = filesystem
2760                .root_store()
2761                .new_transaction(
2762                    lock_keys![LockKey::object(
2763                        volume.volume().store().store_object_id(),
2764                        volume.root_dir().directory().object_id()
2765                    )],
2766                    Options::default(),
2767                )
2768                .await
2769                .unwrap();
2770            volume
2771                .root_dir()
2772                .directory()
2773                .create_child_file(&mut transaction, "foo")
2774                .await
2775                .expect("create_child_file failed");
2776            transaction.commit().await.expect("commit failed");
2777
2778            let (volume_dir_proxy, dir_server_end) =
2779                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2780            volumes_directory
2781                .serve_volume(&volume, dir_server_end, false)
2782                .expect("serve_volume failed");
2783            let (root_dir, root_server_end) =
2784                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2785            volume_dir_proxy
2786                .open(
2787                    "root",
2788                    fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
2789                    &Default::default(),
2790                    root_server_end.into_channel(),
2791                )
2792                .expect("Failed to open volume root");
2793
2794            let filesystem_clone = filesystem.clone();
2795            join!(
2796                async move {
2797                    filesystem_clone.journal().force_compact().await.expect("Compact failed");
2798                },
2799                async move {
2800                    let mut i = 0;
2801                    while let Ok(_) = fuchsia_fs::directory::open_file(
2802                        &root_dir,
2803                        &format!("foo{i}"),
2804                        fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE,
2805                    )
2806                    .await
2807                    {
2808                        i += 1;
2809                    }
2810                },
2811                async move {
2812                    crypt.shutdown();
2813                },
2814            );
2815
2816            // Make sure we can still ask the volume to shutdown.
2817            let (admin_proxy, server_end) =
2818                fidl::endpoints::create_proxy::<fidl_fuchsia_fs::AdminMarker>();
2819            volume_dir_proxy
2820                .open(
2821                    &format!("svc/{}", fidl_fuchsia_fs::AdminMarker::PROTOCOL_NAME),
2822                    fio::Flags::PROTOCOL_SERVICE,
2823                    &Default::default(),
2824                    server_end.into(),
2825                )
2826                .expect("Failed to open Admin connection");
2827            admin_proxy.shutdown().await.expect("shutdown failed");
2828
2829            volumes_directory.terminate().await;
2830        }
2831        filesystem.close().await.expect("Filesystem close");
2832        let device = filesystem.take_device().await;
2833        device.reopen(false);
2834        let filesystem = FxFilesystem::open(device).await.expect("open failed");
2835        let options = FsckOptions { fail_on_warning: true, ..Default::default() };
2836        fsck_with_options(filesystem.clone(), &options).await.expect("fsck failed");
2837        fsck_volume_with_options(
2838            filesystem.as_ref(),
2839            &options,
2840            store_id,
2841            Some(Arc::new(new_insecure_crypt())),
2842        )
2843        .await
2844        .expect("fsck_volume failed");
2845        filesystem.close().await.expect("Filesystem close");
2846    }
2847
2848    /// Tests writing a partition image and installing a volume contained within.
2849    #[fuchsia::test(threads = 10)]
2850    async fn test_volume_installation() {
2851        let fixture = TestFixture::open(
2852            DeviceHolder::new(FakeDevice::new(1024, 4096)),
2853            TestFixtureOptions { format: true, encrypted: false, ..Default::default() },
2854        )
2855        .await;
2856
2857        // Create a file "foo" in the existing volume "vol". This should be gone after installation.
2858        {
2859            let file = open_file_checked(
2860                fixture.root(),
2861                "foo",
2862                fio::Flags::PROTOCOL_FILE | fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2863                &Default::default(),
2864            )
2865            .await;
2866            file.write("Hello, world!".as_bytes()).await.unwrap().expect("write failed");
2867        };
2868
2869        // Create another in-memory partition image with a different set of files.
2870        let image = {
2871            let inner_fixture = TestFixture::open(
2872                DeviceHolder::new(FakeDevice::new(512, 4096)),
2873                TestFixtureOptions { format: true, encrypted: false, ..Default::default() },
2874            )
2875            .await;
2876            let file = open_file_checked(
2877                inner_fixture.root(),
2878                "bar",
2879                fio::Flags::PROTOCOL_FILE | fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2880                &Default::default(),
2881            )
2882            .await;
2883            file.write("Well, this is new...".as_bytes()).await.unwrap().expect("write failed");
2884            file.close().await.unwrap().expect("close error");
2885            inner_fixture.close().await
2886        };
2887
2888        // Write the partition image to a file "install" in a new volume called "src".
2889        {
2890            let (src_out_dir, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2891            fixture
2892                .volumes_directory()
2893                .create_and_serve_volume("src", server_end, Default::default(), Default::default())
2894                .await
2895                .unwrap();
2896            let src_root = open_dir_checked(
2897                &src_out_dir,
2898                "root",
2899                fio::PERM_READABLE | fio::PERM_WRITABLE,
2900                Default::default(),
2901            )
2902            .await;
2903            let file = open_file_checked(
2904                &src_root,
2905                "image",
2906                fio::Flags::PROTOCOL_FILE | fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2907                &Default::default(),
2908            )
2909            .await;
2910            write_image_to_file(image, file).await;
2911        };
2912
2913        // Installation should not be possible yet since both volumes are still mounted.
2914        assert!(
2915            fixture.volumes_directory().install_volume("src", "image", "vol").await.is_err(),
2916            "volume installation should fail while either src/dst is still mounted"
2917        );
2918
2919        // Now let's re-mount the filesystem manually without a fixture, install the volumes, and
2920        // spin up a new fixture to verify the result.
2921        let device = fixture.close().await;
2922        let fs = FxFilesystem::open(device).await.unwrap();
2923        {
2924            let root = root_volume(fs.clone()).await.unwrap();
2925            root.install_volume("src", "image", "vol").await.unwrap();
2926        }
2927        fs.close().await.unwrap();
2928        let device = fs.take_device().await;
2929        device.reopen(/*read_only*/ true);
2930        let fixture = TestFixture::open(
2931            device,
2932            TestFixtureOptions { encrypted: false, format: false, ..Default::default() },
2933        )
2934        .await;
2935
2936        // Ensure that the "src" volume is now gone, and the old file "foo" is gone from "vol".
2937        assert!(
2938            fixture.volumes_directory().mount_volume("src", None, false).await.is_err(),
2939            "src volume should be deleted after installation"
2940        );
2941        assert!(
2942            testing::open_file(
2943                fixture.volume_out_dir(),
2944                "foo",
2945                fio::PERM_READABLE,
2946                &Default::default()
2947            )
2948            .await
2949            .is_err(),
2950            "foo should be deleted after installation"
2951        );
2952
2953        // Check that we can find the contents of "vol" that we installed from the image.
2954        let file =
2955            open_file_checked(fixture.root(), "bar", fio::PERM_READABLE, &Default::default()).await;
2956        let data = file.read(fio::MAX_TRANSFER_SIZE).await.unwrap().expect("read failed");
2957        assert_eq!(String::from_utf8(data).unwrap(), "Well, this is new...");
2958        file.close().await.unwrap().unwrap();
2959
2960        fixture.close().await;
2961    }
2962
2963    #[fuchsia::test]
2964    async fn test_create_with_low_32_bit_ids() {
2965        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2966        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2967        let blob_resupplied_count =
2968            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2969
2970        {
2971            let volumes_directory = VolumesDirectory::new(
2972                root_volume(filesystem.clone()).await.unwrap(),
2973                Weak::new(),
2974                None,
2975                blob_resupplied_count,
2976                MemoryPressureConfig::default(),
2977            )
2978            .await
2979            .unwrap();
2980
2981            let mut guard = volumes_directory.lock().await;
2982
2983            let vol = guard
2984                .create_or_mount_volume(
2985                    "low_32",
2986                    None,
2987                    Mode::Create { guid: None, low_32_bit_object_ids: true },
2988                    false,
2989                )
2990                .await
2991                .expect("create volume failed");
2992
2993            let root_dir = vol.volume().store().root_directory_object_id();
2994            let root_dir = fxfs::object_store::Directory::open(vol.volume().store(), root_dir)
2995                .await
2996                .expect("open failed");
2997
2998            let mut transaction = filesystem
2999                .root_store()
3000                .new_transaction(
3001                    lock_keys![LockKey::object(
3002                        vol.volume().store().store_object_id(),
3003                        root_dir.object_id()
3004                    )],
3005                    Options::default(),
3006                )
3007                .await
3008                .expect("new_transaction failed");
3009
3010            let object = root_dir
3011                .create_child_file(&mut transaction, "test")
3012                .await
3013                .expect("create_child_file failed");
3014
3015            // We can't check LastObjectIdInfo as it is private, but we can verify behavior.
3016            assert!(object.object_id() < 1 << 32);
3017            transaction.commit().await.expect("commit failed");
3018        };
3019
3020        filesystem.close().await.expect("close filesystem failed");
3021
3022        // Reopen and verify persistence
3023        let device = filesystem.take_device().await;
3024        device.reopen(false);
3025        let filesystem = FxFilesystem::open(device).await.unwrap();
3026        let blob_resupplied_count =
3027            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
3028        let volumes_directory = VolumesDirectory::new(
3029            root_volume(filesystem.clone()).await.unwrap(),
3030            Weak::new(),
3031            None,
3032            blob_resupplied_count,
3033            MemoryPressureConfig::default(),
3034        )
3035        .await
3036        .unwrap();
3037
3038        // Mount the volume again, and check that new files are still created with expected IDs.
3039        {
3040            let mut guard = volumes_directory.lock().await;
3041
3042            let vol = guard
3043                .create_or_mount_volume("low_32", None, Mode::Mount, false)
3044                .await
3045                .expect("mount volume failed");
3046
3047            let root_dir = vol.volume().store().root_directory_object_id();
3048            let root_dir = fxfs::object_store::Directory::open(vol.volume().store(), root_dir)
3049                .await
3050                .expect("open failed");
3051
3052            let mut transaction = filesystem
3053                .root_store()
3054                .new_transaction(
3055                    lock_keys![LockKey::object(
3056                        vol.volume().store().store_object_id(),
3057                        root_dir.object_id()
3058                    )],
3059                    Options::default(),
3060                )
3061                .await
3062                .expect("new_transaction failed");
3063
3064            let object = root_dir
3065                .create_child_file(&mut transaction, "test2")
3066                .await
3067                .expect("create_child_file failed");
3068
3069            assert!(object.object_id() < 1 << 32);
3070            transaction.commit().await.expect("commit failed");
3071        }
3072
3073        filesystem.close().await.expect("close filesystem failed");
3074    }
3075
3076    #[fuchsia::test(threads = 10)]
3077    async fn test_race_unmount_and_flush_with_crypt_error() {
3078        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
3079        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
3080        let blob_resupplied_count = Arc::new(PageRefaultCounter::new().unwrap());
3081        let volumes_directory = VolumesDirectory::new(
3082            root_volume(filesystem.clone()).await.unwrap(),
3083            Weak::new(),
3084            None,
3085            blob_resupplied_count,
3086            MemoryPressureConfig::default(),
3087        )
3088        .await
3089        .unwrap();
3090
3091        let crypt_service = Arc::new(fxfs_crypt::CryptService::new());
3092        crypt_service
3093            .add_wrapping_key(0, fxfs_insecure_crypto::DATA_KEY.to_vec())
3094            .expect("add_wrapping_key failed");
3095        crypt_service
3096            .add_wrapping_key(1, fxfs_insecure_crypto::METADATA_KEY.to_vec())
3097            .expect("add_wrapping_key failed");
3098        crypt_service.set_active_key(KeyPurpose::Data, 0).expect("set_active_key failed");
3099        crypt_service.set_active_key(KeyPurpose::Metadata, 1).expect("set_active_key failed");
3100
3101        for _ in 0..20 {
3102            let (client, mut stream) = create_request_stream::<fidl_fuchsia_fxfs::CryptMarker>();
3103            let (close_tx, mut close_rx) = futures::channel::oneshot::channel::<()>();
3104
3105            let crypt_task = fasync::Task::spawn(async move {
3106                loop {
3107                    futures::select! {
3108                        _ = close_rx => return, // Close signal
3109                        request = stream.try_next() => {
3110                            match request {
3111                                Ok(Some(CryptRequest::CreateKey { responder, .. })) => {
3112                                    responder.send(Ok((&[0; 16], &[0; 48], &[0; 32]))).unwrap();
3113                                }
3114                                Ok(Some(CryptRequest::CreateKeyWithId {
3115                                        wrapping_key_id, responder, .. })) => {
3116                                    let key = WrappedKey::Fxfs(FxfsKey {
3117                                        wrapping_key_id,
3118                                        wrapped_key: [0u8; 48],
3119                                    });
3120                                    responder.send(Ok((&key, &[0; 32]))).unwrap();
3121                                }
3122                                Ok(Some(CryptRequest::UnwrapKey { responder, .. })) => {
3123                                    responder.send(Ok(&vec![0; 32])).unwrap();
3124                                }
3125                                _ => return,
3126                            }
3127                        }
3128                    }
3129                }
3130            });
3131
3132            let volume = volumes_directory
3133                .create_and_mount_volume(
3134                    "encrypted",
3135                    Some(Arc::new(RemoteCrypt::new(client))),
3136                    false,
3137                    None,
3138                )
3139                .await
3140                .unwrap();
3141
3142            // Write some data to dirty the journal.
3143            {
3144                let mut transaction = filesystem
3145                    .root_store()
3146                    .new_transaction(
3147                        lock_keys![LockKey::object(
3148                            volume.volume().store().store_object_id(),
3149                            volume.root_dir().directory().object_id()
3150                        )],
3151                        Options::default(),
3152                    )
3153                    .await
3154                    .unwrap();
3155                volume
3156                    .root_dir()
3157                    .directory()
3158                    .create_child_file(&mut transaction, "foo")
3159                    .await
3160                    .expect("create_child_file failed");
3161                transaction.commit().await.expect("commit failed");
3162            }
3163
3164            let (dir_proxy, dir_server_end) =
3165                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
3166            volumes_directory.serve_volume(&volume, dir_server_end, false).unwrap();
3167            volumes_directory.lock().await.auto_unmount(volume.volume().store().store_object_id());
3168
3169            // Kill crypt.  The next usage should be in flush.
3170            let _ = close_tx.send(());
3171
3172            let filesystem_clone = filesystem.clone();
3173            let compact_task = fasync::Task::spawn(async move {
3174                // Flush should not fail, because that would close the journal for the rest of the
3175                // filesystem.  Instead, the volume should be force-locked and flushed (which
3176                // doesn't depend on crypt).
3177                filesystem_clone.object_manager().flush().await.expect("flush failed");
3178            });
3179
3180            // Trigger unmount.
3181            std::mem::drop(dir_proxy);
3182
3183            // Wait for volume to be unmounted, so we can clean up for the next iteration.
3184            let store_id = volume.volume().store().store_object_id();
3185            loop {
3186                {
3187                    let guard = volumes_directory.lock().await;
3188                    if !guard.mounted_volumes.contains_key(&store_id) {
3189                        break;
3190                    }
3191                }
3192                fasync::Timer::new(Duration::from_millis(10)).await;
3193            }
3194
3195            join!(compact_task, crypt_task);
3196            volumes_directory.remove_volume("encrypted").await.expect("remove_volume failed");
3197        }
3198        volumes_directory.terminate().await;
3199        filesystem.close().await.expect("close filesystem failed");
3200    }
3201}