Skip to main content

fxfs_platform_testing/fuchsia/
component.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::debug::{FxfsDebug, handle_debug_request};
6use crate::fuchsia::errors::map_to_status;
7use crate::fuchsia::memory_pressure::MemoryPressureMonitor;
8use crate::fuchsia::power::FuchsiaPowerManager;
9use crate::fuchsia::volume::MemoryPressureConfig;
10use crate::fuchsia::volumes_directory::VolumesDirectory;
11use anyhow::{Context, Error, bail};
12use async_trait::async_trait;
13use block_client::{BlockClient as _, RemoteBlockClient};
14use fidl::endpoints::{ClientEnd, DiscoverableProtocolMarker, RequestStream};
15use fidl_fuchsia_fs::{AdminMarker, AdminRequest, AdminRequestStream};
16use fidl_fuchsia_fs_startup::{
17    CheckOptions, StartOptions, StartupMarker, StartupRequest, StartupRequestStream, VolumesMarker,
18    VolumesRequest, VolumesRequestStream,
19};
20use fidl_fuchsia_fxfs::{
21    DebugMarker, DebugRequestStream, VolumeInstallerMarker, VolumeInstallerRequest,
22    VolumeInstallerRequestStream,
23};
24use fidl_fuchsia_io as fio;
25use fidl_fuchsia_memory_attribution as fattribution;
26use fidl_fuchsia_process_lifecycle::{LifecycleRequest, LifecycleRequestStream};
27use fidl_fuchsia_storage_block::BlockMarker;
28use fs_inspect::{FsInspect, FsInspectTree, InfoData, UsageData};
29use fuchsia_async as fasync;
30use fuchsia_component_client::connect_to_protocol;
31use futures::TryStreamExt;
32use futures::lock::Mutex;
33use fxfs::filesystem::{FxFilesystem, FxFilesystemBuilder, MIN_BLOCK_SIZE, OpenFxFilesystem, mkfs};
34use fxfs::log::*;
35use fxfs::object_store::volume::root_volume;
36use fxfs::serialized_types::LATEST_VERSION;
37use fxfs::{fsck, metrics};
38use fxfs_trace::{TraceFutureExt, trace_future_args};
39use refaults_vmo::PageRefaultCounter;
40use std::ops::Deref;
41use std::sync::{Arc, Weak};
42use storage_device::DeviceHolder;
43use storage_device::block_device::BlockDevice;
44use vfs::directory::helper::DirectlyMutable;
45use vfs::execution_scope::ExecutionScope;
46
/// Filesystem name reported in the inspect `InfoData`.
const FXFS_INFO_NAME: &str = "fxfs";
48
49pub fn map_to_raw_status(e: Error) -> zx::sys::zx_status_t {
50    map_to_status(e).into_raw()
51}
52
53pub async fn new_block_client(remote: ClientEnd<BlockMarker>) -> Result<RemoteBlockClient, Error> {
54    Ok(RemoteBlockClient::new(remote.into_proxy()).await?)
55}
56
57/// Runs Fxfs as a component.
58pub struct Component {
59    state: futures::lock::Mutex<State>,
60
61    // The execution scope of the pseudo filesystem.
62    scope: ExecutionScope,
63
64    // The root of the pseudo filesystem for the component.
65    export_dir: Arc<vfs::directory::immutable::Simple>,
66}
67
// Wrapper type to add `FsInspect` support to an `OpenFxFilesystem`.
//
// Field 0 is the open filesystem; field 1 is the filesystem id reported as `InfoData::id`.
struct InspectedFxFilesystem(OpenFxFilesystem, /*fs_id=*/ u64);
70
71impl From<OpenFxFilesystem> for InspectedFxFilesystem {
72    fn from(fs: OpenFxFilesystem) -> Self {
73        Self(fs, zx::Event::create().koid().unwrap().raw_koid())
74    }
75}
76
77impl Deref for InspectedFxFilesystem {
78    type Target = Arc<FxFilesystem>;
79    fn deref(&self) -> &Self::Target {
80        &self.0.deref()
81    }
82}
83
84#[async_trait]
85impl FsInspect for InspectedFxFilesystem {
86    fn get_info_data(&self) -> InfoData {
87        let earliest_version = self.0.super_block_header().earliest_version;
88        InfoData {
89            id: self.1,
90            fs_type: fidl_fuchsia_fs::VfsType::Fxfs.into_primitive().into(),
91            name: FXFS_INFO_NAME.into(),
92            version_major: LATEST_VERSION.major.into(),
93            version_minor: LATEST_VERSION.minor.into(),
94            block_size: self.0.block_size() as u64,
95            max_filename_length: fio::MAX_NAME_LENGTH,
96            oldest_version: Some(format!("{}.{}", earliest_version.major, earliest_version.minor)),
97        }
98    }
99
100    async fn get_usage_data(&self) -> UsageData {
101        let info = self.0.get_info();
102        UsageData {
103            total_bytes: info.total_bytes,
104            used_bytes: info.used_bytes,
105            // TODO(https://fxbug.dev/42175930): Should these be moved to per-volume nodes?
106            total_nodes: 0,
107            used_nodes: 0,
108        }
109    }
110}
111
// Everything that exists only while a filesystem is mounted (see `State::Running`).
struct RunningState {
    // We have to wrap this in an Arc, even though it itself basically just wraps an Arc, so that
    // FsInspectTree can reference `fs` as a `Weak<dyn FsInspect>`.
    fs: Arc<InspectedFxFilesystem>,
    // The volumes directory, which is exported as the "volumes" entry and used to serve volume
    // requests.
    volumes: Arc<VolumesDirectory>,
    // Not read after construction; stored so that it lives as long as the running state.
    _debug: Arc<FxfsDebug>,
    // Not read after construction; the volumes directory is only given a weak reference to it.
    _inspect_tree: Arc<FsInspectTree>,
}
120
// Lifecycle state of the component.
enum State {
    // No filesystem is mounted: the initial state, and the state after `State::stop`.
    ComponentStarted,
    // A filesystem has been mounted by a successful start request.
    Running(RunningState),
}
125
impl State {
    /// Stops the running filesystem, if any, leaving `self` as `State::ComponentStarted`.
    ///
    /// When the previous state was `State::Running`, the "volumes" and "debug" entries are
    /// removed from `outgoing_dir`, the volumes directory is terminated and the filesystem is
    /// closed. Failures to remove the entries or to close the filesystem are ignored. When the
    /// filesystem was not running this does nothing else.
    async fn stop(&mut self, outgoing_dir: &vfs::directory::immutable::Simple) {
        // Take ownership of the previous state so that it can be torn down; from this point on
        // the component is considered not running.
        if let State::Running(RunningState { fs, volumes, .. }) =
            std::mem::replace(self, State::ComponentStarted)
        {
            info!("Stopping Fxfs runtime; remaining connections will be forcibly closed");
            // Unpublish the entries first so that they cannot be opened during teardown.
            let _ = outgoing_dir.remove_entry("volumes", /* must_be_directory: */ false);
            let _ = outgoing_dir.remove_entry("debug", /* must_be_directory: */ false);
            // Terminate the volumes before closing the filesystem that backs them.
            volumes.terminate().await;
            let _ = fs.deref().close().await;
        }
    }
}
139
140impl Component {
141    pub fn new() -> Arc<Self> {
142        Arc::new(Self {
143            state: Mutex::new(State::ComponentStarted),
144            scope: ExecutionScope::new(),
145            export_dir: vfs::directory::immutable::simple(),
146        })
147    }
148
    /// Runs Fxfs as a component.
    ///
    /// Publishes the `Startup`, `Volumes`, `Admin`, `VolumeInstaller` and `Debug` protocols under
    /// `svc` in the export directory, serves that directory on `outgoing_dir`, spawns a task to
    /// handle lifecycle requests when `lifecycle_channel` is provided, and then waits for the
    /// execution scope to finish.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the protocol entries cannot be added to the `svc` directory.
    ///
    /// # Panics
    ///
    /// Panics if the `svc` directory cannot be added to the export directory.
    pub async fn run(
        self: Arc<Self>,
        outgoing_dir: zx::Channel,
        lifecycle_channel: Option<zx::Channel>,
    ) -> Result<(), Error> {
        let svc_dir = vfs::directory::immutable::simple();
        self.export_dir.add_entry("svc", svc_dir.clone()).expect("Unable to create svc dir");

        // Each service handler captures only a weak reference to the component; a connection
        // that arrives after the component has been dropped is simply not handled.
        let weak = Arc::downgrade(&self);
        svc_dir.add_entry(
            StartupMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let weak = weak.clone();
                async move {
                    if let Some(me) = weak.upgrade() {
                        // Errors from the request loop are deliberately ignored here.
                        let _ = me.handle_startup_requests(requests).await;
                    }
                }
            }),
        )?;
        let weak = Arc::downgrade(&self);
        svc_dir.add_entry(
            VolumesMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let weak = weak.clone();
                async move {
                    if let Some(me) = weak.upgrade() {
                        me.handle_volumes_requests(requests).await;
                    }
                }
            }),
        )?;

        let weak = Arc::downgrade(&self);
        svc_dir.add_entry(
            AdminMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let weak = weak.clone();
                async move {
                    if let Some(me) = weak.upgrade() {
                        // Errors from the request loop are deliberately ignored here.
                        let _ = me.handle_admin_requests(requests).await;
                    }
                }
            }),
        )?;

        let weak = Arc::downgrade(&self);
        svc_dir.add_entry(
            VolumeInstallerMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let weak = weak.clone();
                async move {
                    if let Some(me) = weak.upgrade() {
                        if let Err(error) = me.handle_volume_installer_requests(requests).await {
                            error!(error:?; "failed to handle VolumeInstaller requests");
                        }
                    }
                }
            }),
        )?;

        // TODO(b/315704445): Only enable in debug builds?
        let weak = Arc::downgrade(&self);
        svc_dir.add_entry(
            DebugMarker::PROTOCOL_NAME,
            vfs::service::host(move |requests| {
                let weak = weak.clone();
                async move {
                    if let Some(me) = weak.upgrade() {
                        // Errors from the request loop are deliberately ignored here.
                        let _ = me.handle_debug_requests(requests).await;
                    }
                }
            }),
        )?;

        // Start serving the export directory on the channel handed to the component.
        vfs::directory::serve_on(
            self.export_dir.clone(),
            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::PERM_EXECUTABLE,
            self.scope.clone(),
            fidl::endpoints::ServerEnd::new(outgoing_dir),
        );

        if let Some(channel) = lifecycle_channel {
            // The lifecycle task holds a strong reference to the component and runs on the same
            // execution scope that is awaited below.
            let me = self.clone();
            self.scope.spawn(async move {
                if let Err(error) = me.handle_lifecycle_requests(channel).await {
                    warn!(error:?; "handle_lifecycle_requests");
                }
            });
        }

        // Block until the execution scope has finished.
        self.scope.wait().await;

        Ok(())
    }
