// fxfs_platform/fuchsia/volumes_directory.rs
1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::RemoteCrypt;
6use crate::fuchsia::component::map_to_raw_status;
7use crate::fuchsia::directory::FxDirectory;
8use crate::fuchsia::errors::map_to_status;
9use crate::fuchsia::fxblob::BlobDirectory;
10use crate::fuchsia::memory_pressure::{MemoryPressureLevel, MemoryPressureMonitor};
11use crate::fuchsia::profile::new_profile_state;
12use crate::fuchsia::volume::{FxVolume, FxVolumeAndRoot, MemoryPressureConfig, RootDir};
13use anyhow::{Context, Error, anyhow, ensure};
14use async_trait::async_trait;
15use fidl::endpoints::{DiscoverableProtocolMarker, ServerEnd};
16use fidl_fuchsia_fs::{AdminMarker, AdminRequest, AdminRequestStream};
17use fidl_fuchsia_fs_startup::{
18    CheckOptions, CreateOptions, MountOptions, VolumeRequest, VolumeRequestStream,
19};
20use fidl_fuchsia_fxfs::{FileBackedVolumeProviderMarker, ProjectIdMarker};
21use fs_inspect::{FsInspectTree, FsInspectVolume};
22use futures::stream::FuturesUnordered;
23use futures::{StreamExt, TryStreamExt};
24use fxfs::errors::FxfsError;
25use fxfs::fsck;
26use fxfs::log::*;
27use fxfs::object_store::transaction::{LockKey, Options, lock_keys};
28use fxfs::object_store::volume::RootVolume;
29use fxfs::object_store::{
30    Directory, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions, StoreOwner,
31};
32use fxfs_crypto::Crypt;
33use fxfs_trace::{TraceFutureExt, trace_future_args};
34use refaults_vmo::PageRefaultCounter;
35use rustc_hash::FxHashMap as HashMap;
36use std::sync::atomic::{AtomicU64, Ordering};
37use std::sync::{Arc, OnceLock, Weak};
38use vfs::directory::entry_container::MutableDirectory;
39use vfs::directory::helper::DirectlyMutable;
40use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
41
/// One mebibyte in bytes; used to render dirty-page byte counts in log messages.
const MEBIBYTE: u64 = 1024 * 1024;
43
/// VolumesDirectory is a special pseudo-directory used to enumerate and operate on volumes.
/// Volume creation happens via fuchsia.fs.startup.Volumes.Create, rather than open.
///
/// Note that VolumesDirectory assumes exclusive access to |root_volume| and if volumes are
/// manipulated from elsewhere, strange things will happen.
pub struct VolumesDirectory {
    /// The root volume; the source of truth for which volumes exist on disk.
    root_volume: RootVolume,
    /// Pseudo-directory with one service entry per known volume (see `add_directory_entry`).
    directory_node: Arc<vfs::directory::immutable::Simple>,
    /// Volumes currently mounted, keyed by store object ID.  Guarded by an async mutex which
    /// doubles as the exclusive lock for mount/unmount operations (see `lock()`).
    mounted_volumes: futures::lock::Mutex<HashMap<u64, MountedVolume>>,
    /// Inspect tree used to (un)register per-volume inspect data; may be gone at runtime.
    inspect_tree: Weak<FsInspectTree>,
    /// Optional system memory-pressure monitor; absent means pressure is treated as Normal.
    mem_monitor: Option<MemoryPressureMonitor>,
    /// Counter shared with volumes to track pager refaults for blobs.
    blob_resupplied_count: Arc<PageRefaultCounter>,
    // The state of profile recordings. Should be locked *after* mounted_volumes.
    profiling_state: futures::lock::Mutex<Option<(String, fasync::Task<()>)>>,

    /// A running estimate of the number of dirty bytes outstanding in all pager-backed VMOs across
    /// all volumes.
    pager_dirty_bytes_count: AtomicU64,

    /// Max outstanding dirty bytes under critical memory pressure. This could be hardcoded, but is
    /// broken out for testing.
    max_dirty_bytes_when_critical: AtomicU64,

    // A callback to invoke when a volume is added.  When the volume is removed, this is called
    // again with `None` as the second parameter.
    on_volume_added: OnceLock<Box<dyn Fn(&str, Option<Arc<ObjectStore>>) + Send + Sync>>,

    /// The cache configuration to use under different memory pressure levels.
    memory_pressure_config: MemoryPressureConfig,
}
74
/// Operations on VolumesDirectory that cannot be performed concurrently (i.e. most
/// volume creation/removal ops) should exist on this guard instead of VolumesDirectory.
pub struct MountedVolumesGuard<'a> {
    /// Strong reference back to the owning directory, so guard methods can reach its state.
    volumes_directory: Arc<VolumesDirectory>,
    /// The held lock over the mount table; holding this serializes mount/unmount operations.
    mounted_volumes: futures::lock::MutexGuard<'a, HashMap<u64, MountedVolume>>,
}
81
/// Book-keeping for a single mounted volume in the mount table.
struct MountedVolume {
    // Monotonic counter distinguishing successive mounts of the same store; used by
    // `auto_unmount` to detect that the same mount instance is still present.
    sequence: u64,
    // The volume's name as it appears in the volumes directory.
    name: String,
    // The mounted volume and its root directory.
    volume: FxVolumeAndRoot,

