// fxfs_platform/fuchsia/component.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::debug::{FxfsDebug, handle_debug_request};
6use crate::fuchsia::errors::map_to_status;
7use crate::fuchsia::memory_pressure::MemoryPressureMonitor;
8use crate::fuchsia::volume::MemoryPressureConfig;
9use crate::fuchsia::volumes_directory::VolumesDirectory;
10use anyhow::{Context, Error, bail};
11use async_trait::async_trait;
12use block_client::{BlockClient as _, RemoteBlockClient};
13use fidl::endpoints::{ClientEnd, DiscoverableProtocolMarker, RequestStream};
14use fidl_fuchsia_fs::{AdminMarker, AdminRequest, AdminRequestStream};
15use fidl_fuchsia_fs_startup::{
16    CheckOptions, StartOptions, StartupMarker, StartupRequest, StartupRequestStream, VolumesMarker,
17    VolumesRequest, VolumesRequestStream,
18};
19use fidl_fuchsia_fxfs::{
20    DebugMarker, DebugRequestStream, VolumeInstallerMarker, VolumeInstallerRequest,
21    VolumeInstallerRequestStream,
22};
23use fidl_fuchsia_process_lifecycle::{LifecycleRequest, LifecycleRequestStream};
24use fidl_fuchsia_storage_block::BlockMarker;
25use fs_inspect::{FsInspect, FsInspectTree, InfoData, UsageData};
26use fuchsia_component_client::connect_to_protocol;
27use futures::TryStreamExt;
28use futures::lock::Mutex;
29use fxfs::filesystem::{FxFilesystem, FxFilesystemBuilder, MIN_BLOCK_SIZE, OpenFxFilesystem, mkfs};
30use fxfs::log::*;
31use fxfs::object_store::volume::root_volume;
32use fxfs::serialized_types::LATEST_VERSION;
33use fxfs::{fsck, metrics};
34use fxfs_trace::{TraceFutureExt, trace_future_args};
35use refaults_vmo::PageRefaultCounter;
36use std::ops::Deref;
37use std::sync::{Arc, Weak};
38use storage_device::DeviceHolder;
39use storage_device::block_device::BlockDevice;
40use vfs::directory::helper::DirectlyMutable;
41use vfs::execution_scope::ExecutionScope;
42use {
43    fidl_fuchsia_io as fio, fidl_fuchsia_memory_attribution as fattribution,
44    fuchsia_async as fasync,
45};
46
/// Filesystem name reported via `fuchsia.fs` info queries (see `get_info_data`).
/// Note: `'static` is implied for const references, so the explicit lifetime was
/// redundant (clippy `redundant_static_lifetimes`).
const FXFS_INFO_NAME: &str = "fxfs";
48
49pub fn map_to_raw_status(e: Error) -> zx::sys::zx_status_t {
50    map_to_status(e).into_raw()
51}
52
53pub async fn new_block_client(remote: ClientEnd<BlockMarker>) -> Result<RemoteBlockClient, Error> {
54    Ok(RemoteBlockClient::new(remote.into_proxy()).await?)
55}
56
/// Runs Fxfs as a component.
pub struct Component {
    // Current lifecycle state (started vs. running a mounted filesystem).  An async
    // mutex because start/stop transitions themselves await.
    state: futures::lock::Mutex<State>,

    // The execution scope of the pseudo filesystem.
    scope: ExecutionScope,