245
246    async fn handle_startup_requests(&self, mut stream: StartupRequestStream) -> Result<(), Error> {
247        while let Some(request) = stream.try_next().await? {
248            match request {
249                StartupRequest::Start { responder, device, options } => {
250                    async move {
251                        responder.send(self.handle_start(device, options).await.map_err(|error| {
252                            error!(error:?; "handle_start failed");
253                            map_to_raw_status(error)
254                        }))
255                    }
256                    .trace(trace_future_args!("Startup::Start"))
257                    .await?
258                }
259                StartupRequest::Format { responder, device, .. } => {
260                    async move {
261                        responder.send(self.handle_format(device).await.map_err(|error| {
262                            error!(error:?; "handle_format failed");
263                            map_to_raw_status(error)
264                        }))
265                    }
266                    .trace(trace_future_args!("Startup::Format"))
267                    .await?
268                }
269                StartupRequest::Check { responder, device, options } => {
270                    async move {
271                        responder.send(self.handle_check(device, options).await.map_err(|error| {
272                            error!(error:?; "handle_check failed");
273                            map_to_raw_status(error)
274                        }))
275                    }
276                    .trace(trace_future_args!("Startup::Check"))
277                    .await?
278                }
279            }
280        }
281        Ok(())
282    }
283
    /// Handles a `Startup::Start` request: mounts the filesystem on `device` and publishes it.
    ///
    /// Any filesystem that is already running is stopped first. The new filesystem is opened
    /// with the settings from `options`, wrapped for inspect, and its volumes directory and
    /// debug directory are exported as "volumes" and "debug". On success the component state
    /// becomes `State::Running`.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered while setting up. Because the previous instance is
    /// stopped before anything else happens, the state is left as `State::ComponentStarted` when
    /// an error is returned.
    ///
    /// # Panics
    ///
    /// Panics if the device block size exceeds the system page size, if the system page size
    /// differs from `MIN_BLOCK_SIZE`, or if starting the boot profile recording/replay fails.
    async fn handle_start(
        &self,
        device: ClientEnd<BlockMarker>,
        options: StartOptions,
    ) -> Result<(), Error> {
        info!(options:?; "Received start request");
        // The state lock is held for the whole start sequence.
        let mut state = self.state.lock().await;
        // TODO(https://fxbug.dev/42174810): This is not very graceful.  It would be better for the client to
        // explicitly shut down all volumes first, and make this fail if there are remaining active
        // connections.  Fix the bug in fs_test which requires this.
        state.stop(&self.export_dir).await;
        let client = new_block_client(device).await?;

        // TODO(https://fxbug.dev/42063349) Add support for block sizes greater than the page size.
        assert!(client.block_size() <= zx::system_get_page_size());
        assert!((zx::system_get_page_size() as u64) == MIN_BLOCK_SIZE);

        // Every unset option defaults to `false`.
        let fs = FxFilesystemBuilder::new()
            .fsck_after_every_transaction(options.fsck_after_every_transaction.unwrap_or(false))
            .read_only(options.read_only.unwrap_or(false))
            .inline_crypto_enabled(options.inline_crypto_enabled.unwrap_or(false))
            .barriers_enabled(options.barriers_enabled.unwrap_or(false))
            .power_manager(FuchsiaPowerManager::new())
            .open(DeviceHolder::new(
                BlockDevice::new(client, options.read_only.unwrap_or(false)).await?,
            ))
            .await?;
        let root_volume = root_volume(fs.clone()).await?;
        // The inspect tree is only given a weak reference to the filesystem.
        let fs: Arc<InspectedFxFilesystem> = Arc::new(fs.into());
        let weak_fs = Arc::downgrade(&fs) as Weak<dyn FsInspect + Send + Sync>;
        let inspect_tree =
            Arc::new(FsInspectTree::new(weak_fs, fuchsia_inspect::component::inspector().root()));

        // Hand a read-only view of the refault counter to the page refault sink.
        let blob_resupplied_count = Arc::new(PageRefaultCounter::new()?);
        connect_to_protocol::<fattribution::PageRefaultSinkMarker>()?
            .send_page_refault_count(blob_resupplied_count.readonly_vmo()?)?;

        // Memory pressure monitoring is optional; failing to start it is not fatal.
        let mem_monitor = match MemoryPressureMonitor::start() {
            Ok(v) => Some(v),
            Err(error) => {
                warn!(
                    error:?;
                    "Failed to connect to memory pressure monitor. Running \
                     without pressure awareness."
                );
                None
            }
        };

        let volumes = VolumesDirectory::new(
            root_volume,
            Arc::downgrade(&inspect_tree),
            mem_monitor,
            blob_resupplied_count,
            MemoryPressureConfig::default(),
        )
        .await?;

        // Publish the volumes directory, replacing any existing "volumes" entry.
        self.export_dir.add_entry_may_overwrite(
            "volumes",
            volumes.directory_node().clone(),
            /* overwrite: */ true,
        )?;

        // Publish the debug directory, replacing any existing "debug" entry.
        let debug = FxfsDebug::new(&**fs, &volumes)?;
        self.export_dir.add_entry_may_overwrite("debug", debug.root(), true)?;

        fs.allocator().track_statistics(&metrics::detail(), "allocator");
        fs.journal().track_statistics(&metrics::detail(), "journal");
        fs.object_manager().track_statistics(&metrics::detail(), "object_manager");

        let info = fs.get_info();
        info!(
            device_size = info.total_bytes,
            used = info.used_bytes,
            free = info.total_bytes - info.used_bytes;
            "Mounted"
        );

        if let Some(profile_time) = options.startup_profiling_seconds {
            // Unwrap ok, shouldn't have anything else recording or replaying this early in startup.
            volumes.record_or_replay_profile(None, ".boot".to_owned(), profile_time).await.unwrap();
        }

        // Only now, with everything set up, does the component become `Running`.
        *state = State::Running(RunningState {
            fs,
            volumes,
            _debug: debug,
            _inspect_tree: inspect_tree,
        });

        Ok(())
    }