    // True if the volume was forcibly locked.
    locked: bool,
}
90
/// Whether `create_or_mount_volume` should mount an existing volume or create a new one.
pub(crate) enum Mode {
    // Mount an existing volume.
    Mount,
    // Create a new volume, optionally with a fixed GUID and/or 32-bit-restricted object IDs.
    Create { guid: Option<[u8; 16]>, low_32_bit_object_ids: bool },
}
95
impl MountedVolumesGuard<'_> {
    /// Creates or mounts a volume. If |crypt| is set, the volume will be created or mounted as
    /// encrypted. The volume is mounted according to |as_blob|.
    async fn create_or_mount_volume(
        &mut self,
        name: &str,
        crypt: Option<Arc<dyn Crypt>>,
        mode: Mode,
        as_blob: bool,
    ) -> Result<FxVolumeAndRoot, Error> {
        // The store holds a weak reference back to the directory as its owner.
        let owner = Arc::downgrade(&self.volumes_directory) as Weak<dyn StoreOwner>;
        let store = match mode {
            Mode::Create { guid, low_32_bit_object_ids } => self
                .volumes_directory
                .root_volume
                .new_volume(
                    name,
                    NewChildStoreOptions {
                        options: StoreOptions { owner, crypt },
                        guid,
                        low_32_bit_object_ids,
                        ..Default::default()
                    },
                )
                .await
                .context("failed to create new volume")?,
            Mode::Mount => {
                self.volumes_directory
                    .root_volume
                    .volume(name, StoreOptions { owner, crypt })
                    .await?
            }
        };
        // Refuse to double-mount: the store must not already be in the mount table.
        ensure!(
            !self.mounted_volumes.contains_key(&store.store_object_id()),
            FxfsError::AlreadyBound
        );

        // The root directory type determines whether the volume serves blobs or regular files.
        let volume = if as_blob {
            self.mount_store::<BlobDirectory>(
                name,
                store,
                self.volumes_directory.memory_pressure_config,
            )
            .await?
        } else {
            self.mount_store::<FxDirectory>(
                name,
                store,
                self.volumes_directory.memory_pressure_config,
            )
            .await?
        };
        // If there is an ongoing profile activity, we should apply it to the mounted volume.
        // Note the lock order: mounted_volumes (held via self) before profiling_state.
        if let Some((profile_name, _)) = &(*self.volumes_directory.profiling_state.lock().await) {
            if let Err(e) = volume
                .volume()
                .record_or_replay_profile(new_profile_state(as_blob), profile_name)
                .await
            {
                error!(
                    "Failed to record or replay profile '{}' for volume {}: {:?}",
                    profile_name, name, e
                );
            }
        }

        // Newly created volumes need a directory entry; mounted ones already have one (added
        // in `VolumesDirectory::new` or at creation time).
        if let Mode::Create { .. } = mode {
            let store_object_id = volume.volume().store().store_object_id();
            self.volumes_directory.add_directory_entry(name, store_object_id);
        }
        Ok(volume)
    }

    // Mounts the given store.  A lock *must* be held on the volume directory.
    //
    // NOTE(review): `FxVolumeAndRoot::new` is passed
    // `self.volumes_directory.memory_pressure_config` while the background task uses the
    // `flush_task_config` parameter.  All callers in this file pass the same value for both —
    // confirm the distinction is intentional.
    async fn mount_store<T: From<Directory<FxVolume>> + RootDir>(
        &mut self,
        name: &str,
        store: Arc<ObjectStore>,
        flush_task_config: MemoryPressureConfig,
    ) -> Result<FxVolumeAndRoot, Error> {
        // The koid of a fresh event is used as a unique identifier for this mount instance.
        let unique_id = zx::Event::create();
        let volume = FxVolumeAndRoot::new::<T>(
            Arc::downgrade(&self.volumes_directory),
            store,
            unique_id.koid().unwrap().raw_koid(),
            self.volumes_directory.blob_resupplied_count.clone(),
            self.volumes_directory.memory_pressure_config,
        )
        .await?;
        volume
            .volume()
            .start_background_task(flush_task_config, self.volumes_directory.mem_monitor.as_ref());
        self.add_mount(name, &volume);
        Ok(volume)
    }

    /// Adds a volume (`FxVolumeAndRoot`) into the mount list.
    pub fn add_mount(&mut self, name: &str, volume: &FxVolumeAndRoot) {
        // A process-wide sequence number lets auto_unmount tell remount apart from the
        // original mount of the same store id.
        static SEQUENCE: AtomicU64 = AtomicU64::new(0);
        let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);
        let store = volume.volume().store();
        self.mounted_volumes.insert(
            store.store_object_id(),
            MountedVolume {
                sequence,
                name: name.to_string(),
                volume: volume.clone(),
                locked: false,
            },
        );
        // Register with inspect if the inspect tree is still alive.
        if let Some(inspect) = self.volumes_directory.inspect_tree.upgrade() {
            inspect.register_volume(
                name.to_string(),
                Arc::downgrade(volume.volume()) as Weak<dyn FsInspectVolume + Send + Sync>,
            )
        }
        // Notify the volume-added callback, if one was installed.
        if let Some(callback) = self.volumes_directory.on_volume_added.get() {
            callback(name, Some(Arc::clone(store)));
        }
    }

    /// Removes (deletes) the named volume from disk.  Fails if the volume is mounted.
    async fn remove_volume(&mut self, name: &str) -> Result<(), Error> {
        let (object_id, transaction) = self
            .volumes_directory
            .root_volume
            .acquire_transaction_for_remove_volume(name, [], false)
            .await?;

        // Cowardly refuse to delete a mounted volume.
        ensure!(!self.mounted_volumes.contains_key(&object_id), FxfsError::AlreadyBound);
        if let Some(inspect) = self.volumes_directory.inspect_tree.upgrade() {
            inspect.unregister_volume(name.to_string());
        }
        let directory_node = self.volumes_directory.directory_node.clone();
        self.volumes_directory
            .root_volume
            .delete_volume(name, transaction, || {
                // This shouldn't fail because the entry should exist.
                directory_node.remove_entry(name, /* must_be_directory: */ false).unwrap();
            })
            .await?;
        Ok(())
    }

    /// Shuts down every mounted volume, emptying the mount table.
    async fn terminate(&mut self) {
        let volumes = std::mem::take(&mut *self.mounted_volumes);
        for (_, MountedVolume { name, volume, locked, .. }) in volumes {
            // Fire the removal notification (callback with `None`) before tearing down.
            if let Some(callback) = self.volumes_directory.on_volume_added.get() {
                callback(&name, None);
            }
            // Drain admin connections first so nothing new can reach the volume.
            let admin_scope = volume.admin_scope();
            admin_scope.shutdown();
            admin_scope.wait().await;

            // A force-locked volume was already terminated in `force_lock`.
            if !locked {
                volume.volume().terminate().await;
            }
        }
    }

    // Unmounts the volume identified by `store_id`.  The caller should take locks to avoid races if
    // necessary.
    //
    // NOTE: This will not terminate any connections on the admin scope.
    pub async fn unmount(&mut self, store_id: u64) -> Result<FxVolumeAndRoot, Error> {
        let MountedVolume { name, volume, locked, .. } =
            self.mounted_volumes.remove(&store_id).ok_or(FxfsError::NotFound)?;
        if let Some(callback) = self.volumes_directory.on_volume_added.get() {
            callback(&name, None);
        }

        if !locked {
            // As we are not terminating the admin scope, we must make sure to remove the root entry
            // which holds strong references to the volume since otherwise `Volume::try_unwrap`
            // might fail.
            let _ = volume.outgoing_dir().remove_entry("root", true);
            volume.volume().terminate().await;
        }

        Ok(volume)
    }

    /// Forcibly locks the volume: terminates it but leaves it in the mount table with
    /// `locked` set, keeping the admin scope alive.
    async fn force_lock(&mut self, store_id: u64) -> Result<(), Error> {
        let Some(MountedVolume { volume, locked, .. }) = self.mounted_volumes.get_mut(&store_id)
        else {
            // The volume is already unmounted, so there's nothing to do.
            return Ok(());
        };

        // Idempotent: a second force_lock on an already-locked volume is a no-op.
        if !*locked {
            // Remove the "root" entry so that no more connections can be made.
            let _ = volume.outgoing_dir().remove_entry("root", true);
            volume.volume().terminate().await;
            *locked = true;
        }

        Ok(())
    }

    // Auto-unmount the volume when the last connection to the volume is closed.
    fn auto_unmount(&self, store_id: u64) {
        let volumes_directory = self.volumes_directory.clone();
        // The volume must be in the mount table; callers invoke this right after mounting.
        let mounted_volume = self.mounted_volumes.get(&store_id).unwrap();
        let sequence = mounted_volume.sequence;
        let admin_scope = mounted_volume.volume.admin_scope().clone();
        let scope = mounted_volume.volume.volume().scope().clone();
        fasync::Task::spawn(async move {
            // Check the admin_scope first because once that has finished, there can never be any
            // more connections to it.
            admin_scope.wait().await;
            scope.wait().await;

            // Check that the same volume is still mounted i.e. there wasn't an explicit
            // unmount.
            let mut mounted_volumes = volumes_directory.lock().await;
            match mounted_volumes.mounted_volumes.get(&store_id) {
                Some(m) if m.sequence == sequence => {}
                _ => return,
            }

            warn!(store_id; "Last connection to volume closed without unmount, shutting down");
            // Take the transaction lock on the volume directory entry so the unmount does not
            // race with a concurrent mount/remove of the same volume.
            let root_store = volumes_directory.root_volume.volume_directory().store();
            let fs = root_store.filesystem();
            let _guard = fs
                .lock_manager()
                .txn_lock(lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    volumes_directory.root_volume.volume_directory().object_id(),
                )])
                .await;

            if let Err(e) = mounted_volumes.unmount(store_id).await {
                warn!(e:?, store_id; "Failed to unmount volume");
            }
        })
        .detach();
    }
}
335
336impl VolumesDirectory {
    /// Fills the VolumesDirectory with all volumes in |root_volume|.  No volume is opened during
    /// this.
    pub async fn new(
        root_volume: RootVolume,
        inspect_tree: Weak<FsInspectTree>,
        mem_monitor: Option<MemoryPressureMonitor>,
        blob_resupplied_count: Arc<PageRefaultCounter>,
        memory_pressure_config: MemoryPressureConfig,
    ) -> Result<Arc<Self>, Error> {
        // Snapshot the volume directory's tree before `root_volume` is moved into `me`.
        let layer_set = root_volume.volume_directory().store().tree().layer_set();
        let mut merger = layer_set.merger();
        let me = Arc::new(Self {
            root_volume,
            directory_node: vfs::directory::immutable::simple(),
            mounted_volumes: futures::lock::Mutex::new(HashMap::default()),
            inspect_tree,
            mem_monitor,
            blob_resupplied_count,
            profiling_state: futures::lock::Mutex::new(None),
            pager_dirty_bytes_count: AtomicU64::new(0),
            // Default dirty-byte ceiling under critical pressure: 1% of physical memory.
            max_dirty_bytes_when_critical: AtomicU64::new(zx::system_get_physmem() / 100),
            on_volume_added: OnceLock::new(),
            memory_pressure_config,
        });
        // Create one directory/service entry per volume found on disk.
        let mut iter = me.root_volume.volume_directory().iter(&mut merger).await?;
        while let Some((name, store_id, object_descriptor)) = iter.get() {
            // Anything other than a Volume entry in the volume directory is corruption.
            ensure!(*object_descriptor == ObjectDescriptor::Volume, FxfsError::Inconsistent);

            me.add_directory_entry(&name, store_id);

            iter.advance().await?;
        }
        Ok(me)
    }
371
    /// Delete a profile for a given volume. Fails if that volume isn't mounted or if there is
    /// active profile recording or replay.
    ///
    /// Returns `UNAVAILABLE` while a profile operation is in flight, `NOT_FOUND` if no mounted
    /// volume matches `volume_name`.
    pub async fn delete_profile(
        self: &Arc<Self>,
        volume_name: &str,
        profile_name: &str,
    ) -> Result<(), zx::Status> {
        // Volumes lock is taken first to provide consistent lock ordering with mounting a volume.
        let volumes = self.mounted_volumes.lock().await;
        let state = self.profiling_state.lock().await;

        // Only allow deletion when no operations are in flight. This removes confusion around
        // deleting a profile while one is recording with the same name, as the profile will not be
        // available for deletion until the recording completes. This would also mean that deleting
        // during a recording may succeed for deleting an older version but will be confusingly
        // replaced moments later.
        if state.is_some() {
            warn!("Failing profile deletion while profile operations are in flight.");
            return Err(zx::Status::UNAVAILABLE);
        }
        // Linear scan of the mount table by name; profiles live in the volume's profile dir.
        for (_, MountedVolume { name, volume, .. }) in &*volumes {
            if name == volume_name {
                let dir = Arc::new(FxDirectory::new(
                    None,
                    volume.volume().get_profile_directory().await.map_err(map_to_status)?,
                ));
                return dir.unlink(profile_name, false).await;
            }
        }
        warn!(volume_name, profile_name; "Volume not found while deleting profile");
        Err(zx::Status::NOT_FOUND)
    }
404
405    pub fn memory_pressure_monitor(&self) -> Option<&MemoryPressureMonitor> {
406        self.mem_monitor.as_ref()
407    }
408
    /// Stop all ongoing replays, and complete and persist ongoing recordings.
    pub async fn stop_profile_tasks(self: &Arc<Self>) {
        let mut state;
        let volumes;
        // Take the mounted_volumes lock first to keep consistent lock ordering with other
        // operations, but don't need to hold it for the entire operation. We need to take the
        // profiling_state lock before the mounted_volumes lock is dropped to ensure that another
        // thread doesn't mount a volume and start a profile task on it in between.
        {
            volumes = self
                .mounted_volumes
                .lock()
                .await
                .values()
                .map(|v| v.volume.volume().clone()) // Clones of each FxVolume.
                .collect::<Vec<Arc<FxVolume>>>();
            state = self.profiling_state.lock().await;
        }
        // With only profiling_state held, stop the per-volume tasks.
        for volume in volumes {
            volume.stop_profile_tasks().await;
        }
        // Clearing the state also drops the timer task, cancelling it if still pending.
        *state = None;
    }
432
433    /// Record a named profile for a number of seconds, fails if there is an in flight recording or
434    /// replay.
435    pub async fn record_or_replay_profile(
436        self: Arc<Self>,
437        profile_name: String,
438        duration_secs: u32,
439    ) -> Result<(), Error> {
440        // Volumes lock is taken first to provide consistent lock ordering with mounting a volume.
441        let volumes = self.mounted_volumes.lock().await;
442        let mut state = self.profiling_state.lock().await;
443        if state.is_none() {
444            for (_, MountedVolume { name, volume, .. }) in &*volumes {
445                let is_blob = volume.root().clone().into_any().downcast::<BlobDirectory>().is_ok();
446                // Just log the errors, don't stop half-way.
447                if let Err(error) = volume
448                    .volume()
449                    .record_or_replay_profile(new_profile_state(is_blob), &profile_name)
450                    .await
451                {
452                    error!(
453                        error:?,
454                        profile_name = profile_name.as_str(),
455                        volume_name = name.as_str();
456                        "Failed to record or replay profile",
457                    );
458                }
459            }
460            let this = self.clone();
461            let timer_task = fasync::Task::spawn(async move {
462                fasync::Timer::new(fasync::MonotonicDuration::from_seconds(duration_secs.into()))
463                    .await;
464                this.stop_profile_tasks().await;
465            });
466            *state = Some((profile_name, timer_task));
467            Ok(())
468        } else {
469            // Consistency in the recording and replaying cannot be ensured at the volume level
470            // if more than one operation can be in flight at a time.
471            Err(anyhow!(FxfsError::AlreadyExists).context("Profile operation already in progress."))
472        }
473    }
474
    /// Returns the directory node which can be used to provide connections for e.g. enumerating
    /// entries in the VolumesDirectory.
    /// Directly manipulating the entries in this node will result in strange behaviour.
    pub fn directory_node(&self) -> &Arc<vfs::directory::immutable::Simple> {
        &self.directory_node
    }
481
482    // This serves as an exclusive lock for operations that manipulate the set of mounted volumes.
483    pub async fn lock<'a>(self: &'a Arc<Self>) -> MountedVolumesGuard<'a> {
484        MountedVolumesGuard {
485            volumes_directory: self.clone(),
486            mounted_volumes: self.mounted_volumes.lock().await,
487        }
488    }
489
490    fn add_directory_entry(self: &Arc<Self>, name: &str, store_id: u64) {
491        let weak = Arc::downgrade(self);
492        let name_owned = Arc::new(name.to_string());
493        self.directory_node
494            .add_entry(
495                name,
496                vfs::service::host(move |requests| {
497                    let weak = weak.clone();
498                    let name = name_owned.clone();
499                    async move {
500                        if let Some(me) = weak.upgrade() {
501                            let _ =
502                                me.handle_volume_requests(name.as_ref(), requests, store_id).await;
503                        }
504                    }
505                }),
506            )
507            .unwrap();
508    }
509
510    /// Creates and mounts a new volume. If |crypt| is set, the volume will be encrypted. The
511    /// volume is mounted according to |as_blob|.
512    pub async fn create_and_mount_volume(
513        self: &Arc<Self>,
514        name: &str,
515        crypt: Option<Arc<dyn Crypt>>,
516        as_blob: bool,
517        guid: Option<[u8; 16]>,
518    ) -> Result<FxVolumeAndRoot, Error> {
519        self.lock()
520            .await
521            .create_or_mount_volume(
522                name,
523                crypt,
524                Mode::Create { guid, low_32_bit_object_ids: false },
525                as_blob,
526            )
527            .await
528    }
529
530    /// Mounts an existing volume. `crypt` will be used to unlock the volume if provided.
531    /// If `as_blob` is `true`, the volume will be mounted as a blob filesystem, otherwise
532    /// it will be treated as a regular fxfs volume.
533    pub async fn mount_volume(
534        self: &Arc<Self>,
535        name: &str,
536        crypt: Option<Arc<dyn Crypt>>,
537        as_blob: bool,
538    ) -> Result<FxVolumeAndRoot, Error> {
539        self.lock().await.create_or_mount_volume(name, crypt, Mode::Mount, as_blob).await
540    }
541
542    /// Removes a volume. The volume must exist but encrypted volume keys are not required.
543    pub async fn remove_volume(self: &Arc<Self>, name: &str) -> Result<(), Error> {
544        self.lock().await.remove_volume(name).await
545    }
546
547    /// Terminates all opened volumes.  This will not cancel any profiling that might be taking
548    /// place.
549    pub async fn terminate(self: &Arc<Self>) {
550        // Abort the profiling timer task.
551        let profiling_state = self.profiling_state.lock().await.take();
552        if let Some((_, timer_task)) = profiling_state {
553            timer_task.abort().await;
554        }
555        self.lock().await.terminate().await
556    }
557
    /// Serves the given volume on `outgoing_dir_server_end`.
    ///
    /// Populates the volume's outgoing directory with `root` and a `svc` directory exposing the
    /// Admin, ProjectId and FileBackedVolumeProvider protocols, then serves it on the volume's
    /// admin scope.  If `as_blob` is set, the connection is additionally executable.
    pub fn serve_volume(
        self: &Arc<Self>,
        volume: &FxVolumeAndRoot,
        outgoing_dir_server_end: ServerEnd<fio::DirectoryMarker>,
        as_blob: bool,
    ) -> Result<(), Error> {
        // A note regarding strong references to `FxVolume`: connections to services here are all on
        // the admin scope.  When we force lock a volume, we want to keep the admin scope running
        // but terminate the volume.  When a volume is terminated in this way, we want to ensure
        // that there are no outstanding strong references to the volume.  With that in mind, the
        // services here should not hold strong references.  They can hold a strong reference to
        // VolumesDirectory and find the volume via the mount list, or they can hold a weak
        // reference to the FxVolume.  Before upgrading the weak reference, an active guard must be
        // acquired on the volume's scope first.  This ensures that no new strong references are
        // taken once the volume has commenced termination.  The "root" entry just below is an
        // exception that does hold a strong reference.  We handle that by making sure we remove
        // that entry before we call `FxVolume::terminate`.

        let outgoing_dir = volume.outgoing_dir();
        outgoing_dir.add_entry("root", volume.root().clone().as_directory_entry())?;
        let svc_dir = vfs::directory::immutable::simple();
        outgoing_dir.add_entry("svc", svc_dir.clone())?;

        // Admin service: holds a strong reference to VolumesDirectory (not to the volume).
        let store_id = volume.volume().store().store_object_id();
        let me = self.clone();
        svc_dir.add_entry(
            AdminMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let me = me.clone();
                async move {
                    let _ = me.handle_admin_requests(requests, store_id).await;
                }
            }),
        )?;
        // The remaining services hold only a weak reference to the volume (see note above).
        let vol_scope = volume.volume().scope().clone();
        let weak_vol = Arc::downgrade(volume.volume());
        {
            let vol_scope = vol_scope.clone();
            let weak_vol = weak_vol.clone();
            svc_dir.add_entry(
                ProjectIdMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak_vol = weak_vol.clone();
                    let scope = vol_scope.clone();
                    async move {
                        let _ =
                            FxVolume::handle_project_id_requests(weak_vol, scope, requests).await;
                    }
                }),
            )?;
        }
        svc_dir.add_entry(
            FileBackedVolumeProviderMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let weak_vol = weak_vol.clone();
                let scope = vol_scope.clone();
                async move {
                    let _ = FxVolume::handle_file_backed_volume_provider_requests(
                        weak_vol, scope, requests,
                    )
                    .await;
                }
            }),
        )?;
        // Let the root directory type (e.g. BlobDirectory) add any extra services it needs.
        volume.root().clone().register_additional_volume_services(&svc_dir)?;

        // Serve on the admin scope so connections survive a force-lock of the volume itself.
        let scope = volume.admin_scope().clone();
        let mut flags = fio::PERM_READABLE | fio::PERM_WRITABLE;
        if as_blob {
            flags |= fio::PERM_EXECUTABLE;
        }
        vfs::directory::serve_on(Arc::clone(outgoing_dir), flags, scope, outgoing_dir_server_end);

        info!(
            store_id;
            "Serving volume, pager port koid={}",
            fasync::EHandle::local().port().koid().unwrap().raw_koid()
        );
        Ok(())
    }