    // The root of the pseudo filesystem for the component.
    export_dir: Arc<vfs::directory::immutable::Simple>,
}
67
// Wrapper type to add `FsInspect` support to an `OpenFxFilesystem`.
// The second field is a unique filesystem instance id (taken from a fresh event's koid).
struct InspectedFxFilesystem(OpenFxFilesystem, /*fs_id=*/ u64);
70
71impl From<OpenFxFilesystem> for InspectedFxFilesystem {
72    fn from(fs: OpenFxFilesystem) -> Self {
73        Self(fs, zx::Event::create().koid().unwrap().raw_koid())
74    }
75}
76
77impl Deref for InspectedFxFilesystem {
78    type Target = Arc<FxFilesystem>;
79    fn deref(&self) -> &Self::Target {
80        &self.0.deref()
81    }
82}
83
84#[async_trait]
85impl FsInspect for InspectedFxFilesystem {
86    fn get_info_data(&self) -> InfoData {
87        let earliest_version = self.0.super_block_header().earliest_version;
88        InfoData {
89            id: self.1,
90            fs_type: fidl_fuchsia_fs::VfsType::Fxfs.into_primitive().into(),
91            name: FXFS_INFO_NAME.into(),
92            version_major: LATEST_VERSION.major.into(),
93            version_minor: LATEST_VERSION.minor.into(),
94            block_size: self.0.block_size() as u64,
95            max_filename_length: fio::MAX_NAME_LENGTH,
96            oldest_version: Some(format!("{}.{}", earliest_version.major, earliest_version.minor)),
97        }
98    }
99
100    async fn get_usage_data(&self) -> UsageData {
101        let info = self.0.get_info();
102        UsageData {
103            total_bytes: info.total_bytes,
104            used_bytes: info.used_bytes,
105            // TODO(https://fxbug.dev/42175930): Should these be moved to per-volume nodes?
106            total_nodes: 0,
107            used_nodes: 0,
108        }
109    }
110}
111
// State held while a filesystem is mounted and actively serving.
struct RunningState {
    // We have to wrap this in an Arc, even though it itself basically just wraps an Arc, so that
    // FsInspectTree can reference `fs` as a Weak<dyn FsInspect>`.
    fs: Arc<InspectedFxFilesystem>,
    // Serves and tracks all mounted volumes (exported under "volumes").
    volumes: Arc<VolumesDirectory>,
    // Kept alive so the exported "debug" directory keeps working while running.
    _debug: Arc<FxfsDebug>,
    // Kept alive so inspect data remains published while running.
    _inspect_tree: Arc<FsInspectTree>,
}
120
// Lifecycle state of the component.
enum State {
    // The component is up, but no filesystem is currently mounted (initial state, and
    // the state after `stop`).
    ComponentStarted,
    // A filesystem is mounted and being served.
    Running(RunningState),
}
125
126impl State {
127    async fn stop(&mut self, outgoing_dir: &vfs::directory::immutable::Simple) {
128        if let State::Running(RunningState { fs, volumes, .. }) =
129            std::mem::replace(self, State::ComponentStarted)
130        {
131            info!("Stopping Fxfs runtime; remaining connections will be forcibly closed");
132            let _ = outgoing_dir.remove_entry("volumes", /* must_be_directory: */ false);
133            let _ = outgoing_dir.remove_entry("debug", /* must_be_directory: */ false);
134            volumes.terminate().await;
135            let _ = fs.deref().close().await;
136        }
137    }
138}
139
140impl Component {
141    pub fn new() -> Arc<Self> {
142        Arc::new(Self {
143            state: Mutex::new(State::ComponentStarted),
144            scope: ExecutionScope::new(),
145            export_dir: vfs::directory::immutable::simple(),
146        })
147    }
148
    /// Runs Fxfs as a component: publishes the `Startup`, `Volumes`, `Admin`,
    /// `VolumeInstaller` and `Debug` protocols under `svc/` in the outgoing directory,
    /// serves that directory over `outgoing_dir`, and (optionally) listens for process
    /// lifecycle requests on `lifecycle_channel`.  Resolves once the execution scope
    /// drains (i.e. after shutdown).
    pub async fn run(
        self: Arc<Self>,
        outgoing_dir: zx::Channel,
        lifecycle_channel: Option<zx::Channel>,
    ) -> Result<(), Error> {
        let svc_dir = vfs::directory::immutable::simple();
        self.export_dir.add_entry("svc", svc_dir.clone()).expect("Unable to create svc dir");

        // Each protocol handler below captures only a weak reference to the component so
        // that open service connections do not keep the component alive.
        let weak = Arc::downgrade(&self);
        svc_dir.add_entry(
            StartupMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let weak = weak.clone();
                async move {
                    if let Some(me) = weak.upgrade() {
                        let _ = me.handle_startup_requests(requests).await;
                    }
                }
            }),
        )?;
        let weak = Arc::downgrade(&self);
        svc_dir.add_entry(
            VolumesMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let weak = weak.clone();
                async move {
                    if let Some(me) = weak.upgrade() {
                        me.handle_volumes_requests(requests).await;
                    }
                }
            }),
        )?;

        let weak = Arc::downgrade(&self);
        svc_dir.add_entry(
            AdminMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let weak = weak.clone();
                async move {
                    if let Some(me) = weak.upgrade() {
                        let _ = me.handle_admin_requests(requests).await;
                    }
                }
            }),
        )?;

        let weak = Arc::downgrade(&self);
        svc_dir.add_entry(
            VolumeInstallerMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let weak = weak.clone();
                async move {
                    if let Some(me) = weak.upgrade() {
                        if let Err(error) = me.handle_volume_installer_requests(requests).await {
                            error!(error:?; "failed to handle VolumeInstaller requests");
                        }
                    }
                }
            }),
        )?;

        // TODO(b/315704445): Only enable in debug builds?
        let weak = Arc::downgrade(&self);
        svc_dir.add_entry(
            DebugMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let weak = weak.clone();
                async move {
                    if let Some(me) = weak.upgrade() {
                        let _ = me.handle_debug_requests(requests).await;
                    }
                }
            }),
        )?;

        // Serve the pseudo filesystem (svc/, plus "volumes"/"debug" added at mount time)
        // on the provided outgoing directory channel.
        vfs::directory::serve_on(
            self.export_dir.clone(),
            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::PERM_EXECUTABLE,
            self.scope.clone(),
            fidl::endpoints::ServerEnd::new(outgoing_dir),
        );

        if let Some(channel) = lifecycle_channel {
            let me = self.clone();
            self.scope.spawn(async move {
                if let Err(error) = me.handle_lifecycle_requests(channel).await {
                    warn!(error:?; "handle_lifecycle_requests");
                }
            });
        }

        // Block until all tasks spawned on the scope have completed.
        self.scope.wait().await;

        Ok(())
    }