377
378    async fn handle_format(&self, device: ClientEnd<BlockMarker>) -> Result<(), Error> {
379        let device = DeviceHolder::new(
380            BlockDevice::new(new_block_client(device).await?, /* read_only: */ false).await?,
381        );
382        mkfs(device).await?;
383        info!("Formatted filesystem");
384        Ok(())
385    }
386
387    async fn handle_check(
388        &self,
389        device: ClientEnd<BlockMarker>,
390        _options: CheckOptions,
391    ) -> Result<(), Error> {
392        let state = self.state.lock().await;
393        let (fs_container, fs) = match *state {
394            State::ComponentStarted => {
395                let client = new_block_client(device).await?;
396                let fs_container = FxFilesystemBuilder::new()
397                    .read_only(true)
398                    .open(DeviceHolder::new(BlockDevice::new(client, /* read_only: */ true).await?))
399                    .await?;
400                let fs = fs_container.clone();
401                (Some(fs_container), fs)
402            }
403            State::Running(RunningState { ref fs, .. }) => (None, fs.deref().deref().clone()),
404        };
405        let res = fsck::fsck(fs.clone()).await;
406        if let Some(fs_container) = fs_container {
407            let _ = fs_container.close().await;
408        }
409        // TODO(b/311550633): Stash ok res in inspect.
410        info!("handle_check for fs: {:?}", res?);
411        Ok(())
412    }
413
414    async fn handle_admin_requests(&self, mut stream: AdminRequestStream) -> Result<(), Error> {
415        while let Some(request) = stream.try_next().await.context("Reading request")? {
416            if self.handle_admin(request).await? {
417                break;
418            }
419        }
420        Ok(())
421    }
422
423    // Returns true if we should close the connection.
424    async fn handle_admin(&self, req: AdminRequest) -> Result<bool, Error> {
425        match req {
426            AdminRequest::Shutdown { responder } => {
427                async move {
428                    info!("Received shutdown request");
429                    self.shutdown().await;
430                    responder
431                        .send()
432                        .unwrap_or_else(|error| warn!(error:?; "Failed to send shutdown response"));
433                }
434                .trace(trace_future_args!("Admin::Shutdown"))
435                .await;
436                return Ok(true);
437            }
438        }
439    }
440
    /// Handles fuchsia.fxfs.Debug requests, providing live debugging internals of the running
    /// filesystem.
    ///
    /// # Errors
    ///
    /// Returns an error if reading a request fails, if a request arrives while no filesystem is
    /// running, or if `handle_debug_request` fails. Any of these ends the request loop.
    async fn handle_debug_requests(&self, mut stream: DebugRequestStream) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            // NOTE(review): `state` is not dropped until the end of this iteration, so the state
            // lock stays held while the request below is being handled -- confirm this is
            // intended, since `fs` and `volumes` are cloned out of it.
            let state = self.state.lock().await;
            let (fs, volumes) = match &*state {
                State::ComponentStarted => {
                    info!("Debug commands are not valid unless component is started.");
                    bail!("Component not started");
                }
                State::Running(RunningState { fs, volumes, .. }) => {
                    (fs.deref().deref().clone(), volumes.clone())
                }
            };
            handle_debug_request(fs, volumes, request).await?;
        }
        Ok(())
    }