639
640    /// Creates and serves the volume with the given name.
641    pub async fn create_and_serve_volume(
642        self: &Arc<Self>,
643        name: &str,
644        outgoing_directory_server_end: ServerEnd<fio::DirectoryMarker>,
645        mount_options: MountOptions,
646        create_options: CreateOptions,
647    ) -> Result<(), Error> {
648        let mut guard = self.lock().await;
649        let crypt =
650            mount_options.crypt.map(|crypt| Arc::new(RemoteCrypt::new(crypt)) as Arc<dyn Crypt>);
651        let as_blob = mount_options.as_blob.unwrap_or(false);
652        let guid = create_options.guid;
653        let low_32_bit_object_ids = create_options.restrict_inode_ids_to_32_bit.unwrap_or(false);
654        let volume = guard
655            .create_or_mount_volume(
656                name,
657                crypt,
658                Mode::Create { guid, low_32_bit_object_ids },
659                as_blob,
660            )
661            .await?;
662        self.serve_volume(&volume, outgoing_directory_server_end, as_blob)
663            .context("failed to serve volume")?;
664        guard.auto_unmount(volume.volume().store().store_object_id());
665        Ok(())
666    }
667
    /// Dispatch loop for one fuchsia.fs.startup.Volume connection.  Runs until the request
    /// stream closes; errors from individual operations are logged and returned to the client
    /// as raw zx statuses rather than terminating the loop.
    async fn handle_volume_requests(
        self: &Arc<Self>,
        name: &str,
        mut requests: VolumeRequestStream,
        store_id: u64,
    ) -> Result<(), Error> {
        while let Some(request) = requests.try_next().await? {
            match request {
                VolumeRequest::Check { responder, options } => {
                    responder.send(self.handle_check(store_id, options).await.map_err(|error| {
                        error!(error:?, store_id; "Failed to check volume");
                        map_to_raw_status(error)
                    }))?
                }
                VolumeRequest::Mount { responder, outgoing_directory, options } => responder.send(
                    self.handle_mount(name, store_id, outgoing_directory, options).await.map_err(
                        |error| {
                            error!(error:?, name, store_id; "Failed to mount volume");
                            map_to_raw_status(error)
                        },
                    ),
                )?,
                VolumeRequest::SetLimit { responder, bytes } => responder.send(
                    self.handle_set_limit(store_id, bytes).await.map_err(|error| {
                        error!(error:?, store_id; "Failed to set volume limit");
                        map_to_raw_status(error)
                    }),
                )?,
                VolumeRequest::GetLimit { responder } => {
                    // Infallible: reports the current byte limit for the store.
                    responder.send(Ok(self.handle_get_limit(store_id)))?
                }
                VolumeRequest::GetInfo { responder } => {
                    // Wrap the GUID in the FIDL VolumeInfo table on success.
                    let result = self.handle_get_info(store_id).await.map(|guid| {
                        fidl_fuchsia_fs_startup::VolumeInfo {
                            guid: Some(guid),
                            ..Default::default()
                        }
                    });
                    match result {
                        Ok(response) => responder.send(Ok(&response))?,
                        Err(error) => {
                            error!(error:?, store_id; "Failed to get volume info");
                            responder.send(Err(map_to_raw_status(error)))?
                        }
                    }
                }
            }
        }
        Ok(())
    }
718
    /// Returns the cache configuration used under different memory pressure levels.
    pub fn memory_pressure_config(&self) -> &MemoryPressureConfig {
        &self.memory_pressure_config
    }
722
723    fn is_flush_required_to_dirty(&self, byte_count: u64) -> bool {
724        let mem_pressure = self
725            .mem_monitor
726            .as_ref()
727            .map(|mem_monitor| mem_monitor.level())
728            .unwrap_or(MemoryPressureLevel::Normal);
729        if !matches!(mem_pressure, MemoryPressureLevel::Critical) {
730            return false;
731        }
732
733        let total_dirty = self.pager_dirty_bytes_count.load(Ordering::Acquire);
734        total_dirty + byte_count >= self.max_dirty_bytes_when_critical.load(Ordering::Relaxed)
735    }
736
    /// Reports that a certain number of bytes will be dirtied in a pager-backed VMO. If the memory
    /// pressure level is critical and fxfs has lots of dirty pages then a new task will be spawned
    /// in `volume` to flush the dirty pages before `mark_dirty` is called. If the memory pressure
    /// level is not critical then `mark_dirty` will be synchronously called.
    pub fn report_pager_dirty(
        self: Arc<Self>,
        byte_count: u64,
        volume: Arc<FxVolume>,
        mark_dirty: impl FnOnce() + Send + 'static,
    ) {
        if !self.is_flush_required_to_dirty(byte_count) {
            // Fast path: account for the new dirty bytes and mark the pages dirty synchronously.
            self.pager_dirty_bytes_count.fetch_add(byte_count, Ordering::AcqRel);
            mark_dirty();
        } else {
            // Slow path: spawn a task on the volume to flush all mounted volumes first.
            volume.spawn(
                async move {
                    let volumes = self.mounted_volumes.lock().await;

                    // Re-check the number of outstanding pager dirty bytes because another thread
                    // could have raced and flushed the volumes first.
                    if self.is_flush_required_to_dirty(byte_count) {
                        debug!(
                            "Flushing all volumes. Memory pressure is critical & dirty pager bytes \
                            ({} MiB) >= limit ({} MiB)",
                            self.pager_dirty_bytes_count.load(Ordering::Acquire) / MEBIBYTE,
                            self.max_dirty_bytes_when_critical.load(Ordering::Relaxed) / MEBIBYTE
                        );

                        // Flush every mounted volume concurrently and wait for all to finish.
                        let flushes = FuturesUnordered::new();
                        for MountedVolume { volume, .. } in volumes.values() {
                            let vol = volume.volume().clone();
                            flushes.push(async move {
                                vol.minimize_memory().await;
                            });
                        }

                        flushes.collect::<()>().await;
                    }
                    // Whether or not we flushed, account for the new bytes before dirtying.
                    self.pager_dirty_bytes_count.fetch_add(byte_count, Ordering::AcqRel);
                    mark_dirty();
                }
                .trace(trace_future_args!("flush-before-mark-dirty")),
            )
        }
    }
782
783    /// Reports that a certain number of bytes were cleaned in a pager-backed VMO.
784    pub fn report_pager_clean(&self, byte_count: u64) {
785        let prev_dirty = self.pager_dirty_bytes_count.fetch_sub(byte_count, Ordering::AcqRel);
786
787        if prev_dirty < byte_count {
788            // An unlikely scenario, but if there was an underflow, reset the pager dirty bytes to
789            // zero.
790            self.pager_dirty_bytes_count.store(0, Ordering::Release);
791        }
792    }
793
794    async fn handle_check(
795        self: &Arc<Self>,
796        store_id: u64,
797        options: CheckOptions,
798    ) -> Result<(), Error> {
799        let fs = self.root_volume.volume_directory().store().filesystem();
800        let crypt = if let Some(crypt) = options.crypt {
801            Some(Arc::new(RemoteCrypt::new(crypt)) as Arc<dyn Crypt>)
802        } else {
803            None
804        };
805        let result = fsck::fsck_volume(fs.as_ref(), store_id, crypt).await?;
806        // TODO(b/311550633): Stash result in inspect.
807        info!(store_id:%; "{result:?}");
808        Ok(())
809    }
810
811    async fn handle_set_limit(self: &Arc<Self>, store_id: u64, bytes: u64) -> Result<(), Error> {
812        let fs = self.root_volume.volume_directory().store().filesystem();
813        let mut transaction = fs.clone().new_transaction(lock_keys![], Options::default()).await?;
814        fs.allocator().set_bytes_limit(&mut transaction, store_id, bytes)?;
815        transaction.commit().await?;
816        Ok(())
817    }
818
819    fn handle_get_limit(self: &Arc<Self>, store_id: u64) -> u64 {
820        let fs = self.root_volume.volume_directory().store().filesystem();
821        fs.allocator().get_owner_bytes_limit(store_id).unwrap_or_default()
822    }
823
824    async fn handle_get_info(self: &Arc<Self>, store_id: u64) -> Result<[u8; 16], Error> {
825        let fs = self.root_volume.volume_directory().store().filesystem();
826        let store =
827            fs.object_manager().store(store_id).ok_or_else(|| anyhow!("Store not found"))?;
828        Ok(store.guid())
829    }
830
    /// Handles a `fuchsia.fs.startup/Volume.Mount` request: mounts the volume `name` and serves
    /// it on `outgoing_directory_server_end`.
    async fn handle_mount(
        self: &Arc<Self>,
        name: &str,
        store_id: u64,
        outgoing_directory_server_end: ServerEnd<fio::DirectoryMarker>,
        options: MountOptions,
    ) -> Result<(), Error> {
        info!(name:%, store_id:%, options:?; "Received mount request");
        // An optional remote crypt connection is used to unlock encrypted volumes.
        let crypt = options.crypt.map(|crypt| Arc::new(RemoteCrypt::new(crypt)) as Arc<dyn Crypt>);
        let as_blob = options.as_blob.unwrap_or(false);
        // Hold the lock across mount + serve so the volume can't be unmounted in between.
        let mut guard = self.lock().await;
        let volume = guard
            .create_or_mount_volume(name, crypt, Mode::Mount, as_blob)
            .await
            .context("failed to mount volume")?;
        self.serve_volume(&volume, outgoing_directory_server_end, as_blob)
            .context("failed to serve volume")?;
        // NOTE(review): presumably arranges automatic unmount when connections drop — see
        // `auto_unmount`.
        guard.auto_unmount(volume.volume().store().store_object_id());
        Ok(())
    }
851
    /// Services `fuchsia.fs/Admin` requests for the volume backed by `store_id`.  The only
    /// method handled is `Shutdown`, which unmounts the volume.
    async fn handle_admin_requests(
        self: &Arc<Self>,
        mut stream: AdminRequestStream,
        store_id: u64,
    ) -> Result<(), Error> {
        // If the Admin protocol ever supports more methods, this should change to a while.
        if let Some(request) = stream.try_next().await.context("Reading request")? {
            match request {
                AdminRequest::Shutdown { responder } => {
                    info!(store_id; "Received shutdown request for volume");

                    // Take the transaction lock on the volume directory object to serialize with
                    // other operations on the set of volumes.
                    let root_store = self.root_volume.volume_directory().store();
                    let fs = root_store.filesystem();
                    let _guard = fs
                        .lock_manager()
                        .txn_lock(lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            self.root_volume.volume_directory().object_id(),
                        )])
                        .await;

                    let maybe_volume = self.lock().await.unmount(store_id).await;
                    // Reply before tearing down the volume's scope; a failed send is merely
                    // logged since the client may have already gone away.
                    responder
                        .send()
                        .unwrap_or_else(|e| warn!("Failed to send shutdown response: {}", e));

                    if let Ok(volume) = maybe_volume {
                        // NOTE: After calling this, this task might be dropped at the next await
                        // point.
                        volume.admin_scope().shutdown();
                    }

                    return Ok(());
                }
            }
        }
        Ok(())
    }
890
891    /// Sets a callback which is invoked when a volume is added.  When the volume is removed, this
892    /// is called again with `None` as the second parameter.
893    /// Note that this can only be set once per VolumesDirectory; repeated calls will panic.
894    pub fn set_on_mount_callback<F: Fn(&str, Option<Arc<ObjectStore>>) + Send + Sync + 'static>(
895        &self,
896        callback: F,
897    ) {
898        self.on_volume_added.set(Box::new(callback)).ok().unwrap();
899    }
900
    /// Installs the volume image stored at `src/image_file` as a new volume named `dst`,
    /// replacing `src`.  Fails with `ALREADY_BOUND` if either `src` or `dst` is currently
    /// mounted.
    pub async fn install_volume(
        self: &Arc<Self>,
        src: &str,
        image_file: &str,
        dst: &str,
    ) -> Result<(), Error> {
        // Hold the mounted-volumes lock for the whole operation so no volume can be mounted or
        // unmounted while the directory entries are being swapped.
        let guard = self.lock().await;
        info!("installing {src}/{image_file} -> {dst}");
        for (_, MountedVolume { name, .. }) in guard.mounted_volumes.iter() {
            if name == src {
                return Err(zx::Status::ALREADY_BOUND)
                    .with_context(|| format!("volume {src} is already mounted"));
            }
            if name == dst {
                return Err(zx::Status::ALREADY_BOUND)
                    .with_context(|| format!("volume {dst} is already mounted"));
            }
        }
        guard.volumes_directory.root_volume.install_volume(&src, &image_file, &dst).await?;

        // The above function ensures that we've deleted `src` and `dst` now exists. Before we
        // release `guard`, we need to update the entries in the volumes directory accordingly.
        // NOTE(review): these unwraps assume removal of a pseudo-directory entry can't fail here
        // (a missing entry is presumably tolerated) — confirm against `remove_entry`'s contract.
        guard
            .volumes_directory
            .directory_node()
            .remove_entry(src, /* must_be_directory: */ false)
            .unwrap();
        guard
            .volumes_directory
            .directory_node()
            .remove_entry(dst, /* must_be_directory: */ false)
            .unwrap();
        // Look up the object id of the freshly-installed `dst` so its directory entry can be
        // recreated pointing at the new store.
        let new_dst_object_id =
            match guard.volumes_directory.root_volume.volume_directory().lookup(dst).await? {
                Some((object_id, ObjectDescriptor::Volume, _)) => Ok(object_id),
                Some(_) => Err(FxfsError::Inconsistent),
                None => Err(FxfsError::NotFound),
            }?;
        self.add_directory_entry(dst, new_dst_object_id);

        info!("install complete");
        Ok(())
    }