245
    /// Serves `fuchsia.fs.startup.Startup` requests: Start (mount), Format (mkfs) and
    /// Check (fsck).  Each operation is wrapped in a trace future for profiling, and
    /// errors are logged before being mapped to a raw status for the responder.
    async fn handle_startup_requests(&self, mut stream: StartupRequestStream) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await? {
            match request {
                StartupRequest::Start { responder, device, options } => {
                    async move {
                        responder.send(self.handle_start(device, options).await.map_err(|error| {
                            error!(error:?; "handle_start failed");
                            map_to_raw_status(error)
                        }))
                    }
                    .trace(trace_future_args!("Startup::Start"))
                    .await?
                }
                StartupRequest::Format { responder, device, .. } => {
                    async move {
                        responder.send(self.handle_format(device).await.map_err(|error| {
                            error!(error:?; "handle_format failed");
                            map_to_raw_status(error)
                        }))
                    }
                    .trace(trace_future_args!("Startup::Format"))
                    .await?
                }
                StartupRequest::Check { responder, device, options } => {
                    async move {
                        responder.send(self.handle_check(device, options).await.map_err(|error| {
                            error!(error:?; "handle_check failed");
                            map_to_raw_status(error)
                        }))
                    }
                    .trace(trace_future_args!("Startup::Check"))
                    .await?
                }
            }
        }
        Ok(())
    }
