gpt_component/
service.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::config::Config;
6use crate::gpt::GptManager;
7use anyhow::{Context as _, Error};
8use block_client::RemoteBlockClient;
9use fidl::endpoints::{DiscoverableProtocolMarker as _, RequestStream as _, ServiceMarker as _};
10use futures::lock::Mutex as AsyncMutex;
11use futures::stream::TryStreamExt as _;
12use std::sync::Arc;
13use vfs::directory::helper::DirectlyMutable as _;
14use vfs::execution_scope::ExecutionScope;
15use {
16    fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup, fidl_fuchsia_io as fio,
17    fidl_fuchsia_process_lifecycle as flifecycle, fidl_fuchsia_storage_block as fblock,
18    fidl_fuchsia_storage_partitions as fpartitions, fuchsia_async as fasync,
19};
20
/// Component-level service object for the GPT component.
///
/// It owns the component's pseudo filesystem (the export directory and the directory where
/// partitions are published) together with the current [`State`] of the GPT, and serves the FIDL
/// protocols that are registered in `run`.
pub struct StorageHostService {
    // The current state of the GPT (stopped, needs formatting, or running), guarded by an async
    // mutex because it is accessed from multiple request handlers.
    state: AsyncMutex<State>,

    // The execution scope of the pseudo filesystem.
    scope: ExecutionScope,

    // The root of the pseudo filesystem for the component.
    export_dir: Arc<vfs::directory::immutable::Simple>,