944}
945
#[async_trait]
impl StoreOwner for VolumesDirectory {
    // Delegates to the locked directory state's `force_lock` for the given store.
    async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), Error> {
        self.lock().await.force_lock(store.store_object_id()).await
    }
}
952
/// Test helper: connects a `fuchsia.fs.startup/Volume` proxy to the named volume's service node
/// in `volumes_directory`, returning the proxy plus the execution scope serving it.
#[cfg(test)]
pub(crate) fn serve_startup_volume_proxy(
    volumes_directory: &Arc<VolumesDirectory>,
    volume_name: &str,
) -> (fidl_fuchsia_fs_startup::VolumeProxy, vfs::ExecutionScope) {
    use vfs::ToObjectRequest;
    use vfs::service::ServiceLike;
    let execution_scope = vfs::ExecutionScope::new();
    // Look up the pseudo-directory entry for the volume and downcast it to the service node
    // that speaks the Volume protocol.
    let node = volumes_directory.directory_node().get_entry(volume_name).unwrap();
    let volume_service = node.into_any().downcast::<vfs::service::Service>().unwrap();
    let (client, server) =
        fidl::endpoints::create_proxy::<fidl_fuchsia_fs_startup::VolumeMarker>();
    volume_service
        .connect(
            execution_scope.clone(),
            Default::default(),
            &mut fio::Flags::PROTOCOL_SERVICE.to_object_request(server),
        )
        .unwrap();
    (client, execution_scope)
}
973
974#[cfg(test)]
975mod tests {
976    use super::{Mode, serve_startup_volume_proxy};
977    use crate::fuchsia::RemoteCrypt;
978    use crate::fuchsia::memory_pressure::MemoryPressureLevel;
979    use crate::fuchsia::testing::{self, TestFixture, open_dir_checked, open_file_checked};
980    use crate::fuchsia::volume::MemoryPressureConfig;
981    use crate::fuchsia::volumes_directory::VolumesDirectory;
982    use crate::testing::TestFixtureOptions;
983    use fidl::endpoints::{DiscoverableProtocolMarker, create_proxy, create_request_stream};
984    use fidl_fuchsia_fs::AdminMarker;
985    use fidl_fuchsia_fs_startup::{MountOptions, VolumeProxy};
986    use fidl_fuchsia_fxfs::{CryptRequest, FxfsKey, KeyPurpose, WrappedKey};
987    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
988    use fuchsia_fs::file;
989    use futures::{TryStreamExt, join};
990    use fxfs::errors::FxfsError;
991    use fxfs::filesystem::FxFilesystem;
992    use fxfs::fsck::{FsckOptions, fsck, fsck_volume_with_options, fsck_with_options};
993    use fxfs::lock_keys;
994    use fxfs::object_handle::ObjectHandle;
995    use fxfs::object_store::allocator::Allocator;
996    use fxfs::object_store::transaction::{LockKey, Options};
997    use fxfs::object_store::volume::root_volume;
998    use fxfs_crypto::Crypt;
999    use fxfs_insecure_crypto::new_insecure_crypt;
1000    use refaults_vmo::PageRefaultCounter;
1001    use std::sync::atomic::Ordering;
1002    use std::sync::{Arc, Weak};
1003    use std::time::Duration;
1004    use storage_device::DeviceHolder;
1005    use storage_device::fake_device::FakeDevice;
1006    use vfs::temp_clone::{TempClonable, unblock};
1007    use zx::Status;
1008    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
1009
    /// Test helper: copies the raw contents of `image` into `file` (chunked reads from the
    /// device, writes via the file's backing VMO), then syncs and closes the file.
    async fn write_image_to_file(image: DeviceHolder, file: fio::FileProxy) {
        file.resize(image.size()).await.unwrap().expect("resize failed");
        let vmo = TempClonable::new(
            file.get_backing_memory(fio::VmoFlags::SHARED_BUFFER | fio::VmoFlags::WRITE)
                .await
                .unwrap()
                .expect("get backing memory failed"),
        );

        const CHUNK_READ_SIZE: usize = 131_072; /* 128 KiB */
        let mut buff = image.allocate_buffer(CHUNK_READ_SIZE).await;
        let total = image.size();
        let mut offset = 0;
        while offset < total {
            // The final chunk may be short if the image size isn't a multiple of the chunk size.
            let amount = std::cmp::min(total - offset, CHUNK_READ_SIZE as u64);
            image.read(offset, buff.as_mut()).await.expect("image read failed");
            {
                // *NOTE*: We have to unblock our write to the VMO since it's pager backed and could
                // be running on the same thread as the filesystem is.
                let vmo = vmo.temp_clone();
                let data = buff.as_slice()[0..amount as usize].to_vec();
                let offset = offset;
                unblock(move || vmo.write(&data, offset)).await.expect("vmo write failed");
            }
            offset += amount;
        }
        assert_eq!(offset, total);
        file.sync().await.unwrap().expect("sync failed");
        file.close().await.unwrap().expect("close failed");
    }
1040
1041    #[fuchsia::test]
1042    async fn test_volume_creation() {
1043        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1044        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1045        let blob_resupplied_count =
1046            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1047        let volumes_directory = VolumesDirectory::new(
1048            root_volume(filesystem.clone()).await.unwrap(),
1049            Weak::new(),
1050            None,
1051            blob_resupplied_count,
1052            MemoryPressureConfig::default(),
1053        )
1054        .await
1055        .unwrap();
1056
1057        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1058        {
1059            let vol = volumes_directory
1060                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1061                .await
1062                .expect("create encrypted volume failed");
1063            vol.volume().store().store_object_id()
1064        };
1065
1066        volumes_directory.terminate().await;
1067        std::mem::drop(volumes_directory);
1068        filesystem.close().await.expect("close filesystem failed");
1069        let device = filesystem.take_device().await;
1070        device.reopen(false);
1071        let filesystem = FxFilesystem::open(device).await.unwrap();
1072        let blob_resupplied_count =
1073            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1074        let volumes_directory = VolumesDirectory::new(
1075            root_volume(filesystem.clone()).await.unwrap(),
1076            Weak::new(),
1077            None,
1078            blob_resupplied_count,
1079            MemoryPressureConfig::default(),
1080        )
1081        .await
1082        .unwrap();
1083
1084        let error = volumes_directory
1085            .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1086            .await
1087            .err()
1088            .expect("Creating existing encrypted volume should fail");
1089        assert!(FxfsError::AlreadyExists.matches(&error));
1090    }
1091
    /// Verifies that writing to a pager-backed file in a mounted volume is reflected in the
    /// parent `VolumesDirectory`'s `pager_dirty_bytes_count` counter.
    #[fuchsia::test]
    async fn test_dirty_pages_accumulate_in_parent() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
        let vol = volumes_directory
            .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
            .await
            .expect("create encrypted volume failed");
        let old_dirty = volumes_directory.pager_dirty_bytes_count.load(Ordering::SeqCst);

        let new_dirty = {
            let (root, server_end) = create_proxy::<fio::DirectoryMarker>();
            vol.root().clone().serve(fio::PERM_READABLE | fio::PERM_WRITABLE, server_end);
            let f = open_file_checked(
                &root,
                "foo",
                fio::Flags::FLAG_MAYBE_CREATE
                    | fio::PERM_READABLE
                    | fio::PERM_WRITABLE
                    | fio::Flags::PROTOCOL_FILE,
                &Default::default(),
            )
            .await;
            let buf = vec![0xaa as u8; 8192];
            file::write(&f, buf.as_slice()).await.expect("Write");
            // It's important to check the dirty bytes before closing the file, as closing can
            // trigger a flush.
            volumes_directory.pager_dirty_bytes_count.load(Ordering::SeqCst)
        };
        assert_ne!(old_dirty, new_dirty);

        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
    }
1140
    /// Verifies that an encrypted volume can be remounted after a filesystem reopen and keeps
    /// the same store object id.
    #[fuchsia::test]
    async fn test_volume_reopen() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
        let volume_id = {
            let vol = volumes_directory
                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
                .await
                .expect("create encrypted volume failed");
            vol.volume().store().store_object_id()
        };

        // Tear down and reopen the filesystem from the same device contents.
        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
        let device = filesystem.take_device().await;
        device.reopen(false);
        let filesystem = FxFilesystem::open(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        {
            let vol = volumes_directory
                .mount_volume("encrypted", Some(crypt.clone()), false)
                .await
                .expect("open existing encrypted volume failed");
            assert_eq!(vol.volume().store().store_object_id(), volume_id);
        }

        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
    }
1196
    /// Verifies that an unencrypted volume persists across a filesystem reopen and that
    /// re-creating a volume with the same name fails with `AlreadyExists`.
    #[fuchsia::test]
    async fn test_volume_creation_unencrypted() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        {
            let vol = volumes_directory
                .create_and_mount_volume("unencrypted", None, false, None)
                .await
                .expect("create unencrypted volume failed");
            vol.volume().store().store_object_id()
        };

        // Tear down and reopen the filesystem from the same device contents.
        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
        let device = filesystem.take_device().await;
        device.reopen(false);
        let filesystem = FxFilesystem::open(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        let error = volumes_directory
            .create_and_mount_volume("unencrypted", None, false, None)
            .await
            .err()
            .expect("Creating existing unencrypted volume should fail");
        assert!(FxfsError::AlreadyExists.matches(&error));

        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
    }
1250
    /// Verifies that an unencrypted volume can be remounted after a filesystem reopen and keeps
    /// the same store object id.
    #[fuchsia::test]
    async fn test_volume_reopen_unencrypted() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        let volume_id = {
            let vol = volumes_directory
                .create_and_mount_volume("unencrypted", None, false, None)
                .await
                .expect("create unencrypted volume failed");
            vol.volume().store().store_object_id()
        };

        // Tear down and reopen the filesystem from the same device contents.
        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
        let device = filesystem.take_device().await;
        device.reopen(false);
        let filesystem = FxFilesystem::open(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        {
            let vol = volumes_directory
                .mount_volume("unencrypted", None, false)
                .await
                .expect("open existing unencrypted volume failed");
            assert_eq!(vol.volume().store().store_object_id(), volume_id);
        }

        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
    }
1305
    /// Verifies that the volumes pseudo-directory lists all volumes (mounted or not), and that
    /// the listing is unchanged after mounting one of them.
    #[fuchsia::test]
    async fn test_volume_enumeration() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        // Add an encrypted volume...
        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
        {
            volumes_directory
                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
                .await
                .expect("create encrypted volume failed");
        };
        // And an unencrypted volume.
        {
            volumes_directory
                .create_and_mount_volume("unencrypted", None, false, None)
                .await
                .expect("create unencrypted volume failed");
        };

        // Restart, so that we can test enumeration of unopened volumes.
        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
        let device = filesystem.take_device().await;
        device.reopen(false);
        let filesystem = FxFilesystem::open(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        // Helper: returns the names of all entries in `dir`, in directory order.
        let readdir = |dir: Arc<fio::DirectoryProxy>| async move {
            let status = dir.rewind().await.expect("FIDL call failed");
            Status::ok(status).expect("rewind failed");
            let (status, buf) = dir.read_dirents(fio::MAX_BUF).await.expect("FIDL call failed");
            Status::ok(status).expect("read_dirents failed");
            let mut entries = vec![];
            for res in fuchsia_fs::directory::parse_dir_entries(&buf) {
                entries.push(res.expect("Failed to parse entry").name);
            }
            entries
        };

        let dir_proxy =
            Arc::new(vfs::directory::serve_read_only(volumes_directory.directory_node().clone()));
        let entries = readdir(dir_proxy.clone()).await;
        assert_eq!(entries, [".", "encrypted", "unencrypted"]);

        let _vol = volumes_directory
            .mount_volume("encrypted", Some(crypt.clone()), false)
            .await
            .expect("Open encrypted volume failed");

        // Ensure that the behaviour is the same after we've opened a volume.
        let entries = readdir(dir_proxy).await;
        assert_eq!(entries, [".", "encrypted", "unencrypted"]);

        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
    }
1387
    /// Verifies that `Volume.GetInfo` over the startup protocol returns the store's GUID.
    #[fuchsia::test]
    async fn test_get_info() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let root_volume = root_volume(filesystem.clone()).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume,
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        let vol = volumes_directory
            .create_and_mount_volume("vol", None, false, None)
            .await
            .expect("create_and_mount_volume failed");
        let guid = vol.volume().store().guid();

        let (volume_proxy, _scope) = serve_startup_volume_proxy(&volumes_directory, "vol");

        let info: fidl_fuchsia_fs_startup::VolumeInfo = volume_proxy
            .get_info()
            .await
            .expect("get_info failed")
            .expect("get_info returned error");
        assert_eq!(info.guid, Some(guid));

        volumes_directory.terminate().await;
    }
1422
    /// Verifies that removing a volume fails with `AlreadyBound` while it is still mounted.
    #[fuchsia::test]
    async fn test_deleted_encrypted_volume_while_mounted() {
        const VOLUME_NAME: &str = "encrypted";

        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();
        volumes_directory
            .create_and_mount_volume(VOLUME_NAME, Some(crypt.clone()), false, None)
            .await
            .expect("create encrypted volume failed");
        // We have the volume mounted so delete attempts should fail.
        assert!(
            FxfsError::AlreadyBound.matches(
                &volumes_directory
                    .remove_volume(VOLUME_NAME)
                    .await
                    .err()
                    .expect("Deleting volume should fail")
            )
        );
        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
    }