283
    /// Mounts the filesystem on `device` and transitions to `State::Running`.
    ///
    /// Any previously running filesystem is stopped first.  The state lock is held for
    /// the entire mount sequence, serializing Start against stop/check.  On success the
    /// "volumes" and "debug" directories are (re-)exported and inspect/metrics
    /// tracking is wired up.
    async fn handle_start(
        &self,
        device: ClientEnd<BlockMarker>,
        options: StartOptions,
    ) -> Result<(), Error> {
        info!(options:?; "Received start request");
        let mut state = self.state.lock().await;
        // TODO(https://fxbug.dev/42174810): This is not very graceful.  It would be better for the client to
        // explicitly shut down all volumes first, and make this fail if there are remaining active
        // connections.  Fix the bug in fs_test which requires this.
        state.stop(&self.export_dir).await;
        let client = new_block_client(device).await?;

        // TODO(https://fxbug.dev/42063349) Add support for block sizes greater than the page size.
        assert!(client.block_size() <= zx::system_get_page_size());
        assert!((zx::system_get_page_size() as u64) == MIN_BLOCK_SIZE);

        let fs = FxFilesystemBuilder::new()
            .fsck_after_every_transaction(options.fsck_after_every_transaction.unwrap_or(false))
            .read_only(options.read_only.unwrap_or(false))
            .inline_crypto_enabled(options.inline_crypto_enabled.unwrap_or(false))
            .barriers_enabled(options.barriers_enabled.unwrap_or(false))
            .open(DeviceHolder::new(
                BlockDevice::new(client, options.read_only.unwrap_or(false)).await?,
            ))
            .await?;
        let root_volume = root_volume(fs.clone()).await?;
        // Wrap the filesystem so it can feed the inspect tree (see InspectedFxFilesystem).
        let fs: Arc<InspectedFxFilesystem> = Arc::new(fs.into());
        let weak_fs = Arc::downgrade(&fs) as Weak<dyn FsInspect + Send + Sync>;
        let inspect_tree =
            Arc::new(FsInspectTree::new(weak_fs, fuchsia_inspect::component::inspector().root()));

        // Publish a page-refault counter VMO to the memory-attribution sink.
        let blob_resupplied_count = Arc::new(PageRefaultCounter::new()?);
        connect_to_protocol::<fattribution::PageRefaultSinkMarker>()?
            .send_page_refault_count(blob_resupplied_count.readonly_vmo()?)?;

        // Memory-pressure awareness is best-effort: run without it if unavailable.
        let mem_monitor = match MemoryPressureMonitor::start() {
            Ok(v) => Some(v),
            Err(error) => {
                warn!(
                    error:?;
                    "Failed to connect to memory pressure monitor. Running \
                     without pressure awareness."
                );
                None
            }
        };

        let volumes = VolumesDirectory::new(
            root_volume,
            Arc::downgrade(&inspect_tree),
            mem_monitor,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await?;

        // A previous instance may have left stale entries behind; overwrite them.
        self.export_dir.add_entry_may_overwrite(
            "volumes",
            volumes.directory_node().clone(),
            /* overwrite: */ true,
        )?;

        let debug = FxfsDebug::new(&**fs, &volumes)?;
        self.export_dir.add_entry_may_overwrite("debug", debug.root(), true)?;

        fs.allocator().track_statistics(&metrics::detail(), "allocator");
        fs.journal().track_statistics(&metrics::detail(), "journal");
        fs.object_manager().track_statistics(&metrics::detail(), "object_manager");

        let info = fs.get_info();
        info!(
            device_size = info.total_bytes,
            used = info.used_bytes,
            free = info.total_bytes - info.used_bytes;
            "Mounted"
        );

        if let Some(profile_time) = options.startup_profiling_seconds {
            // Unwrap ok, shouldn't have anything else recording or replaying this early in startup.
            volumes.record_or_replay_profile(None, ".boot".to_owned(), profile_time).await.unwrap();
        }

        *state = State::Running(RunningState {
            fs,
            volumes,
            _debug: debug,
            _inspect_tree: inspect_tree,
        });

        Ok(())
    }
376
377    async fn handle_format(&self, device: ClientEnd<BlockMarker>) -> Result<(), Error> {
378        let device = DeviceHolder::new(
379            BlockDevice::new(new_block_client(device).await?, /* read_only: */ false).await?,
380        );
381        mkfs(device).await?;
382        info!("Formatted filesystem");
383        Ok(())
384    }
385
    /// Runs fsck.  If no filesystem is currently mounted, the provided device is opened
    /// read-only for the duration of the check and closed afterwards; otherwise the
    /// live, already-mounted filesystem is checked.  Note the state lock is held for
    /// the whole check, blocking concurrent start/stop.
    async fn handle_check(
        &self,
        device: ClientEnd<BlockMarker>,
        _options: CheckOptions,
    ) -> Result<(), Error> {
        let state = self.state.lock().await;
        let (fs_container, fs) = match *state {
            State::ComponentStarted => {
                let client = new_block_client(device).await?;
                let fs_container = FxFilesystemBuilder::new()
                    .read_only(true)
                    .open(DeviceHolder::new(BlockDevice::new(client, /* read_only: */ true).await?))
                    .await?;
                let fs = fs_container.clone();
                (Some(fs_container), fs)
            }
            State::Running(RunningState { ref fs, .. }) => (None, fs.deref().deref().clone()),
        };
        let res = fsck::fsck(fs.clone()).await;
        // Close the temporary read-only instance (if we opened one) even when fsck failed.
        if let Some(fs_container) = fs_container {
            let _ = fs_container.close().await;
        }
        // TODO(b/311550633): Stash ok res in inspect.
        info!("handle_check for fs: {:?}", res?);
        Ok(())
    }
412
413    async fn handle_admin_requests(&self, mut stream: AdminRequestStream) -> Result<(), Error> {
414        while let Some(request) = stream.try_next().await.context("Reading request")? {
415            if self.handle_admin(request).await? {
416                break;
417            }
418        }
419        Ok(())
420    }
421
    // Handles a single Admin request.  Returns true if we should close the connection.
    async fn handle_admin(&self, req: AdminRequest) -> Result<bool, Error> {
        match req {
            AdminRequest::Shutdown { responder } => {
                async move {
                    info!("Received shutdown request");
                    // Tear everything down before replying, so the caller knows the
                    // filesystem is fully flushed once the response arrives.
                    self.shutdown().await;
                    responder
                        .send()
                        .unwrap_or_else(|error| warn!(error:?; "Failed to send shutdown response"));
                }
                .trace(trace_future_args!("Admin::Shutdown"))
                .await;
                return Ok(true);
            }
        }
    }
439
    /// Handles fuchsia.fxfs.Debug requests, providing live debugging internals of the running
    /// filesystem.
    ///
    /// Bails (closing the connection) if no filesystem is running.  The state lock is
    /// held while each individual request is processed.
    async fn handle_debug_requests(&self, mut stream: DebugRequestStream) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            let state = self.state.lock().await;
            let (fs, volumes) = match &*state {
                State::ComponentStarted => {
                    info!("Debug commands are not valid unless component is started.");
                    bail!("Component not started");
                }
                State::Running(RunningState { fs, volumes, .. }) => {
                    (fs.deref().deref().clone(), volumes.clone())
                }
            };
            handle_debug_request(fs, volumes, request).await?;
        }
        Ok(())
    }