    // A directory where partitions are published.
    partitions_dir: Arc<vfs::directory::immutable::Simple>,
}
33
/// The state of the GPT managed by [`StorageHostService`].
///
/// The default state is [`State::Stopped`].
#[derive(Default)]
enum State {
    /// No block device has been bound yet, or the service has been shut down.
    #[default]
    Stopped,
    /// The GPT is malformed and needs to be reformatted with ResetPartitionTables before it can be
    /// used.  The component will publish an empty partitions directory.
    ///
    /// Holds the configuration and block device that were passed to `start`, so that the GPT can
    /// be launched once it has been reformatted.
    NeedsFormatting(Config, fblock::BlockProxy),
    /// The GPT was loaded successfully and is being served by the contained manager.
    Running(Arc<GptManager>),
}
43
44impl State {
45    fn is_stopped(&self) -> bool {
46        if let Self::Stopped = self { true } else { false }
47    }
48}
49
50impl StorageHostService {
51    pub fn new() -> Arc<Self> {
52        let export_dir = vfs::directory::immutable::simple();
53        let partitions_dir = vfs::directory::immutable::simple();
54        Arc::new(Self {
55            state: Default::default(),
56            scope: ExecutionScope::new(),
57            export_dir,
58            partitions_dir,
59        })
60    }
61
62    pub async fn run(
63        self: Arc<Self>,
64        outgoing_dir: zx::Channel,
65        lifecycle_channel: Option<zx::Channel>,
66    ) -> Result<(), Error> {
67        let svc_dir = vfs::directory::immutable::simple();
68        self.export_dir.add_entry("svc", svc_dir.clone()).expect("Unable to create svc dir");
69
70        svc_dir
71            .add_entry(
72                fpartitions::PartitionServiceMarker::SERVICE_NAME,
73                self.partitions_dir.clone(),
74            )
75            .unwrap();
76        let weak = Arc::downgrade(&self);
77        svc_dir
78            .add_entry(
79                fstartup::StartupMarker::PROTOCOL_NAME,
80                vfs::service::host(move |requests| {
81                    let weak = weak.clone();
82                    async move {
83                        if let Some(me) = weak.upgrade() {
84                            let _ = me.handle_start_requests(requests).await;
85                        }
86                    }
87                }),
88            )
89            .unwrap();
90        let weak = Arc::downgrade(&self);
91        svc_dir
92            .add_entry(
93                fpartitions::PartitionsAdminMarker::PROTOCOL_NAME,
94                vfs::service::host(move |requests| {
95                    let weak = weak.clone();
96                    async move {
97                        if let Some(me) = weak.upgrade() {
98                            let _ = me.handle_partitions_admin_requests(requests).await;
99                        }
100                    }
101                }),
102            )
103            .unwrap();
104        let weak = Arc::downgrade(&self);
105        svc_dir
106            .add_entry(
107                fpartitions::PartitionsManagerMarker::PROTOCOL_NAME,
108                vfs::service::host(move |requests| {
109                    let weak = weak.clone();
110                    async move {
111                        if let Some(me) = weak.upgrade() {
112                            let _ = me.handle_partitions_manager_requests(requests).await;
113                        }
114                    }
115                }),
116            )
117            .unwrap();
118        let weak = Arc::downgrade(&self);
119        svc_dir
120            .add_entry(
121                ffs::AdminMarker::PROTOCOL_NAME,
122                vfs::service::host(move |requests| {
123                    let weak = weak.clone();
124                    async move {
125                        if let Some(me) = weak.upgrade() {
126                            let _ = me.handle_admin_requests(requests).await;
127                        }
128                    }
129                }),
130            )
131            .unwrap();
132
133        vfs::directory::serve_on(
134            self.export_dir.clone(),
135            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::PERM_EXECUTABLE,
136            self.scope.clone(),
137            fidl::endpoints::ServerEnd::new(outgoing_dir),
138        );
139
140        if let Some(channel) = lifecycle_channel {
141            let me = self.clone();
142            self.scope.spawn(async move {
143                if let Err(e) = me.handle_lifecycle_requests(channel).await {
144                    log::warn!(error:? = e; "handle_lifecycle_requests");
145                }
146            });
147        }
148
149        self.scope.wait().await;
150
151        Ok(())
152    }
153
154    async fn handle_start_requests(
155        self: Arc<Self>,
156        mut stream: fstartup::StartupRequestStream,
157    ) -> Result<(), Error> {
158        while let Some(request) = stream.try_next().await.context("Reading request")? {
159            log::debug!(request:?; "");
160            match request {
161                fstartup::StartupRequest::Start { device, options, responder } => {
162                    responder
163                        .send(
164                            self.start(device.into_proxy(), options.into())
165                                .await
166                                .map_err(|status| status.into_raw()),
167                        )
168                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
169                }
170                fstartup::StartupRequest::Format { responder, .. } => {
171                    responder
172                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
173                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
174                }
175                fstartup::StartupRequest::Check { responder, .. } => {
176                    responder
177                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
178                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
179                }
180            }
181        }
182        Ok(())
183    }
184
185    async fn start(
186        self: &Arc<Self>,
187        device: fblock::BlockProxy,
188        config: Config,
189    ) -> Result<(), zx::Status> {
190        log::info!(config:?; "GPT starting");
191        let mut state = self.state.lock().await;
192        if !state.is_stopped() {
193            log::warn!("Device already bound");
194            return Err(zx::Status::ALREADY_BOUND);
195        }
196
197        // TODO(https://fxbug.dev/339491886): It would be better if `start` failed on a malformed
198        // device, rather than hiding this state from fshost.  However, fs_management isn't well set
199        // up to deal with this, because it ties the outgoing directory to a successful return from
200        // `start` (see `ServingMultiVolumeFilesystem`), and fshost resolves the queued requests for
201        // the filesystem only after `start` is successful.  We should refactor fs_management and
202        // fshost to better support this case, which might require changing how queueing works so
203        // there's more flexibility to either resolve queueing when the component starts up (what we
204        // need here), or when `Start` is successful (what a filesystem like Fxfs needs).
205        *state = match GptManager::new_with_config(
206            device.clone(),
207            self.partitions_dir.clone(),
208            config.clone(),
209        )
210        .await
211        {
212            Ok(runner) => State::Running(runner),
213            Err(err) => {
214                // This is a warning because, as described above, we can't return an error from
215                // here, so there are normal flows that can print this error message that don't
216                // indicate a bug or incorrect behavior.
217                log::warn!(err:?; "Failed to load GPT.  Reformatting may be required.");
218                State::NeedsFormatting(config, device)
219            }
220        };
221        Ok(())
222    }
223
224    async fn handle_partitions_manager_requests(
225        self: Arc<Self>,
226        mut stream: fpartitions::PartitionsManagerRequestStream,
227    ) -> Result<(), Error> {
228        while let Some(request) = stream.try_next().await.context("Reading request")? {
229            log::debug!(request:?; "");
230            match request {
231                fpartitions::PartitionsManagerRequest::GetBlockInfo { responder } => {
232                    responder
233                        .send(self.get_block_info().await.map_err(|status| status.into_raw()))
234                        .unwrap_or_else(
235                            |e| log::error!(e:?; "Failed to send GetBlockInfo response"),
236                        );
237                }
238                fpartitions::PartitionsManagerRequest::CreateTransaction { responder } => {
239                    responder
240                        .send(self.create_transaction().await.map_err(|status| status.into_raw()))
241                        .unwrap_or_else(
242                            |e| log::error!(e:?; "Failed to send CreateTransaction response"),
243                        );
244                }
245                fpartitions::PartitionsManagerRequest::CommitTransaction {
246                    transaction,
247                    responder,
248                } => {
249                    responder
250                        .send(
251                            self.commit_transaction(transaction)
252                                .await
253                                .map_err(|status| status.into_raw()),
254                        )
255                        .unwrap_or_else(
256                            |e| log::error!(e:?; "Failed to send CommitTransaction response"),
257                        );
258                }
259                fpartitions::PartitionsManagerRequest::AddPartition { payload, responder } => {
260                    responder
261                        .send(self.add_partition(payload).await.map_err(|status| status.into_raw()))
262                        .unwrap_or_else(
263                            |e| log::error!(e:?; "Failed to send AddPartition response"),
264                        );
265                }
266            }
267        }
268        Ok(())
269    }
270
271    async fn get_block_info(&self) -> Result<(u64, u32), zx::Status> {
272        let state = self.state.lock().await;
273        match &*state {
274            State::Stopped => return Err(zx::Status::BAD_STATE),
275            State::NeedsFormatting(_, block) => {
276                let info = block
277                    .get_info()
278                    .await
279                    .map_err(|err| {
280                        log::error!(err:?; "get_block_info: failed to query block info");
281                        zx::Status::IO
282                    })?
283                    .map_err(zx::Status::from_raw)?;
284                Ok((info.block_count, info.block_size))
285            }
286            State::Running(gpt) => Ok((gpt.block_count(), gpt.block_size())),
287        }
288    }
289
290    async fn create_transaction(&self) -> Result<zx::EventPair, zx::Status> {
291        let gpt_manager = self.gpt_manager().await?;
292        gpt_manager.create_transaction().await
293    }
294
295    async fn commit_transaction(&self, transaction: zx::EventPair) -> Result<(), zx::Status> {
296        let gpt_manager = self.gpt_manager().await?;
297        gpt_manager.commit_transaction(transaction).await
298    }
299
300    async fn add_partition(
301        &self,
302        request: fpartitions::PartitionsManagerAddPartitionRequest,
303    ) -> Result<(), zx::Status> {
304        let gpt_manager = self.gpt_manager().await?;
305        gpt_manager.add_partition(request).await
306    }
307
308    async fn handle_partitions_admin_requests(
309        self: Arc<Self>,
310        mut stream: fpartitions::PartitionsAdminRequestStream,
311    ) -> Result<(), Error> {
312        while let Some(request) = stream.try_next().await.context("Reading request")? {
313            log::debug!(request:?; "");
314            match request {
315                fpartitions::PartitionsAdminRequest::ResetPartitionTable {
316                    partitions,
317                    responder,
318                } => {
319                    responder
320                        .send(
321                            self.reset_partition_table(partitions)
322                                .await
323                                .map_err(|status| status.into_raw()),
324                        )
325                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
326                }
327            }
328        }
329        Ok(())
330    }
331
332    async fn reset_partition_table(
333        &self,
334        partitions: Vec<fpartitions::PartitionInfo>,
335    ) -> Result<(), zx::Status> {
336        fn convert_partition_info(info: fpartitions::PartitionInfo) -> gpt::PartitionInfo {
337            gpt::PartitionInfo {
338                label: info.name,
339                type_guid: gpt::Guid::from_bytes(info.type_guid.value),
340                instance_guid: gpt::Guid::from_bytes(info.instance_guid.value),
341                start_block: info.start_block,
342                num_blocks: info.num_blocks,
343                flags: info.flags,
344            }
345        }
346        let partitions = partitions.into_iter().map(convert_partition_info).collect::<Vec<_>>();
347
348        let mut state = self.state.lock().await;
349        match &mut *state {
350            State::Stopped => return Err(zx::Status::BAD_STATE),
351            State::NeedsFormatting(config, block) => {
352                log::info!("reset_partition_table: Reformatting GPT.");
353                let client = Arc::new(RemoteBlockClient::new(block.clone()).await?);
354
355                log::info!("reset_partition_table: Reformatting GPT...");
356                gpt::Gpt::format(client, partitions).await.map_err(|err| {
357                    log::error!(err:?; "reset_partition_table: failed to init GPT");
358                    zx::Status::IO
359                })?;
360                *state = State::Running(
361                    GptManager::new_with_config(
362                        block.clone(),
363                        self.partitions_dir.clone(),
364                        config.clone(),
365                    )
366                    .await
367                    .map_err(|err| {
368                        log::error!(err:?; "reset_partition_table: failed to re-launch GPT");
369                        zx::Status::BAD_STATE
370                    })?,
371                );
372            }
373            State::Running(gpt) => {
374                log::info!("reset_partition_table: Updating GPT.");
375                gpt.reset_partition_table(partitions).await?;
376            }
377        }
378        Ok(())
379    }
380
381    async fn handle_admin_requests(
382        &self,
383        mut stream: ffs::AdminRequestStream,
384    ) -> Result<(), Error> {
385        if let Some(request) = stream.try_next().await.context("Reading request")? {
386            match request {
387                ffs::AdminRequest::Shutdown { responder } => {
388                    log::info!("Received Admin::Shutdown request");
389                    self.shutdown().await;
390                    responder
391                        .send()
392                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send shutdown response"));
393                    log::info!("Admin shutdown complete");
394                }
395            }
396        }
397        Ok(())
398    }
399
400    async fn handle_lifecycle_requests(&self, lifecycle_channel: zx::Channel) -> Result<(), Error> {
401        let mut stream = flifecycle::LifecycleRequestStream::from_channel(
402            fasync::Channel::from_channel(lifecycle_channel),
403        );
404        match stream.try_next().await.context("Reading request")? {
405            Some(flifecycle::LifecycleRequest::Stop { .. }) => {
406                log::info!("Received Lifecycle::Stop request");
407                self.shutdown().await;
408                log::info!("Lifecycle shutdown complete");
409            }
410            None => {}
411        }
412        Ok(())
413    }
414
415    async fn gpt_manager(&self) -> Result<Arc<GptManager>, zx::Status> {
416        match &*self.state.lock().await {
417            State::Stopped | State::NeedsFormatting(_, _) => Err(zx::Status::BAD_STATE),
418            State::Running(gpt) => Ok(gpt.clone()),
419        }
420    }
421
422    async fn shutdown(&self) {
423        let mut state = self.state.lock().await;
424        if let State::Running(gpt) = std::mem::take(&mut *state) {
425            gpt.shutdown().await;
426        }
427        self.scope.shutdown();
428    }
429}
430
431#[cfg(test)]
432mod tests {
433    use super::StorageHostService;
434    use block_client::RemoteBlockClient;
435    use fidl::endpoints::Proxy as _;
436    use fidl_fuchsia_process_lifecycle::LifecycleMarker;
437    use fuchsia_component::client::connect_to_protocol_at_dir_svc;
438    use futures::FutureExt as _;
439    use gpt::{Gpt, Guid, PartitionInfo};
440    use std::sync::Arc;
441    use vmo_backed_block_server::{VmoBackedServer, VmoBackedServerTestingExt as _};
442    use {
443        fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup, fidl_fuchsia_io as fio,
444        fidl_fuchsia_storage_block as fblock, fidl_fuchsia_storage_partitions as fpartitions,
445        fuchsia_async as fasync,
446    };
447
448    async fn setup_server(
449        block_size: u32,
450        block_count: u64,
451        partitions: Vec<PartitionInfo>,
452    ) -> Arc<VmoBackedServer> {
453        let vmo = zx::Vmo::create(block_size as u64 * block_count).unwrap();
454        let server = Arc::new(VmoBackedServer::from_vmo(512, vmo));
455        {
456            let (block_client, block_server) =
457                fidl::endpoints::create_proxy::<fblock::BlockMarker>();
458            let volume_stream = fidl::endpoints::ServerEnd::<fblock::BlockMarker>::from(
459                block_server.into_channel(),
460            )
461            .into_stream();
462            let server_clone = server.clone();
463            let _task = fasync::Task::spawn(async move { server_clone.serve(volume_stream).await });
464            let client = Arc::new(RemoteBlockClient::new(block_client).await.unwrap());
465            Gpt::format(client, partitions).await.unwrap();
466        }
467        server
468    }
469
    /// Starts the service against a formatted block device and checks that a `Lifecycle.Stop`
    /// request makes `run` return and closes the lifecycle channel.
    ///
    /// Three futures are polled concurrently: the client driving the test, the service under
    /// test, and the fake block device.
    #[fuchsia::test]
    async fn lifecycle() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fblock::BlockMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                // Client: start the GPT on the block device, request a stop, and then wait for
                // the lifecycle channel's peer to be closed.
                let client =
                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                        .unwrap();
                client
                    .start(block_client, &fstartup::StartOptions::default())
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");
                lifecycle_client.stop().expect("Stop failed");
                fasync::OnSignals::new(
                    &lifecycle_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                // Server: the service under test.  `run` completes once its execution scope has
                // finished.
                let service = StorageHostService::new();
                service
                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Run failed");
            },
            async {
                // Block device: 8 blocks of 512 bytes, formatted with a single one-block
                // partition, served on the stream whose client end is handed to `Start`.
                let server = setup_server(
                    512,
                    8,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }
529
    /// Starts the service against a formatted block device and checks that `Admin.Shutdown`
    /// replies successfully, makes `run` return, and closes the admin channel.
    ///
    /// No lifecycle channel is supplied to the service in this test.  Three futures are polled
    /// concurrently: the client driving the test, the service under test, and the fake block
    /// device.
    #[fuchsia::test]
    async fn admin_shutdown() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fblock::BlockMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                // Client: connect to both Startup and Admin up front, start the GPT, request a
                // shutdown, and then wait for the admin channel's peer to be closed.
                let client =
                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                        .unwrap();
                let admin_client =
                    connect_to_protocol_at_dir_svc::<ffs::AdminMarker>(&outgoing_dir).unwrap();
                client
                    .start(block_client, &fstartup::StartOptions::default())
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");
                admin_client.shutdown().await.expect("admin shutdown failed");
                fasync::OnSignals::new(
                    &admin_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                // Server: the service under test, run without a lifecycle channel.
                let service = StorageHostService::new();
                service.run(outgoing_dir_server.into_channel(), None).await.expect("Run failed");
            },
            async {
                // Block device: 8 blocks of 512 bytes, formatted with a single one-block
                // partition, served on the stream whose client end is handed to `Start`.
                let server = setup_server(
                    512,
                    8,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }
586
    /// Exercises transaction creation and commit through the `PartitionsManager` protocol.
    ///
    /// The test expects that only one transaction can exist at a time, and that a new one can be
    /// created once the previous one has been committed or dropped.  The service is stopped via
    /// the lifecycle channel at the end.
    #[fuchsia::test]
    async fn transaction_lifecycle() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fblock::BlockMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                // Client: start the GPT on the block device.
                connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                    .unwrap()
                    .start(block_client, &fstartup::StartOptions::default())
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");

                let pm_client = connect_to_protocol_at_dir_svc::<
                    fpartitions::PartitionsManagerMarker,
                >(&outgoing_dir)
                .unwrap();
                // The first transaction is expected to succeed.
                let transaction = pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect("create_transaction failed");

                // A second transaction must be rejected while the first is still outstanding.
                pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect_err("create_transaction should fail while other txn exists");

                pm_client
                    .commit_transaction(transaction)
                    .await
                    .expect("FIDL error")
                    .expect("commit_transaction failed");

                // Committing frees the slot; this transaction is dropped at the end of the
                // block without being committed.
                {
                    let _transaction = pm_client
                        .create_transaction()
                        .await
                        .expect("FIDL error")
                        .expect("create_transaction should succeed after committing txn");
                }

                // Dropping a transaction must free the slot as well.
                pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect("create_transaction should succeed after dropping txn");

                // Stop the service and wait for the lifecycle channel's peer to be closed.
                lifecycle_client.stop().expect("Stop failed");
                fasync::OnSignals::new(
                    &lifecycle_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                // Server: the service under test.
                let service = StorageHostService::new();
                service
                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Run failed");
            },
            async {
                // Block device: 16 blocks of 512 bytes, formatted with a single one-block
                // partition, served on the stream whose client end is handed to `Start`.
                let server = setup_server(
                    512,
                    16,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }
681}