1459
    #[fuchsia::test]
    async fn test_mount_volume_using_volume_protocol() {
        // Set up a fresh filesystem and a volumes directory on a fake device.
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        // Create an encrypted volume, then unmount it so it can be re-mounted below via the
        // fuchsia.fs.startup/Volume protocol.
        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
        let store_id = {
            let vol = volumes_directory
                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
                .await
                .expect("create encrypted volume failed");
            vol.volume().store().store_object_id()
        };
        volumes_directory.lock().await.unmount(store_id).await.expect("unmount failed");

        let (volume_proxy, _scope) = serve_startup_volume_proxy(&volumes_directory, "encrypted");

        // `dir_server_end` will receive the volume's outgoing directory on a successful mount.
        let (dir_proxy, dir_server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();

        // Stand up a local crypt service with the insecure test keys so Mount can unwrap the
        // volume's keys.
        let crypt_service = fxfs_crypt::CryptService::new();
        crypt_service
            .add_wrapping_key(0, fxfs_insecure_crypto::DATA_KEY.to_vec())
            .expect("add_wrapping_key failed");
        crypt_service
            .add_wrapping_key(1, fxfs_insecure_crypto::METADATA_KEY.to_vec())
            .expect("add_wrapping_key failed");
        crypt_service.set_active_key(KeyPurpose::Data, 0).expect("set_active_key failed");
        crypt_service.set_active_key(KeyPurpose::Metadata, 1).expect("set_active_key failed");
        // One crypt connection per mount attempt below.
        let (client1, stream1) = create_request_stream();
        let (client2, stream2) = create_request_stream();

        // Run the mount attempts concurrently with the crypt service that backs them; the
        // second async block serves the two crypt streams in the order the mounts use them.
        join!(
            async {
                volume_proxy
                    .mount(
                        dir_server_end,
                        MountOptions { crypt: Some(client1), ..MountOptions::default() },
                    )
                    .await
                    .expect("mount (fidl) failed")
                    .expect("mount failed");

                // Prove the mount worked by creating a file through the served directory.
                open_file_checked(
                    &dir_proxy,
                    "root/test",
                    fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::FLAG_MAYBE_CREATE,
                    &Default::default(),
                )
                .await;

                // Attempting to mount again should fail with ALREADY_BOUND.
                let (_dir_proxy, dir_server_end) =
                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();

                assert_eq!(
                    Status::from_raw(
                        volume_proxy
                            .mount(
                                dir_server_end,
                                MountOptions { crypt: Some(client2), ..MountOptions::default() },
                            )
                            .await
                            .expect("mount (fidl) failed")
                            .expect_err("mount succeeded")
                    ),
                    Status::ALREADY_BOUND
                );

                // Dropping the last connection to the served directory should trigger an
                // automatic unmount.
                std::mem::drop(dir_proxy);

                // The volume should get unmounted a short time later.
                // Poll (up to ~10s) for the mounted-volume set to drain.
                let mut count = 0;
                loop {
                    if volumes_directory.mounted_volumes.lock().await.is_empty() {
                        break;
                    }
                    count += 1;
                    assert!(count <= 100);
                    fasync::Timer::new(Duration::from_millis(100)).await;
                }
            },
            async {
                crypt_service
                    .handle_request(fxfs_crypt::Services::Crypt(stream1))
                    .await
                    .expect("handle_request failed");
                crypt_service
                    .handle_request(fxfs_crypt::Services::Crypt(stream2))
                    .await
                    .expect("handle_request failed");
            }
        );
        // Make sure the background thread that actually calls terminate() on the volume finishes
        // before exiting the test. terminate() should be a no-op since we already verified
        // mounted_directories is empty, but the volume's terminate() future in the background task
        // may still be outstanding. As both the background task and VolumesDirectory::terminate()
        // hold the write lock, we use that to block until the background task has completed.
        volumes_directory.terminate().await;
    }
1570
1571    #[fuchsia::test]
1572    #[ignore] // TODO(b/293917849) re-enable this test when de-flaked
1573
1574    async fn test_volume_dir_races() {
1575        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1576        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1577        let blob_resupplied_count =
1578            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1579        let volumes_directory = VolumesDirectory::new(
1580            root_volume(filesystem.clone()).await.unwrap(),
1581            Weak::new(),
1582            None,
1583            blob_resupplied_count,
1584            MemoryPressureConfig::default(),
1585        )
1586        .await
1587        .unwrap();
1588
1589        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1590        let store_id = {
1591            let vol = volumes_directory
1592                .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1593                .await
1594                .expect("create encrypted volume failed");
1595            vol.volume().store().store_object_id()
1596        };
1597        volumes_directory.lock().await.unmount(store_id).await.expect("unmount failed");
1598
1599        let (volume_proxy, _scope) = serve_startup_volume_proxy(&volumes_directory, "encrypted");
1600
1601        let crypt_service = Arc::new(fxfs_crypt::CryptService::new());
1602        crypt_service
1603            .add_wrapping_key(0, fxfs_insecure_crypto::DATA_KEY.to_vec())
1604            .expect("add_wrapping_key failed");
1605        crypt_service
1606            .add_wrapping_key(1, fxfs_insecure_crypto::METADATA_KEY.to_vec())
1607            .expect("add_wrapping_key failed");
1608        crypt_service.set_active_key(KeyPurpose::Data, 0).expect("set_active_key failed");
1609        crypt_service.set_active_key(KeyPurpose::Metadata, 1).expect("set_active_key failed");
1610        let (client1, stream1) = create_request_stream();
1611        let (client2, stream2) = create_request_stream();
1612        let crypt_service_clone = crypt_service.clone();
1613        let crypt_task1 = fasync::Task::spawn(async move {
1614            crypt_service_clone
1615                .handle_request(fxfs_crypt::Services::Crypt(stream1))
1616                .await
1617                .expect("handle_request failed");
1618        });
1619        let crypt_task2 = fasync::Task::spawn(async move {
1620            crypt_service
1621                .handle_request(fxfs_crypt::Services::Crypt(stream2))
1622                .await
1623                .expect("handle_request failed");
1624        });
1625
1626        // Create two tasks each of mount and remove, and one to recreate the volume, so that we get
1627        // to exercise a wide variety of concurrent actions.
1628        // Delay remove and create a bit, since mount is slower due to FIDL.
1629        join!(
1630            async {
1631                let (_dir_proxy, dir_server_end) =
1632                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1633                if let Err(status) = volume_proxy
1634                    .mount(
1635                        dir_server_end,
1636                        MountOptions { crypt: Some(client1), ..MountOptions::default() },
1637                    )
1638                    .await
1639                    .expect("mount (fidl) failed")
1640                {
1641                    let status = Status::from_raw(status);
1642                    if status != Status::NOT_FOUND && status != Status::ALREADY_BOUND {
1643                        assert!(false, "Unexpected status {:}", status);
1644                    }
1645                }
1646            },
1647            async {
1648                let (_dir_proxy, dir_server_end) =
1649                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1650                if let Err(status) = volume_proxy
1651                    .mount(
1652                        dir_server_end,
1653                        MountOptions { crypt: Some(client2), ..MountOptions::default() },
1654                    )
1655                    .await
1656                    .expect("mount (fidl) failed")
1657                {
1658                    let status = Status::from_raw(status);
1659                    if status != Status::NOT_FOUND && status != Status::ALREADY_BOUND {
1660                        assert!(false, "Unexpected status {:}", status);
1661                    }
1662                }
1663            },
1664            async {
1665                let volumes_directory = volumes_directory.clone();
1666                let wait_time = rand::random_range(0..5);
1667                fasync::Timer::new(Duration::from_millis(wait_time)).await;
1668                if let Err(err) = volumes_directory.remove_volume("encrypted").await {
1669                    assert!(
1670                        FxfsError::NotFound.matches(&err) || FxfsError::AlreadyBound.matches(&err),
1671                        "Unexpected error {:?}",
1672                        err
1673                    );
1674                }
1675            },
1676            async {
1677                let volumes_directory = volumes_directory.clone();
1678                let wait_time = rand::random_range(0..5);
1679                fasync::Timer::new(Duration::from_millis(wait_time)).await;
1680                if let Err(err) = volumes_directory.remove_volume("encrypted").await {
1681                    assert!(
1682                        FxfsError::NotFound.matches(&err) || FxfsError::AlreadyBound.matches(&err),
1683                        "Unexpected error {:?}",
1684                        err
1685                    );
1686                }
1687            },
1688            async {
1689                let volumes_directory = volumes_directory.clone();
1690                let wait_time = rand::random_range(0..5);
1691                fasync::Timer::new(Duration::from_millis(wait_time)).await;
1692                let mut guard = volumes_directory.lock().await;
1693                match guard
1694                    .create_or_mount_volume(
1695                        "encrypted",
1696                        Some(crypt.clone()),
1697                        Mode::Create { guid: None, low_32_bit_object_ids: false },
1698                        false,
1699                    )
1700                    .await
1701                {
1702                    Ok(vol) => {
1703                        let store_id = vol.volume().store().store_object_id();
1704                        std::mem::drop(vol);
1705                        guard.unmount(store_id).await.expect("unmount failed");
1706                    }
1707                    Err(err) => {
1708                        assert!(
1709                            FxfsError::AlreadyExists.matches(&err)
1710                                || FxfsError::AlreadyBound.matches(&err),
1711                            "Unexpected error {:?}",
1712                            err
1713                        );
1714                    }
1715                }
1716            }
1717        );
1718        std::mem::drop(crypt_task1);
1719        std::mem::drop(crypt_task2);
1720        // Make sure the background thread that actually calls terminate() on the volume finishes
1721        // before exiting the test. terminate() should be a no-op since we already verified
1722        // mounted_directories is empty, but the volume's terminate() future in the background task
1723        // may still be outstanding. As both the background task and VolumesDirectory::terminate()
1724        // hold the write lock, we use that to block until the background task has completed.
1725        volumes_directory.terminate().await;
1726    }
1727
1728    #[fuchsia::test]
1729    async fn test_shutdown_volume() {
1730        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
1731        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1732        let blob_resupplied_count =
1733            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1734        let volumes_directory = VolumesDirectory::new(
1735            root_volume(filesystem.clone()).await.unwrap(),
1736            Weak::new(),
1737            None,
1738            blob_resupplied_count,
1739            MemoryPressureConfig::default(),
1740        )
1741        .await
1742        .unwrap();
1743
1744        let crypt = Arc::new(new_insecure_crypt()) as Arc<dyn Crypt>;
1745        let vol = volumes_directory
1746            .create_and_mount_volume("encrypted", Some(crypt.clone()), false, None)
1747            .await
1748            .expect("create encrypted volume failed");
1749
1750        let (dir_proxy, dir_server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1751
1752        volumes_directory.serve_volume(&vol, dir_server_end, false).expect("serve_volume failed");
1753
1754        let admin_proxy = connect_to_protocol_at_dir_svc::<AdminMarker>(&dir_proxy)
1755            .expect("Unable to connect to admin service");
1756
1757        admin_proxy.shutdown().await.expect("shutdown failed");
1758
1759        assert!(volumes_directory.mounted_volumes.lock().await.is_empty());
1760    }
1761
    /// Verifies that a volume's byte limit survives a filesystem remount and is removed when the
    /// volume is deleted.
    #[fuchsia::test]
    async fn test_byte_limit_persistence() {
        const BYTES_LIMIT_1: u64 = 123456;
        const BYTES_LIMIT_2: u64 = 456789;
        const VOLUME_NAME: &str = "A";
        let mut device = DeviceHolder::new(FakeDevice::new(8192, 512));
        // Phase 1: set a limit (twice — the second overwrites the first) and close cleanly.
        {
            let filesystem = FxFilesystem::new_empty(device).await.unwrap();
            let blob_resupplied_count =
                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
            let volumes_directory = VolumesDirectory::new(
                root_volume(filesystem.clone()).await.unwrap(),
                Weak::new(),
                None,
                blob_resupplied_count,
                MemoryPressureConfig::default(),
            )
            .await
            .unwrap();

            volumes_directory
                .create_and_mount_volume(VOLUME_NAME, None, false, None)
                .await
                .expect("create unencrypted volume failed");

            let (volume_proxy, _scope) =
                serve_startup_volume_proxy(&volumes_directory, VOLUME_NAME);

            volume_proxy.set_limit(BYTES_LIMIT_1).await.unwrap().expect("To set limits");
            {
                // The allocator should now track exactly one owner limit with the value just set.
                let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
                assert_eq!(limits.len(), 1);
                assert_eq!(limits[0].1, BYTES_LIMIT_1);
            }

            // Setting a new limit replaces the old one rather than adding a second entry.
            volume_proxy.set_limit(BYTES_LIMIT_2).await.unwrap().expect("To set limits");
            {
                let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
                assert_eq!(limits.len(), 1);
                assert_eq!(limits[0].1, BYTES_LIMIT_2);
            }
            std::mem::drop(volume_proxy);
            volumes_directory.terminate().await;
            std::mem::drop(volumes_directory);
            filesystem.close().await.expect("close filesystem failed");
            device = filesystem.take_device().await;
        }
        device.ensure_unique();
        device.reopen(false);
        // Phase 2: reopen and verify the limit persisted, then delete the volume and verify the
        // limit went with it.
        {
            let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
            fsck(filesystem.clone()).await.expect("Fsck");
            let blob_resupplied_count =
                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
            let volumes_directory = VolumesDirectory::new(
                root_volume(filesystem.clone()).await.unwrap(),
                Weak::new(),
                None,
                blob_resupplied_count,
                MemoryPressureConfig::default(),
            )
            .await
            .unwrap();
            {
                let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
                assert_eq!(limits.len(), 1);
                assert_eq!(limits[0].1, BYTES_LIMIT_2);
            }
            volumes_directory.remove_volume(VOLUME_NAME).await.expect("Volume deletion failed");
            {
                let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
                assert_eq!(limits.len(), 0);
            }
            volumes_directory.terminate().await;
            std::mem::drop(volumes_directory);
            filesystem.close().await.expect("close filesystem failed");
            device = filesystem.take_device().await;
        }
        device.ensure_unique();
        device.reopen(false);
        // Phase 3: one final reopen to confirm the limit removal also persisted.
        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
        fsck(filesystem.clone()).await.expect("Fsck");
        let limits = (filesystem.allocator() as Arc<Allocator>).owner_byte_limits();
        assert_eq!(limits.len(), 0);
    }
1847
    /// Handles for a freshly created, served test volume (see [`VolumeInfo::new`]).
    struct VolumeInfo {
        /// Kept alive so the connection backing `volume_proxy` stays open for the test.
        _scope: vfs::ExecutionScope,
        /// fuchsia.fs.startup/Volume connection for the volume (e.g. `set_limit`).
        volume_proxy: VolumeProxy,
        /// An open, writable file ("foo") in the volume's root, used for writes and
        /// `query_filesystem`.
        file_proxy: fio::FileProxy,
    }
1853
1854    impl VolumeInfo {
1855        async fn new(volumes_directory: &Arc<VolumesDirectory>, name: &'static str) -> Self {
1856            let volume = volumes_directory
1857                .create_and_mount_volume(name, None, false, None)
1858                .await
1859                .expect("create unencrypted volume failed");
1860
1861            let (volume_dir_proxy, dir_server_end) =
1862                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1863            volumes_directory
1864                .serve_volume(&volume, dir_server_end, false)
1865                .expect("serve_volume failed");
1866
1867            let (volume_proxy, _scope) = serve_startup_volume_proxy(&volumes_directory, name);
1868
1869            let (root_proxy, root_server_end) =
1870                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
1871            volume_dir_proxy
1872                .open(
1873                    "root",
1874                    fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
1875                    &Default::default(),
1876                    root_server_end.into_channel(),
1877                )
1878                .expect("Failed to open volume root");
1879
1880            let file_proxy = open_file_checked(
1881                &root_proxy,
1882                "foo",
1883                fio::Flags::FLAG_MAYBE_CREATE
1884                    | fio::PERM_READABLE
1885                    | fio::PERM_WRITABLE
1886                    | fio::Flags::PROTOCOL_FILE,
1887                &Default::default(),
1888            )
1889            .await;
1890            VolumeInfo { _scope, volume_proxy, file_proxy }
1891        }
1892    }
1893
1894    #[fuchsia::test]
1895    async fn test_limit_bytes() {
1896        const BYTES_LIMIT: u64 = 262_144; // 256KiB
1897        const BLOCK_SIZE: usize = 8192; // 8KiB
1898        let device = DeviceHolder::new(FakeDevice::new(BLOCK_SIZE.try_into().unwrap(), 512));
1899        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
1900        let blob_resupplied_count =
1901            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
1902        let volumes_directory = VolumesDirectory::new(
1903            root_volume(filesystem.clone()).await.unwrap(),
1904            Weak::new(),
1905            None,
1906            blob_resupplied_count,
1907            MemoryPressureConfig::default(),
1908        )
1909        .await
1910        .unwrap();
1911
1912        let vol = VolumeInfo::new(&volumes_directory, "foo").await;
1913        let old_info = {
1914            let (status, info) = vol.file_proxy.query_filesystem().await.expect("Getting fs info");
1915            assert_eq!(status, zx::Status::OK.into_raw());
1916            let info = info.unwrap();
1917            // With no limit set, the total filesystem size should be returned.
1918            assert!(info.total_bytes > BYTES_LIMIT);
1919            info
1920        };
1921
1922        vol.volume_proxy.set_limit(BYTES_LIMIT).await.unwrap().expect("To set limits");
1923        {
1924            let (status, info) = vol.file_proxy.query_filesystem().await.expect("Getting fs info");
1925            assert!(status == zx::Status::OK.into_raw());
1926            let new_info = info.unwrap();
1927            assert_eq!(new_info.total_bytes, BYTES_LIMIT);
1928            // Now since the limit is the volume limit, the space used should be the volume usage,
1929            // which should be strictly less than the filesystem.
1930            assert!(new_info.used_bytes < old_info.used_bytes);
1931        }
1932
1933        let zeros = vec![0u8; BLOCK_SIZE];
1934        // First write should succeed.
1935        assert_eq!(
1936            <u64 as TryInto<usize>>::try_into(
1937                vol.file_proxy
1938                    .write(&zeros)
1939                    .await
1940                    .expect("Failed Write message")
1941                    .expect("Failed write")
1942            )
1943            .unwrap(),
1944            BLOCK_SIZE
1945        );
1946        // Likely to run out of space before writing the full limit due to overheads.
1947        for _ in (BLOCK_SIZE..BYTES_LIMIT as usize).step_by(BLOCK_SIZE) {
1948            match vol.file_proxy.write(&zeros).await.expect("Failed Write message") {
1949                Err(_) => break,
1950                Ok(b) if b < BLOCK_SIZE.try_into().unwrap() => break,
1951                _ => (),
1952            };
1953        }
1954
1955        // Any further writes should fail with out of space.
1956        assert_eq!(
1957            vol.file_proxy
1958                .write(&zeros)
1959                .await
1960                .expect("Failed write message")
1961                .expect_err("Write should have been limited"),
1962            Status::NO_SPACE.into_raw()
1963        );
1964
1965        // Double the limit and try again. We should have write space again.
1966        vol.volume_proxy.set_limit(BYTES_LIMIT * 2).await.unwrap().expect("To set limits");
1967        assert_eq!(
1968            <u64 as TryInto<usize>>::try_into(
1969                vol.file_proxy
1970                    .write(&zeros)
1971                    .await
1972                    .expect("Failed Write message")
1973                    .expect("Failed write")
1974            )
1975            .unwrap(),
1976            BLOCK_SIZE
1977        );
1978
1979        vol.file_proxy.close().await.unwrap().expect("Failed to close file");
1980        volumes_directory.terminate().await;
1981        std::mem::drop(volumes_directory);
1982        filesystem.close().await.expect("close filesystem failed");
1983    }
1984
    /// Creates two limited volumes whose combined limits exceed the device size, fills the first
    /// until it hits the device, and checks the second gets whatever space the first left behind.
    #[fuchsia::test]
    async fn test_limit_bytes_two_hit_device_limit() {
        const BYTES_LIMIT: u64 = 3_145_728; // 3MiB
        const BLOCK_SIZE: usize = 8192; // 8KiB
        const BLOCK_COUNT: u32 = 512;
        // Note: the two 3MiB limits together exceed the 4MiB device, so the device fills first.
        let device =
            DeviceHolder::new(FakeDevice::new(BLOCK_SIZE.try_into().unwrap(), BLOCK_COUNT));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        let a = VolumeInfo::new(&volumes_directory, "foo").await;
        let b = VolumeInfo::new(&volumes_directory, "bar").await;
        a.volume_proxy.set_limit(BYTES_LIMIT).await.unwrap().expect("To set limits");
        b.volume_proxy.set_limit(BYTES_LIMIT).await.unwrap().expect("To set limits");
        // Track bytes successfully written to each volume for the final assertions.
        let mut a_written: u64 = 0;
        let mut b_written: u64 = 0;

        // Write chunks of BLOCK_SIZE.
        let zeros = vec![0u8; BLOCK_SIZE];

        // First write should succeed for both.
        assert_eq!(
            <u64 as TryInto<usize>>::try_into(
                a.file_proxy
                    .write(&zeros)
                    .await
                    .expect("Failed Write message")
                    .expect("Failed write")
            )
            .unwrap(),
            BLOCK_SIZE
        );
        a_written += BLOCK_SIZE as u64;
        assert_eq!(
            <u64 as TryInto<usize>>::try_into(
                b.file_proxy
                    .write(&zeros)
                    .await
                    .expect("Failed Write message")
                    .expect("Failed write")
            )
            .unwrap(),
            BLOCK_SIZE
        );
        b_written += BLOCK_SIZE as u64;

        // Fill volume "a" first. Likely to run out of space before writing the full limit due to
        // overheads.
        for _ in (BLOCK_SIZE..BYTES_LIMIT as usize).step_by(BLOCK_SIZE) {
            match a.file_proxy.write(&zeros).await.expect("Failed Write message") {
                Err(_) => break,
                Ok(bytes) => {
                    a_written += bytes;
                    if bytes < BLOCK_SIZE.try_into().unwrap() {
                        break;
                    }
                }
            };
        }
        // Any further writes should fail with out of space.
        assert_eq!(
            a.file_proxy
                .write(&zeros)
                .await
                .expect("Failed write message")
                .expect_err("Write should have been limited"),
            Status::NO_SPACE.into_raw()
        );

        // Now write to the second volume. Likely to run out of space before writing the full limit
        // due to overheads.
        for _ in (BLOCK_SIZE..BYTES_LIMIT as usize).step_by(BLOCK_SIZE) {
            match b.file_proxy.write(&zeros).await.expect("Failed Write message") {
                Err(_) => break,
                Ok(bytes) => {
                    b_written += bytes;
                    if bytes < BLOCK_SIZE.try_into().unwrap() {
                        break;
                    }
                }
            };
        }
        // Any further writes should fail with out of space.
        assert_eq!(
            b.file_proxy
                .write(&zeros)
                .await
                .expect("Failed write message")
                .expect_err("Write should have been limited"),
            Status::NO_SPACE.into_raw()
        );

        // Second volume should have failed very early.
        assert!(BLOCK_SIZE as u64 * BLOCK_COUNT as u64 - BYTES_LIMIT >= b_written);
        // First volume should have gotten further.
        assert!(BLOCK_SIZE as u64 * BLOCK_COUNT as u64 - BYTES_LIMIT <= a_written);

        a.file_proxy.close().await.unwrap().expect("Failed to close file");
        b.file_proxy.close().await.unwrap().expect("Failed to close file");
        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("close filesystem failed");
    }
2097
2098    #[fuchsia::test(threads = 10)]
2099    async fn test_profile_start() {
2100        const PREMOUNT_BLOB: &str = "premount_blob";
2101        const PREMOUNT_NOBLOB: &str = "premount_noblob";
2102        const LIVE_BLOB: &str = "live_blob";
2103        const LIVE_NOBLOB: &str = "live_noblob";
2104
2105        const RECORDING_NAME: &str = "foo";
2106
2107        let device = {
2108            let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2109            let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2110            let blob_resupplied_count =
2111                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2112            let volumes_directory = VolumesDirectory::new(
2113                root_volume(filesystem.clone()).await.unwrap(),
2114                Weak::new(),
2115                None,
2116                blob_resupplied_count,
2117                MemoryPressureConfig::default(),
2118            )
2119            .await
2120            .unwrap();
2121            volumes_directory
2122                .create_and_mount_volume(PREMOUNT_BLOB, None, true, None)
2123                .await
2124                .unwrap();
2125            volumes_directory
2126                .create_and_mount_volume(PREMOUNT_NOBLOB, None, false, None)
2127                .await
2128                .unwrap();
2129            volumes_directory.create_and_mount_volume(LIVE_BLOB, None, true, None).await.unwrap();
2130            volumes_directory
2131                .create_and_mount_volume(LIVE_NOBLOB, None, false, None)
2132                .await
2133                .unwrap();
2134
2135            volumes_directory.terminate().await;
2136            std::mem::drop(volumes_directory);
2137            filesystem.close().await.expect("Filesystem close");
2138            filesystem.take_device().await
2139        };
2140
2141        device.ensure_unique();
2142        device.reopen(false);
2143        let device = {
2144            let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2145            let blob_resupplied_count =
2146                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2147            let volumes_directory = VolumesDirectory::new(
2148                root_volume(filesystem.clone()).await.unwrap(),
2149                Weak::new(),
2150                None,
2151                blob_resupplied_count,
2152                MemoryPressureConfig::default(),
2153            )
2154            .await
2155            .unwrap();
2156
2157            // Premount two volumes.
2158            let _premount_blob = volumes_directory
2159                .mount_volume(PREMOUNT_BLOB, None, true)
2160                .await
2161                .expect("Reopen volume");
2162            let _premount_noblob = volumes_directory
2163                .mount_volume(PREMOUNT_NOBLOB, None, false)
2164                .await
2165                .expect("Reopen volume");
2166
2167            // Start the recording, let it run a really long time, it doesn't need to end for this
2168            // test. If it does wait this long then it should trigger test timeouts.
2169            volumes_directory
2170                .clone()
2171                .record_or_replay_profile(RECORDING_NAME.to_owned(), 600)
2172                .await
2173                .expect("Recording");
2174
2175            // Live mount two volumes.
2176            let _live_blob =
2177                volumes_directory.mount_volume(LIVE_BLOB, None, true).await.expect("Reopen volume");
2178            let _live_noblob = volumes_directory
2179                .mount_volume(LIVE_NOBLOB, None, false)
2180                .await
2181                .expect("Reopen volume");
2182
2183            // Wait for the recordings to finish.
2184            volumes_directory.stop_profile_tasks().await;
2185
2186            volumes_directory.terminate().await;
2187            std::mem::drop(volumes_directory);
2188            filesystem.close().await.expect("Filesystem close");
2189            filesystem.take_device().await
2190        };
2191
2192        device.ensure_unique();
2193        device.reopen(false);
2194        let filesystem = FxFilesystem::open(device as DeviceHolder).await.unwrap();
2195        {
2196            let blob_resupplied_count =
2197                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2198            let volumes_directory = VolumesDirectory::new(
2199                root_volume(filesystem.clone()).await.unwrap(),
2200                Weak::new(),
2201                None,
2202                blob_resupplied_count,
2203                MemoryPressureConfig::default(),
2204            )
2205            .await
2206            .unwrap();
2207
2208            let _premount_blob = volumes_directory
2209                .mount_volume(PREMOUNT_BLOB, None, true)
2210                .await
2211                .expect("Reopen volume");
2212            let _premount_noblob = volumes_directory
2213                .mount_volume(PREMOUNT_NOBLOB, None, false)
2214                .await
2215                .expect("Reopen volume");
2216            let _live_blob =
2217                volumes_directory.mount_volume(LIVE_BLOB, None, true).await.expect("Reopen volume");
2218            let _live_noblob = volumes_directory
2219                .mount_volume(LIVE_NOBLOB, None, false)
2220                .await
2221                .expect("Reopen volume");
2222
2223            // Verify which recordings ran based on the saved recordings.
2224            volumes_directory
2225                .delete_profile(PREMOUNT_BLOB, RECORDING_NAME)
2226                .await
2227                .expect("Finding profile to delete.");
2228            volumes_directory
2229                .delete_profile(PREMOUNT_NOBLOB, RECORDING_NAME)
2230                .await
2231                .expect("Finding profile to delete.");
2232            volumes_directory
2233                .delete_profile(LIVE_BLOB, RECORDING_NAME)
2234                .await
2235                .expect("Finding profile to delete.");
2236            volumes_directory
2237                .delete_profile(LIVE_NOBLOB, RECORDING_NAME)
2238                .await
2239                .expect("Finding profile to delete.");
2240
2241            volumes_directory.terminate().await;
2242        }
2243
2244        filesystem.close().await.expect("Filesystem close");
2245    }
2246
2247    #[fuchsia::test(threads = 10)]
2248    async fn test_profile_stop() {
2249        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2250        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2251        let blob_resupplied_count =
2252            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2253        let volumes_directory = VolumesDirectory::new(
2254            root_volume(filesystem.clone()).await.unwrap(),
2255            Weak::new(),
2256            None,
2257            blob_resupplied_count,
2258            MemoryPressureConfig::default(),
2259        )
2260        .await
2261        .unwrap();
2262        let volume =
2263            volumes_directory.create_and_mount_volume("foo", None, true, None).await.unwrap();
2264
2265        // Run the recording with no time at all and ensure that it still shuts down properly.
2266        volumes_directory
2267            .clone()
2268            .record_or_replay_profile("foo".to_owned(), 0)
2269            .await
2270            .expect("Recording");
2271
2272        // Delete will succeed once the profile is completed.
2273        while volumes_directory.delete_profile("foo", "foo").await.is_err() {
2274            fasync::Timer::new(Duration::from_millis(10)).await;
2275        }
2276
2277        std::mem::drop(volume);
2278        volumes_directory.terminate().await;
2279        std::mem::drop(volumes_directory);
2280        filesystem.close().await.expect("Filesystem close");
2281    }
2282
2283    #[fuchsia::test(threads = 10)]
2284    async fn test_delete_profile() {
2285        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2286        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2287        let blob_resupplied_count =
2288            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2289        let volumes_directory = VolumesDirectory::new(
2290            root_volume(filesystem.clone()).await.unwrap(),
2291            Weak::new(),
2292            None,
2293            blob_resupplied_count,
2294            MemoryPressureConfig::default(),
2295        )
2296        .await
2297        .unwrap();
2298        let volume =
2299            volumes_directory.create_and_mount_volume("foo", None, true, None).await.unwrap();
2300
2301        volumes_directory
2302            .clone()
2303            .record_or_replay_profile("foo".to_owned(), 600)
2304            .await
2305            .expect("Recording");
2306
2307        // Deletion fails during in-flight recording.
2308        assert_eq!(
2309            volumes_directory.delete_profile("foo", "foo").await.expect_err("File shouldn't exist"),
2310            Status::UNAVAILABLE
2311        );
2312
2313        volumes_directory.stop_profile_tasks().await;
2314
2315        // Missing volume name.
2316        assert_eq!(
2317            volumes_directory.delete_profile("bar", "foo").await.expect_err("File shouldn't exist"),
2318            Status::NOT_FOUND
2319        );
2320
2321        // Missing Profile name.
2322        assert_eq!(
2323            volumes_directory.delete_profile("foo", "bar").await.expect_err("File shouldn't exist"),
2324            Status::NOT_FOUND
2325        );
2326
2327        // Deletion should now succeed as the profile will be placed as part of `finish_profiling()`
2328        volumes_directory.delete_profile("foo", "foo").await.expect("Deleting");
2329
2330        // Deletion fails as the file shouldn't exist anymore.
2331        assert_eq!(
2332            volumes_directory.delete_profile("foo", "foo").await.expect_err("File shouldn't exist"),
2333            Status::NOT_FOUND
2334        );
2335
2336        std::mem::drop(volume);
2337        volumes_directory.terminate().await;
2338        std::mem::drop(volumes_directory);
2339        filesystem.close().await.expect("Filesystem close");
2340    }
2341
    /// Races volume deletion against a journal compaction (which flushes the volume's
    /// store) and against a second, concurrent deletion of the same volume. A read lock
    /// held over the store-info object forces `remove_volume` to interleave between the
    /// flush's StartFlush and EndFlush phases.
    #[fuchsia::test(threads = 10)]
    async fn test_delete_volume_while_flushing() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let blob_resupplied_count =
            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();
        let name = "vol";
        let volume =
            volumes_directory.create_and_mount_volume(name, None, false, None).await.unwrap();
        // Commit a mutation in the volume so the upcoming compaction has something to
        // flush for this store.
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    volume.volume().store().store_object_id(),
                    volume.root_dir().directory().object_id()
                )],
                Options::default(),
            )
            .await
            .unwrap();
        volume
            .root_dir()
            .directory()
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");
        // Unmount so `remove_volume` can run; the `volume` handle is kept only to read
        // object ids below.
        volumes_directory
            .lock()
            .await
            .unmount(volume.volume().store().store_object_id())
            .await
            .expect("unmount failed");

        let filesystem_clone = filesystem.clone();
        let filesystem_clone2 = filesystem.clone();
        let volumes_directory_clone1 = volumes_directory.clone();
        let volumes_directory_clone2 = volumes_directory.clone();
        let root_store_object_id = filesystem.root_store().store_object_id();
        let store_info_object_id = volume.volume().store().store_info_handle_object_id().unwrap();
        // Four concurrent tasks: the lock-holder, the compaction, and two racing
        // deletions. Exactly one deletion can win; the other may observe NotFound.
        join!(
            async move {
                // Take a lock that the EndFlush transaction requires, so we interleave removing
                // the volume between StartFlush and EndFlush.  Release it on a timer since once the
                // volume is deleted, `remove_volume` needs to take this lock as well to tombstone
                // the store info.
                let _guard = filesystem_clone2
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object(
                        root_store_object_id,
                        store_info_object_id,
                    )])
                    .await;
                fasync::Timer::new(Duration::from_millis(200)).await;
            },
            async move {
                // Compaction flushes all dirty stores, including the one being deleted.
                filesystem_clone.journal().compact().await.expect("Compact failed");
            },
            async move {
                if let Err(e) = volumes_directory_clone1.remove_volume(name).await {
                    if !FxfsError::NotFound.matches(&e) {
                        panic!("remove_volume failed: {e:?}");
                    }
                }
            },
            async move {
                if let Err(e) = volumes_directory_clone2.remove_volume(name).await {
                    if !FxfsError::NotFound.matches(&e) {
                        panic!("remove_volume failed: {e:?}");
                    }
                }
            },
        );
        volumes_directory.terminate().await;
        std::mem::drop(volumes_directory);
        filesystem.close().await.expect("Filesystem close");
    }