458
459    async fn shutdown(&self) {
460        self.state.lock().await.stop(&self.export_dir).await;
461        info!("Filesystem terminated");
462    }
463
464    async fn handle_volumes_requests(&self, mut stream: VolumesRequestStream) {
465        let volumes =
466            if let State::Running(RunningState { volumes, .. }) = &*self.state.lock().await {
467                volumes.clone()
468            } else {
469                let _ = stream.into_inner().0.shutdown_with_epitaph(zx::Status::BAD_STATE);
470                return;
471            };
472        while let Ok(Some(request)) = stream.try_next().await {
473            match request {
474                VolumesRequest::Create {
475                    name,
476                    outgoing_directory,
477                    create_options,
478                    mount_options,
479                    responder,
480                } => {
481                    async {
482                        info!(
483                            name = name.as_str();
484                            "Create {}volume",
485                            if mount_options.crypt.is_some() { "encrypted " } else { "" }
486                        );
487                        responder
488                            .send(
489                                volumes
490                                    .create_and_serve_volume(
491                                        &name,
492                                        outgoing_directory.into_channel().into(),
493                                        mount_options,
494                                        create_options,
495                                    )
496                                    .await
497                                    .map_err(map_to_raw_status),
498                            )
499                            .unwrap_or_else(
500                                |error| warn!(error:?; "Failed to send volume creation response"),
501                            );
502                    }
503                    .trace(trace_future_args!("Volumes::Create"))
504                    .await;
505                }
506                VolumesRequest::Remove { name, responder } => {
507                    async {
508                        info!(name = name.as_str(); "Remove volume");
509                        responder
510                            .send(volumes.remove_volume(&name).await.map_err(map_to_raw_status))
511                            .unwrap_or_else(
512                                |error| warn!(error:?; "Failed to send volume removal response"),
513                            );
514                    }
515                    .trace(trace_future_args!("Volumes::Remove"))
516                    .await;
517                }
518                VolumesRequest::GetInfo { responder } => {
519                    async move {
520                        responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw())).unwrap_or_else(
521                            |error| warn!(error:?; "Failed to send volume removal response"),
522                        )
523                    }
524                    .trace(trace_future_args!("Volumes::GetInfo"))
525                    .await;
526                }
527            }
528        }
529    }
530
531    async fn handle_lifecycle_requests(&self, lifecycle_channel: zx::Channel) -> Result<(), Error> {
532        let mut stream =
533            LifecycleRequestStream::from_channel(fasync::Channel::from_channel(lifecycle_channel));
534        match stream.try_next().await.context("Reading request")? {
535            Some(LifecycleRequest::Stop { .. }) => {
536                info!("Received Lifecycle::Stop request");
537                self.shutdown().await;
538            }
539            None => {}
540        }
541        Ok(())
542    }
543
    /// Serves `fuchsia.fxfs.VolumeInstaller` requests.
    ///
    /// Fails immediately (closing the stream with a BAD_STATE epitaph) if no filesystem
    /// is running.  The state lock is only held briefly to snapshot the volumes
    /// directory; Install requests run without it.
    async fn handle_volume_installer_requests(
        &self,
        mut stream: VolumeInstallerRequestStream,
    ) -> Result<(), Error> {
        let volumes =
            if let State::Running(RunningState { volumes, .. }) = &*self.state.lock().await {
                volumes.clone()
            } else {
                let _ = stream.into_inner().0.shutdown_with_epitaph(zx::Status::BAD_STATE);
                return Err(zx::Status::BAD_STATE).context("fxfs component not running");
            };
        while let Some(request) = stream.try_next().await.context("reading request")? {
            match request {
                VolumeInstallerRequest::Install { src, image_file, dst, responder } => {
                    async {
                        let response = volumes.install_volume(&src, &image_file, &dst).await;
                        responder.send(response.map_err(|error| {
                            error!(error:?; "install failed");
                            map_to_raw_status(error)
                        }))
                    }
                    .trace(trace_future_args!("VolumeInstaller::Install"))
                    .await?;
                }
            }
        }
        Ok(())
    }