459
460    async fn shutdown(&self) {
461        self.state.lock().await.stop(&self.export_dir).await;
462        info!("Filesystem terminated");
463    }
464
465    async fn handle_volumes_requests(&self, mut stream: VolumesRequestStream) {
466        let volumes =
467            if let State::Running(RunningState { volumes, .. }) = &*self.state.lock().await {
468                volumes.clone()
469            } else {
470                let _ = stream.into_inner().0.shutdown_with_epitaph(zx::Status::BAD_STATE);
471                return;
472            };
473        while let Ok(Some(request)) = stream.try_next().await {
474            match request {
475                VolumesRequest::Create {
476                    name,
477                    outgoing_directory,
478                    create_options,
479                    mount_options,
480                    responder,
481                } => {
482                    async {
483                        info!(
484                            name = name.as_str();
485                            "Create {}volume",
486                            if mount_options.crypt.is_some() { "encrypted " } else { "" }
487                        );
488                        responder
489                            .send(
490                                volumes
491                                    .create_and_serve_volume(
492                                        &name,
493                                        outgoing_directory.into_channel().into(),
494                                        mount_options,
495                                        create_options,
496                                    )
497                                    .await
498                                    .map_err(map_to_raw_status),
499                            )
500                            .unwrap_or_else(
501                                |error| warn!(error:?; "Failed to send volume creation response"),
502                            );
503                    }
504                    .trace(trace_future_args!("Volumes::Create"))
505                    .await;
506                }
507                VolumesRequest::Remove { name, responder } => {
508                    async {
509                        info!(name = name.as_str(); "Remove volume");
510                        responder
511                            .send(volumes.remove_volume(&name).await.map_err(map_to_raw_status))
512                            .unwrap_or_else(
513                                |error| warn!(error:?; "Failed to send volume removal response"),
514                            );
515                    }
516                    .trace(trace_future_args!("Volumes::Remove"))
517                    .await;
518                }
519                VolumesRequest::GetInfo { responder } => {
520                    async move {
521                        responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw())).unwrap_or_else(
522                            |error| warn!(error:?; "Failed to send volume removal response"),
523                        )
524                    }
525                    .trace(trace_future_args!("Volumes::GetInfo"))
526                    .await;
527                }
528            }
529        }
530    }
531
532    async fn handle_lifecycle_requests(&self, lifecycle_channel: zx::Channel) -> Result<(), Error> {
533        let mut stream =
534            LifecycleRequestStream::from_channel(fasync::Channel::from_channel(lifecycle_channel));
535        match stream.try_next().await.context("Reading request")? {
536            Some(LifecycleRequest::Stop { .. }) => {
537                info!("Received Lifecycle::Stop request");
538                self.shutdown().await;
539            }
540            None => {}
541        }
542        Ok(())
543    }
544
545    async fn handle_volume_installer_requests(
546        &self,
547        mut stream: VolumeInstallerRequestStream,
548    ) -> Result<(), Error> {
549        let volumes =
550            if let State::Running(RunningState { volumes, .. }) = &*self.state.lock().await {
551                volumes.clone()
552            } else {
553                let _ = stream.into_inner().0.shutdown_with_epitaph(zx::Status::BAD_STATE);
554                return Err(zx::Status::BAD_STATE).context("fxfs component not running");
555            };
556        while let Some(request) = stream.try_next().await.context("reading request")? {
557            match request {
558                VolumeInstallerRequest::Install { src, image_file, dst, responder } => {
559                    async {
560                        let response = volumes.install_volume(&src, &image_file, &dst).await;
561                        responder.send(response.map_err(|error| {
562                            error!(error:?; "install failed");
563                            map_to_raw_status(error)
564                        }))
565                    }
566                    .trace(trace_future_args!("VolumeInstaller::Install"))
567                    .await?;
568                }
569            }
570        }
571        Ok(())
572    }
573}
574
575#[cfg(test)]
576mod tests {
577    use super::{Component, new_block_client};
578    use fidl::endpoints::Proxy;
579    use fidl_fuchsia_fs::AdminMarker;
580    use fidl_fuchsia_fs_startup::{
581        CreateOptions, MountOptions, StartOptions, StartupMarker, VolumesMarker,
582    };
583    use fidl_fuchsia_io as fio;
584    use fidl_fuchsia_process_lifecycle::{LifecycleMarker, LifecycleProxy};
585    use fuchsia_async as fasync;
586    use fuchsia_component_client::connect_to_protocol_at_dir_svc;
587    use fuchsia_fs::directory::readdir;
588    use futures::future::{BoxFuture, FusedFuture, FutureExt};
589    use futures::{pin_mut, select};
590    use fxfs::filesystem::FxFilesystem;
591    use fxfs::object_store::volume::root_volume;
592    use fxfs::object_store::{NewChildStoreOptions, StoreOptions};
593    use std::pin::Pin;
594    use std::sync::Arc;
595    use storage_device::DeviceHolder;
596    use storage_device::block_device::BlockDevice;
597    use test_vmo_backed_block_server::VmoBackedServer;
598
    /// Test harness shared by the tests in this module.
    ///
    /// Formats a VMO-backed block device with an empty filesystem containing a "default" volume,
    /// runs a `Component` against fresh outgoing-directory and lifecycle channels, sends it a
    /// `Start` request for the device, and then runs `callback` with the outgoing directory proxy
    /// and the lifecycle proxy.
    ///
    /// Returns the (possibly still running) component future together with the block server, so
    /// that callers can check or await the component and reopen the device afterwards.
    async fn run_test(
        callback: impl Fn(&fio::DirectoryProxy, LifecycleProxy) -> BoxFuture<'static, ()>,
    ) -> (Pin<Box<impl FusedFuture>>, Arc<VmoBackedServer>) {
        const BLOCK_SIZE: u32 = 512;
        let block_server = Arc::new(
            VmoBackedServer::new(16384, BLOCK_SIZE, &[]).expect("Failed to create VmoBackedServer"),
        );

        // Prepare the device: create an empty filesystem with one volume, then close it so that
        // the component can open it.
        {
            let fs = FxFilesystem::new_empty(DeviceHolder::new(
                BlockDevice::new(
                    new_block_client(block_server.connect())
                        .await
                        .expect("Unable to create block client"),
                    false,
                )
                .await
                .unwrap(),
            ))
            .await
            .expect("FxFilesystem::new_empty failed");
            {
                let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
                root_volume
                    .new_volume("default", NewChildStoreOptions::default())
                    .await
                    .expect("Create volume failed");
            }
            fs.close().await.expect("close failed");
        }

        let (client_end, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();

        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();

        // The component future is boxed and fused so that it can be polled in the select loop
        // below and then handed back to the caller.
        let mut component_task = Box::pin(
            async {
                Component::new()
                    .run(server_end.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Failed to run component");
            }
            .fuse(),
        );

        let startup_proxy = connect_to_protocol_at_dir_svc::<StartupMarker>(&client_end)
            .expect("Unable to connect to Startup protocol");
        let block_server_connection = block_server.connect();
        let task = async {
            startup_proxy
                .start(block_server_connection, &StartOptions::default())
                .await
                .expect("Start failed (FIDL)")
                .expect("Start failed");
            callback(&client_end, lifecycle_client).await;
        }
        .fuse();

        pin_mut!(task);

        // Drive the component and the test body together until the test body finishes; the
        // component finishing first does not end the loop.
        loop {
            select! {
                () = component_task => {},
                () = task => break,
            }
        }

        (component_task, block_server)
    }
669
670    #[fuchsia::test(threads = 2)]
671    async fn test_shutdown() {
672        let (component_task, _block_server) = run_test(|client, _| {
673            let admin_proxy = connect_to_protocol_at_dir_svc::<AdminMarker>(client)
674                .expect("Unable to connect to Admin protocol");
675            async move {
676                admin_proxy.shutdown().await.expect("shutdown failed");
677            }
678            .boxed()
679        })
680        .await;
681        assert!(!component_task.is_terminated());
682    }
683
684    #[fuchsia::test(threads = 2)]
685    async fn test_lifecycle_stop() {
686        let (component_task, _block_server) = run_test(|_, lifecycle_client| {
687            lifecycle_client.stop().expect("Stop failed");
688            async move {
689                fasync::OnSignals::new(
690                    &lifecycle_client.into_channel().expect("into_channel failed"),
691                    zx::Signals::CHANNEL_PEER_CLOSED,
692                )
693                .await
694                .expect("OnSignals failed");
695            }
696            .boxed()
697        })
698        .await;
699        component_task.await;
700    }
701
702    #[fuchsia::test(threads = 2)]
703    async fn test_create_and_remove() {
704        let (component_task, _block_server) = run_test(|client, _| {
705            let volumes_proxy = connect_to_protocol_at_dir_svc::<VolumesMarker>(client)
706                .expect("Unable to connect to Volumes protocol");
707
708            let fs_admin_proxy = connect_to_protocol_at_dir_svc::<AdminMarker>(client)
709                .expect("Unable to connect to Admin protocol");
710
711            async move {
712                let (dir_proxy, server_end) =
713                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
714                volumes_proxy
715                    .create("test", server_end, CreateOptions::default(), MountOptions::default())
716                    .await
717                    .expect("fidl failed")
718                    .expect("create failed");
719
720                // This should fail whilst the volume is mounted.
721                volumes_proxy
722                    .remove("test")
723                    .await
724                    .expect("fidl failed")
725                    .expect_err("remove succeeded");
726
727                let volume_admin_proxy = connect_to_protocol_at_dir_svc::<AdminMarker>(&dir_proxy)
728                    .expect("Unable to connect to Admin protocol");
729                volume_admin_proxy.shutdown().await.expect("shutdown failed");
730
731                // Creating another volume with the same name should fail.
732                let (_dir_proxy, server_end) =
733                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
734                volumes_proxy
735                    .create("test", server_end, CreateOptions::default(), MountOptions::default())
736                    .await
737                    .expect("fidl failed")
738                    .expect_err("create succeeded");
739
740                volumes_proxy.remove("test").await.expect("fidl failed").expect("remove failed");
741
742                // Removing a non-existent volume should fail.
743                volumes_proxy
744                    .remove("test")
745                    .await
746                    .expect("fidl failed")
747                    .expect_err("remove failed");
748
749                // Create the same volume again and it should now succeed.
750                let (_dir_proxy, server_end) =
751                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
752                volumes_proxy
753                    .create("test", server_end, CreateOptions::default(), MountOptions::default())
754                    .await
755                    .expect("fidl failed")
756                    .expect("create failed");
757
758                fs_admin_proxy.shutdown().await.expect("shutdown failed");
759            }
760            .boxed()
761        })
762        .await;
763        component_task.await;
764    }
765
766    #[fuchsia::test(threads = 2)]
767    async fn test_volumes_enumeration() {
768        let (component_task, _block_server) = run_test(|client, _| {
769            let volumes_proxy = connect_to_protocol_at_dir_svc::<VolumesMarker>(client)
770                .expect("Unable to connect to Volumes protocol");
771
772            let (volumes_dir_proxy, server_end) =
773                fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
774            client
775                .open("volumes", fio::PERM_READABLE, &Default::default(), server_end.into_channel())
776                .expect("open failed");
777
778            let fs_admin_proxy = connect_to_protocol_at_dir_svc::<AdminMarker>(client)
779                .expect("Unable to connect to Admin protocol");
780
781            async move {
782                let (_dir_proxy, server_end) =
783                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
784                volumes_proxy
785                    .create("test", server_end, CreateOptions::default(), MountOptions::default())
786                    .await
787                    .expect("fidl failed")
788                    .expect("create failed");
789
790                let entries = readdir(&volumes_dir_proxy).await.expect("readdir failed");
791                let mut entry_names = entries.iter().map(|d| d.name.as_str()).collect::<Vec<_>>();
792                entry_names.sort();
793                assert_eq!(entry_names, ["default", "test"]);
794
795                fs_admin_proxy.shutdown().await.expect("shutdown failed");
796            }
797            .boxed()
798        })
799        .await;
800        component_task.await;
801    }
802
803    #[fuchsia::test(threads = 2)]
804    async fn test_create_with_guid() {
805        let (component_task, block_server) = run_test(|client, _| {
806            let volumes_proxy = connect_to_protocol_at_dir_svc::<VolumesMarker>(client)
807                .expect("Unable to connect to Volumes protocol");
808            let admin_proxy = connect_to_protocol_at_dir_svc::<AdminMarker>(client)
809                .expect("Unable to connect to Admin protocol");
810            async move {
811                let (_dir_proxy, server_end) =
812                    fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
813                let guid = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
814                volumes_proxy
815                    .create(
816                        "test_guid",
817                        server_end,
818                        CreateOptions { guid: Some(guid), ..Default::default() },
819                        MountOptions::default(),
820                    )
821                    .await
822                    .expect("fidl failed")
823                    .expect("create failed");
824                admin_proxy.shutdown().await.expect("shutdown failed");
825            }
826            .boxed()
827        })
828        .await;
829        component_task.await;
830
831        // Verify the GUID
832        let fs = FxFilesystem::open(DeviceHolder::new(
833            BlockDevice::new(
834                new_block_client(block_server.connect())
835                    .await
836                    .expect("Unable to create block client"),
837                false,
838            )
839            .await
840            .unwrap(),
841        ))
842        .await
843        .expect("FxFilesystem::open failed");
844        {
845            let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
846            let vol = root_volume
847                .volume("test_guid", StoreOptions::default())
848                .await
849                .expect("Open volume failed");
850            assert_eq!(vol.guid(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);
851        }
852        fs.close().await.expect("close failed");
853    }
854}