2428
    // This mostly just ensures that we exercise the code path. It was added because the path at
    // one point contained a deadlock.
    #[fuchsia::test(threads = 10)]
    async fn test_flush_before_mark_dirty_under_critical_memory_pressure() {
        let fixture = TestFixture::new().await;
        // Memory is critical, and it's always our fault.
        let _ = fixture
            .memory_pressure_proxy()
            .on_level_changed(MemoryPressureLevel::Critical)
            .await
            .expect("memory pressure FIDL");
        // With a 1-byte budget, any pager-dirtied page immediately crosses the
        // critical-pressure threshold.
        fixture.volumes_directory().max_dirty_bytes_when_critical.store(1, Ordering::Relaxed);

        let root = fixture.root();
        let file = open_file_checked(
            &root,
            "foo",
            fio::Flags::FLAG_MAYBE_CREATE
                | fio::PERM_READABLE
                | fio::PERM_WRITABLE
                | fio::Flags::PROTOCOL_FILE,
            &Default::default(),
        )
        .await;

        // Two pages: the second page is written later to trigger a flush mid-mark_dirty.
        file.resize((zx::system_get_page_size() * 2).into())
            .await
            .expect("resize (FIDL)")
            .expect("resize failed");
        // The above resize creates zero pages which aren't dirty pages and don't contribute to the
        // dirty bytes count but still need to be flushed. The first write below will cross the
        // `max_dirty_bytes_when_critical` threshold but `minimize_memory` won't flush the file
        // because the zero pages aren't dirty pages. The second write below will also cross the
        // `max_dirty_bytes_when_critical` threshold and since the first write dirtied a page, a
        // flush will occur. The flush cleans the 2nd page which is a zero page and the kernel is
        // actively trying to dirty. The kernel doesn't end up dirtying the 2nd page because it's
        // concerned that the clean and dirty raced so it issues another dirty request for the same
        // page. The duplicate dirty request again crosses the `max_dirty_bytes_when_critical`
        // threshold. The file thinks that it has a dirty page so `minimize_memory` flushes the
        // file. `zx_pager_query_dirty_ranges` doesn't return any pages because the kernel didn't
        // actually dirty the page that fxfs thought it did. This causes only the file metadata to
        // be flushed and the dirty page count to not be reduced. The 2nd page finally gets dirtied
        // and now fxfs thinks it has 2 dirty pages instead of 1. `PagedObjectHandle` knows that
        // this can happen and will fix up the counts on the next flush. This test doesn't want to
        // deal with all of that so it syncs here to clean the zero pages that would cause that
        // mess.
        file.sync().await.expect("Failed to make sync call").expect("sync failed");

        // Write through the VMO (not the file protocol) so dirtying goes via the pager.
        let vmo = file
            .get_backing_memory(fio::VmoFlags::READ | fio::VmoFlags::WRITE)
            .await
            .expect("get_backing_memory (FIDL)")
            .expect("get_backing_memory");

        let buf = [0xAAu8];
        // One call to get dirty bytes over 0, the second to force a flush during mark_dirty.
        vmo.write(&buf, 0).expect("Writing to create dirty bytes");
        let before = fixture.volumes_directory().pager_dirty_bytes_count.load(Ordering::Relaxed);
        vmo.write(&buf, zx::system_get_page_size().into())
            .expect("Writing to force a flush during mark_dirty");
        // This is still the page size because we forced a flush of the first write during the
        // second write.
        assert_eq!(
            fixture.volumes_directory().pager_dirty_bytes_count.load(Ordering::Relaxed),
            before,
        );

        fixture.close().await;
    }