572}
573
574#[cfg(test)]
575mod tests {
576    use super::{Component, new_block_client};
577    use fidl::endpoints::Proxy;
578    use fidl_fuchsia_fs::AdminMarker;
579    use fidl_fuchsia_fs_startup::{
580        CreateOptions, MountOptions, StartOptions, StartupMarker, VolumesMarker,
581    };
582    use fidl_fuchsia_process_lifecycle::{LifecycleMarker, LifecycleProxy};
583    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
584    use fuchsia_fs::directory::readdir;
585    use futures::future::{BoxFuture, FusedFuture, FutureExt};
586    use futures::{pin_mut, select};
587    use fxfs::filesystem::FxFilesystem;
588    use fxfs::object_store::volume::root_volume;
589    use fxfs::object_store::{NewChildStoreOptions, StoreOptions};
590    use std::pin::Pin;
591    use std::sync::Arc;
592    use storage_device::DeviceHolder;
593    use storage_device::block_device::BlockDevice;
594    use vmo_backed_block_server::{
595        InitialContents, VmoBackedServer, VmoBackedServerOptions, VmoBackedServerTestingExt,
596    };
597    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
598
    /// Test fixture: formats a VMO-backed block device with an empty Fxfs filesystem
    /// (containing a "default" volume), starts the component against it, then runs
    /// `callback` with the component's outgoing directory and lifecycle proxy.
    /// Returns the still-pending component future plus the block server so callers can
    /// assert on post-conditions.
    async fn run_test(
        callback: impl Fn(&fio::DirectoryProxy, LifecycleProxy) -> BoxFuture<'static, ()>,
    ) -> (Pin<Box<impl FusedFuture>>, Arc<VmoBackedServer>) {
        const BLOCK_SIZE: u32 = 512;
        let block_server = Arc::new(
            VmoBackedServerOptions {
                block_size: BLOCK_SIZE,
                initial_contents: InitialContents::FromCapacity(BLOCK_SIZE as u64 * 16384),
                ..Default::default()
            }
            .build()
            .expect("build failed"),
        );

        // Pre-format the device and create a "default" volume, closing the filesystem
        // afterwards so the component can re-open it.
        {
            let fs = FxFilesystem::new_empty(DeviceHolder::new(
                BlockDevice::new(
                    new_block_client(block_server.connect())
                        .await
                        .expect("Unable to create block client"),
                    false,
                )
                .await
                .unwrap(),
            ))
            .await
            .expect("FxFilesystem::new_empty failed");
            {
                let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
                root_volume
                    .new_volume("default", NewChildStoreOptions::default())
                    .await
                    .expect("Create volume failed");
            }
            fs.close().await.expect("close failed");
        }

        let (client_end, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();

        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();

        let mut component_task = Box::pin(
            async {
                Component::new()
                    .run(server_end.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Failed to run component");
            }
            .fuse(),
        );

        let startup_proxy = connect_to_protocol_at_dir_svc::<StartupMarker>(&client_end)
            .expect("Unable to connect to Startup protocol");
        let block_server_connection = block_server.connect();
        let task = async {
            startup_proxy
                .start(block_server_connection, &StartOptions::default())
                .await
                .expect("Start failed (FIDL)")
                .expect("Start failed");
            callback(&client_end, lifecycle_client).await;
        }
        .fuse();

        pin_mut!(task);

        // Drive the component and the test body concurrently until the test body completes.
        loop {
            select! {
                () = component_task => {},
                () = task => break,
            }
        }

        (component_task, block_server)
    }
675
676    #[fuchsia::test(threads = 2)]
677    async fn test_shutdown() {
678        let (component_task, _block_server) = run_test(|client, _| {
679            let admin_proxy = connect_to_protocol_at_dir_svc::<AdminMarker>(client)
680                .expect("Unable to connect to Admin protocol");
681            async move {
682                admin_proxy.shutdown().await.expect("shutdown failed");
683            }
684            .boxed()
685        })
686        .await;
687        assert!(!component_task.is_terminated());
688    }
689
690    #[fuchsia::test(threads = 2)]
691    async fn test_lifecycle_stop() {
692        let (component_task, _block_server) = run_test(|_, lifecycle_client| {
693            lifecycle_client.stop().expect("Stop failed");
694            async move {
695                fasync::OnSignals::new(
696                    &lifecycle_client.into_channel().expect("into_channel failed"),
697                    zx::Signals::CHANNEL_PEER_CLOSED,
698                )
699                .await
700                .expect("OnSignals failed");
701            }
702            .boxed()
703        })
704        .await;
705        component_task.await;
706    }
707
    // Exercises the volume create/remove lifecycle: removal fails while mounted,
    // duplicate creation fails, removal of a non-existent volume fails, and a removed
    // name can be re-created.
    #[fuchsia::test(threads = 2)]
    async fn test_create_and_remove() {
        let (component_task, _block_server) = run_test(|client, _| {
            let volumes_proxy = connect_to_protocol_at_dir_svc::<VolumesMarker>(client)
                .expect("Unable to connect to Volumes protocol");

            let fs_admin_proxy = connect_to_protocol_at_dir_svc::<AdminMarker>(client)
                .expect("Unable to connect to Admin protocol");

            async move {
                let (dir_proxy, server_end) =
                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
                volumes_proxy
                    .create("test", server_end, CreateOptions::default(), MountOptions::default())
                    .await
                    .expect("fidl failed")
                    .expect("create failed");

                // This should fail whilst the volume is mounted.
                volumes_proxy
                    .remove("test")
                    .await
                    .expect("fidl failed")
                    .expect_err("remove succeeded");

                // Unmount the volume via its own Admin protocol.
                let volume_admin_proxy = connect_to_protocol_at_dir_svc::<AdminMarker>(&dir_proxy)
                    .expect("Unable to connect to Admin protocol");
                volume_admin_proxy.shutdown().await.expect("shutdown failed");

                // Creating another volume with the same name should fail.
                let (_dir_proxy, server_end) =
                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
                volumes_proxy
                    .create("test", server_end, CreateOptions::default(), MountOptions::default())
                    .await
                    .expect("fidl failed")
                    .expect_err("create succeeded");

                volumes_proxy.remove("test").await.expect("fidl failed").expect("remove failed");

                // Removing a non-existent volume should fail.
                volumes_proxy
                    .remove("test")
                    .await
                    .expect("fidl failed")
                    .expect_err("remove failed");

                // Create the same volume again and it should now succeed.
                let (_dir_proxy, server_end) =
                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
                volumes_proxy
                    .create("test", server_end, CreateOptions::default(), MountOptions::default())
                    .await
                    .expect("fidl failed")
                    .expect("create failed");

                fs_admin_proxy.shutdown().await.expect("shutdown failed");
            }
            .boxed()
        })
        .await;
        component_task.await;
    }
771
    // Verifies that created volumes (plus the fixture's "default" volume) show up when
    // enumerating the exported "volumes" directory.
    #[fuchsia::test(threads = 2)]
    async fn test_volumes_enumeration() {
        let (component_task, _block_server) = run_test(|client, _| {
            let volumes_proxy = connect_to_protocol_at_dir_svc::<VolumesMarker>(client)
                .expect("Unable to connect to Volumes protocol");

            // Open the "volumes" directory from the component's outgoing directory.
            let (volumes_dir_proxy, server_end) =
                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
            client
                .open("volumes", fio::PERM_READABLE, &Default::default(), server_end.into_channel())
                .expect("open failed");

            let fs_admin_proxy = connect_to_protocol_at_dir_svc::<AdminMarker>(client)
                .expect("Unable to connect to Admin protocol");

            async move {
                let (_dir_proxy, server_end) =
                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
                volumes_proxy
                    .create("test", server_end, CreateOptions::default(), MountOptions::default())
                    .await
                    .expect("fidl failed")
                    .expect("create failed");

                let entries = readdir(&volumes_dir_proxy).await.expect("readdir failed");
                let mut entry_names = entries.iter().map(|d| d.name.as_str()).collect::<Vec<_>>();
                entry_names.sort();
                assert_eq!(entry_names, ["default", "test"]);

                fs_admin_proxy.shutdown().await.expect("shutdown failed");
            }
            .boxed()
        })
        .await;
        component_task.await;
    }
808
809    #[fuchsia::test(threads = 2)]
810    async fn test_create_with_guid() {
811        let (component_task, block_server) = run_test(|client, _| {
812            let volumes_proxy = connect_to_protocol_at_dir_svc::<VolumesMarker>(client)
813                .expect("Unable to connect to Volumes protocol");
814            let admin_proxy = connect_to_protocol_at_dir_svc::<AdminMarker>(client)
815                .expect("Unable to connect to Admin protocol");
816            async move {
817                let (_dir_proxy, server_end) =
818                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
819                let guid = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
820                volumes_proxy
821                    .create(
822                        "test_guid",
823                        server_end,
824                        CreateOptions { guid: Some(guid), ..Default::default() },
825                        MountOptions::default(),
826                    )
827                    .await
828                    .expect("fidl failed")
829                    .expect("create failed");
830                admin_proxy.shutdown().await.expect("shutdown failed");
831            }
832            .boxed()
833        })
834        .await;
835        component_task.await;
836
837        // Verify the GUID
838        let fs = FxFilesystem::open(DeviceHolder::new(
839            BlockDevice::new(
840                new_block_client(block_server.connect())
841                    .await
842                    .expect("Unable to create block client"),
843                false,
844            )
845            .await
846            .unwrap(),
847        ))
848        .await
849        .expect("FxFilesystem::open failed");
850        {
851            let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
852            let vol = root_volume
853                .volume("test_guid", StoreOptions::default())
854                .await
855                .expect("Open volume failed");
856            assert_eq!(vol.guid(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);
857        }
858        fs.close().await.expect("close failed");
859    }
860}