2498
    // Verifies that shutting down a volume's crypt service while the volume is in active
    // use doesn't wedge the filesystem: the volume can still be shut down over
    // fuchsia.fs/Admin afterwards, and the on-disk image stays fsck-clean.
    #[fuchsia::test(threads = 10)]
    async fn test_delete_crypt_for_volume() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let store_id;
        {
            let blob_resupplied_count =
                Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
            let volumes_directory = VolumesDirectory::new(
                root_volume(filesystem.clone()).await.unwrap(),
                Weak::new(),
                None,
                blob_resupplied_count,
                MemoryPressureConfig::default(),
            )
            .await
            .unwrap();
            let name = "vol";
            let crypt = Arc::new(new_insecure_crypt());
            let volume = volumes_directory
                .create_and_mount_volume(name, Some(crypt.clone()), false, None)
                .await
                .unwrap();
            // Remember the store id so the encrypted volume can be fsck'd after reopen.
            store_id = volume.volume().store().store_object_id();
            // Make sure the volume has some journaled mutations.
            let mut transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        volume.volume().store().store_object_id(),
                        volume.root_dir().directory().object_id()
                    )],
                    Options::default(),
                )
                .await
                .unwrap();
            volume
                .root_dir()
                .directory()
                .create_child_file(&mut transaction, "foo")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            // Serve the volume over FIDL so file creation below runs as a client would.
            let (volume_dir_proxy, dir_server_end) =
                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
            volumes_directory
                .serve_volume(&volume, dir_server_end, false)
                .expect("serve_volume failed");
            let (root_dir, root_server_end) =
                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
            volume_dir_proxy
                .open(
                    "root",
                    fio::PERM_READABLE | fio::PERM_WRITABLE | fio::Flags::PROTOCOL_DIRECTORY,
                    &Default::default(),
                    root_server_end.into_channel(),
                )
                .expect("Failed to open volume root");

            // Race three tasks: a journal compaction, a client creating files in a loop,
            // and the crypt shutdown. The creation loop exits on its first error, which
            // is expected once the crypt service has gone away.
            let filesystem_clone = filesystem.clone();
            join!(
                async move {
                    filesystem_clone.journal().compact().await.expect("Compact failed");
                },
                async move {
                    let mut i = 0;
                    while let Ok(_) = fuchsia_fs::directory::open_file(
                        &root_dir,
                        &format!("foo{i}"),
                        fio::Flags::FLAG_MAYBE_CREATE | fio::PERM_READABLE,
                    )
                    .await
                    {
                        i += 1;
                    }
                },
                async move {
                    crypt.shutdown();
                },
            );

            // Make sure we can still ask the volume to shutdown.
            let (admin_proxy, server_end) =
                fidl::endpoints::create_proxy::<fidl_fuchsia_fs::AdminMarker>();
            volume_dir_proxy
                .open(
                    &format!("svc/{}", fidl_fuchsia_fs::AdminMarker::PROTOCOL_NAME),
                    fio::Flags::PROTOCOL_SERVICE,
                    &Default::default(),
                    server_end.into(),
                )
                .expect("Failed to open Admin connection");
            admin_proxy.shutdown().await.expect("shutdown failed");

            volumes_directory.terminate().await;
        }
        filesystem.close().await.expect("Filesystem close");
        let device = filesystem.take_device().await;
        device.reopen(false);
        // Remount and fsck (filesystem-wide plus the encrypted volume) to confirm the
        // racing crypt shutdown left no corruption behind.
        let filesystem = FxFilesystem::open(device).await.expect("open failed");
        let options = FsckOptions { fail_on_warning: true, ..Default::default() };
        fsck_with_options(filesystem.clone(), &options).await.expect("fsck failed");
        fsck_volume_with_options(
            filesystem.as_ref(),
            &options,
            store_id,
            Some(Arc::new(new_insecure_crypt())),
        )
        .await
        .expect("fsck_volume failed");
        filesystem.close().await.expect("Filesystem close");
    }
2612
2613    /// Tests writing a partition image and installing a volume contained within.
2614    #[fuchsia::test(threads = 10)]
2615    async fn test_volume_installation() {
2616        let fixture = TestFixture::open(
2617            DeviceHolder::new(FakeDevice::new(1024, 4096)),
2618            TestFixtureOptions { format: true, encrypted: false, ..Default::default() },
2619        )
2620        .await;
2621
2622        // Create a file "foo" in the existing volume "vol". This should be gone after installation.
2623        {
2624            let file = open_file_checked(
2625                fixture.root(),
2626                "foo",
2627                fio::Flags::PROTOCOL_FILE | fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2628                &Default::default(),
2629            )
2630            .await;
2631            file.write("Hello, world!".as_bytes()).await.unwrap().expect("write failed");
2632        };
2633
2634        // Create another in-memory partition image with a different set of files.
2635        let image = {
2636            let inner_fixture = TestFixture::open(
2637                DeviceHolder::new(FakeDevice::new(512, 4096)),
2638                TestFixtureOptions { format: true, encrypted: false, ..Default::default() },
2639            )
2640            .await;
2641            let file = open_file_checked(
2642                inner_fixture.root(),
2643                "bar",
2644                fio::Flags::PROTOCOL_FILE | fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2645                &Default::default(),
2646            )
2647            .await;
2648            file.write("Well, this is new...".as_bytes()).await.unwrap().expect("write failed");
2649            file.close().await.unwrap().expect("close error");
2650            inner_fixture.close().await
2651        };
2652
2653        // Write the partition image to a file "install" in a new volume called "src".
2654        {
2655            let (src_out_dir, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
2656            fixture
2657                .volumes_directory()
2658                .create_and_serve_volume("src", server_end, Default::default(), Default::default())
2659                .await
2660                .unwrap();
2661            let src_root = open_dir_checked(
2662                &src_out_dir,
2663                "root",
2664                fio::PERM_READABLE | fio::PERM_WRITABLE,
2665                Default::default(),
2666            )
2667            .await;
2668            let file = open_file_checked(
2669                &src_root,
2670                "image",
2671                fio::Flags::PROTOCOL_FILE | fio::Flags::FLAG_MUST_CREATE | fio::PERM_WRITABLE,
2672                &Default::default(),
2673            )
2674            .await;
2675            write_image_to_file(image, file).await;
2676        };
2677
2678        // Installation should not be possible yet since both volumes are still mounted.
2679        assert!(
2680            fixture.volumes_directory().install_volume("src", "image", "vol").await.is_err(),
2681            "volume installation should fail while either src/dst is still mounted"
2682        );
2683
2684        // Now let's re-mount the filesystem manually without a fixture, install the volumes, and
2685        // spin up a new fixture to verify the result.
2686        let device = fixture.close().await;
2687        let fs = FxFilesystem::open(device).await.unwrap();
2688        {
2689            let root = root_volume(fs.clone()).await.unwrap();
2690            root.install_volume("src", "image", "vol").await.unwrap();
2691        }
2692        fs.close().await.unwrap();
2693        let device = fs.take_device().await;
2694        device.reopen(/*read_only*/ true);
2695        let fixture = TestFixture::open(
2696            device,
2697            TestFixtureOptions { encrypted: false, format: false, ..Default::default() },
2698        )
2699        .await;
2700
2701        // Ensure that the "src" volume is now gone, and the old file "foo" is gone from "vol".
2702        assert!(
2703            fixture.volumes_directory().mount_volume("src", None, false).await.is_err(),
2704            "src volume should be deleted after installation"
2705        );
2706        assert!(
2707            testing::open_file(
2708                fixture.volume_out_dir(),
2709                "foo",
2710                fio::PERM_READABLE,
2711                &Default::default()
2712            )
2713            .await
2714            .is_err(),
2715            "foo should be deleted after installation"
2716        );
2717
2718        // Check that we can find the contents of "vol" that we installed from the image.
2719        let file =
2720            open_file_checked(fixture.root(), "bar", fio::PERM_READABLE, &Default::default()).await;
2721        let data = file.read(fio::MAX_TRANSFER_SIZE).await.unwrap().expect("read failed");
2722        assert_eq!(String::from_utf8(data).unwrap(), "Well, this is new...");
2723        file.close().await.unwrap().unwrap();
2724
2725        fixture.close().await;
2726    }
2727
2728    #[fuchsia::test]
2729    async fn test_create_with_low_32_bit_ids() {
2730        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
2731        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
2732        let blob_resupplied_count =
2733            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2734
2735        {
2736            let volumes_directory = VolumesDirectory::new(
2737                root_volume(filesystem.clone()).await.unwrap(),
2738                Weak::new(),
2739                None,
2740                blob_resupplied_count,
2741                MemoryPressureConfig::default(),
2742            )
2743            .await
2744            .unwrap();
2745
2746            let mut guard = volumes_directory.lock().await;
2747
2748            let vol = guard
2749                .create_or_mount_volume(
2750                    "low_32",
2751                    None,
2752                    Mode::Create { guid: None, low_32_bit_object_ids: true },
2753                    false,
2754                )
2755                .await
2756                .expect("create volume failed");
2757
2758            let root_dir = vol.volume().store().root_directory_object_id();
2759            let root_dir = fxfs::object_store::Directory::open(vol.volume().store(), root_dir)
2760                .await
2761                .expect("open failed");
2762
2763            let mut transaction = filesystem
2764                .clone()
2765                .new_transaction(
2766                    lock_keys![LockKey::object(
2767                        vol.volume().store().store_object_id(),
2768                        root_dir.object_id()
2769                    )],
2770                    Options::default(),
2771                )
2772                .await
2773                .expect("new_transaction failed");
2774
2775            let object = root_dir
2776                .create_child_file(&mut transaction, "test")
2777                .await
2778                .expect("create_child_file failed");
2779
2780            // We can't check LastObjectIdInfo as it is private, but we can verify behavior.
2781            assert!(object.object_id() < 1 << 32);
2782            transaction.commit().await.expect("commit failed");
2783        };
2784
2785        filesystem.close().await.expect("close filesystem failed");
2786
2787        // Reopen and verify persistence
2788        let device = filesystem.take_device().await;
2789        device.reopen(false);
2790        let filesystem = FxFilesystem::open(device).await.unwrap();
2791        let blob_resupplied_count =
2792            Arc::new(PageRefaultCounter::new().expect("Failed to create PageRefaultCounter"));
2793        let volumes_directory = VolumesDirectory::new(
2794            root_volume(filesystem.clone()).await.unwrap(),
2795            Weak::new(),
2796            None,
2797            blob_resupplied_count,
2798            MemoryPressureConfig::default(),
2799        )
2800        .await
2801        .unwrap();
2802
2803        // Mount the volume again, and check that new files are still created with expected IDs.
2804        {
2805            let mut guard = volumes_directory.lock().await;
2806
2807            let vol = guard
2808                .create_or_mount_volume("low_32", None, Mode::Mount, false)
2809                .await
2810                .expect("mount volume failed");
2811
2812            let root_dir = vol.volume().store().root_directory_object_id();
2813            let root_dir = fxfs::object_store::Directory::open(vol.volume().store(), root_dir)
2814                .await
2815                .expect("open failed");
2816
2817            let mut transaction = filesystem
2818                .clone()
2819                .new_transaction(
2820                    lock_keys![LockKey::object(
2821                        vol.volume().store().store_object_id(),
2822                        root_dir.object_id()
2823                    )],
2824                    Options::default(),
2825                )
2826                .await
2827                .expect("new_transaction failed");
2828
2829            let object = root_dir
2830                .create_child_file(&mut transaction, "test2")
2831                .await
2832                .expect("create_child_file failed");
2833
2834            assert!(object.object_id() < 1 << 32);
2835            transaction.commit().await.expect("commit failed");
2836        }
2837
2838        filesystem.close().await.expect("close filesystem failed");
2839    }
2840
    /// Regression test for a race between volume unmount and a filesystem-wide journal flush
    /// when the volume's remote crypt connection has died.
    ///
    /// Each of 20 iterations: mounts an encrypted volume backed by an in-process fake crypt
    /// server, dirties the journal, kills the crypt server, then races
    /// `object_manager().flush()` against the volume being unmounted.  The flush must not
    /// fail even though crypt is gone (see the comment on `compact_task` below).  Multiple
    /// executor threads are requested so the spawned tasks genuinely race.
    #[fuchsia::test(threads = 10)]
    async fn test_race_unmount_and_flush_with_crypt_error() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 512));
        let filesystem = FxFilesystem::new_empty(device).await.unwrap();
        let blob_resupplied_count = Arc::new(PageRefaultCounter::new().unwrap());
        let volumes_directory = VolumesDirectory::new(
            root_volume(filesystem.clone()).await.unwrap(),
            Weak::new(),
            None,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await
        .unwrap();

        // NOTE(review): `crypt_service` is configured with wrapping keys here but does not
        // appear to be used again in this test — per-iteration crypt traffic is served by the
        // fake `crypt_task` below instead.  Confirm whether this setup is still needed.
        let crypt_service = Arc::new(fxfs_crypt::CryptService::new());
        crypt_service
            .add_wrapping_key(0, fxfs_insecure_crypto::DATA_KEY.to_vec())
            .expect("add_wrapping_key failed");
        crypt_service
            .add_wrapping_key(1, fxfs_insecure_crypto::METADATA_KEY.to_vec())
            .expect("add_wrapping_key failed");
        crypt_service.set_active_key(KeyPurpose::Data, 0).expect("set_active_key failed");
        crypt_service.set_active_key(KeyPurpose::Metadata, 1).expect("set_active_key failed");

        // Repeat to widen the window for the unmount/flush race to actually occur.
        for _ in 0..20 {
            let (client, mut stream) = create_request_stream::<fidl_fuchsia_fxfs::CryptMarker>();
            let (close_tx, mut close_rx) = futures::channel::oneshot::channel::<()>();

            // Minimal fake Crypt server: answers key requests with zero-filled keys until it
            // receives the close signal (or the stream ends / yields an unexpected request).
            let crypt_task = fasync::Task::spawn(async move {
                loop {
                    futures::select! {
                        _ = close_rx => return, // Close signal
                        request = stream.try_next() => {
                            match request {
                                Ok(Some(CryptRequest::CreateKey { responder, .. })) => {
                                    responder.send(Ok((&[0; 16], &[0; 48], &[0; 32]))).unwrap();
                                }
                                Ok(Some(CryptRequest::CreateKeyWithId {
                                        wrapping_key_id, responder, .. })) => {
                                    let key = WrappedKey::Fxfs(FxfsKey {
                                        wrapping_key_id,
                                        wrapped_key: [0u8; 48],
                                    });
                                    responder.send(Ok((&key, &[0; 32]))).unwrap();
                                }
                                Ok(Some(CryptRequest::UnwrapKey { responder, .. })) => {
                                    responder.send(Ok(&vec![0; 32])).unwrap();
                                }
                                _ => return,
                            }
                        }
                    }
                }
            });

            // Mount an encrypted volume whose crypt is the fake server above.
            let volume = volumes_directory
                .create_and_mount_volume(
                    "encrypted",
                    Some(Arc::new(RemoteCrypt::new(client))),
                    false,
                    None,
                )
                .await
                .unwrap();

            // Write some data to dirty the journal.
            {
                let mut transaction = filesystem
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            volume.volume().store().store_object_id(),
                            volume.root_dir().directory().object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .unwrap();
                volume
                    .root_dir()
                    .directory()
                    .create_child_file(&mut transaction, "foo")
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }

            // Serve the volume and arm auto-unmount — presumably so the volume unmounts once
            // its last connection (`dir_proxy`) is dropped below; verify against
            // `auto_unmount`'s definition.
            let (dir_proxy, dir_server_end) =
                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
            volumes_directory.serve_volume(&volume, dir_server_end, false).unwrap();
            volumes_directory.lock().await.auto_unmount(volume.volume().store().store_object_id());

            // Kill crypt.  The next usage should be in flush.
            let _ = close_tx.send(());

            let filesystem_clone = filesystem.clone();
            let compact_task = fasync::Task::spawn(async move {
                // Flush should not fail, because that would close the journal for the rest of the
                // filesystem.  Instead, the volume should be force-locked and flushed (which
                // doesn't depend on crypt).
                filesystem_clone.object_manager().flush().await.expect("flush failed");
            });

            // Trigger unmount.
            std::mem::drop(dir_proxy);

            // Wait for volume to be unmounted, so we can clean up for the next iteration.
            // Unmount completes asynchronously, so poll `mounted_volumes` (dropping the lock
            // between polls) rather than blocking on it.
            let store_id = volume.volume().store().store_object_id();
            loop {
                {
                    let guard = volumes_directory.lock().await;
                    if !guard.mounted_volumes.contains_key(&store_id) {
                        break;
                    }
                }
                fasync::Timer::new(Duration::from_millis(10)).await;
            }

            // Wait for both spawned tasks to finish, then delete the volume so the name
            // "encrypted" is free for the next iteration.
            join!(compact_task, crypt_task);
            volumes_directory.remove_volume("encrypted").await.expect("remove_volume failed");
        }
        volumes_directory.terminate().await;
        filesystem.close().await.expect("close filesystem failed");
    }
2966}