gpt_component/
service.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::gpt::GptManager;
6use anyhow::{Context as _, Error};
7use block_client::RemoteBlockClient;
8use fidl::endpoints::{DiscoverableProtocolMarker as _, RequestStream as _, ServiceMarker as _};
9use futures::lock::Mutex as AsyncMutex;
10use futures::stream::TryStreamExt as _;
11use std::sync::Arc;
12use vfs::directory::helper::DirectlyMutable as _;
13use vfs::execution_scope::ExecutionScope;
14use {
15    fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
16    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_io as fio,
17    fidl_fuchsia_process_lifecycle as flifecycle, fidl_fuchsia_storage_partitions as fpartitions,
18    fuchsia_async as fasync,
19};
20
21pub struct StorageHostService {
22    state: AsyncMutex<State>,
23
24    // The execution scope of the pseudo filesystem.
25    scope: ExecutionScope,
26
27    // The root of the pseudo filesystem for the component.
28    export_dir: Arc<vfs::directory::immutable::Simple>,
29
30    // A directory where partitions are published.
31    partitions_dir: Arc<vfs::directory::immutable::Simple>,
32}
33
34#[derive(Default)]
35enum State {
36    #[default]
37    Stopped,
38    /// The GPT is malformed and needs to be reformatted with ResetPartitionTables before it can be
39    /// used.  The component will publish an empty partitions directory.
40    NeedsFormatting(fblock::BlockProxy),
41    Running(Arc<GptManager>),
42}
43
44impl State {
45    fn is_stopped(&self) -> bool {
46        if let Self::Stopped = self {
47            true
48        } else {
49            false
50        }
51    }
52}
53
54impl StorageHostService {
55    pub fn new() -> Arc<Self> {
56        let export_dir = vfs::directory::immutable::simple();
57        let partitions_dir = vfs::directory::immutable::simple();
58        Arc::new(Self {
59            state: Default::default(),
60            scope: ExecutionScope::new(),
61            export_dir,
62            partitions_dir,
63        })
64    }
65
66    pub async fn run(
67        self: Arc<Self>,
68        outgoing_dir: zx::Channel,
69        lifecycle_channel: Option<zx::Channel>,
70    ) -> Result<(), Error> {
71        let svc_dir = vfs::directory::immutable::simple();
72        self.export_dir.add_entry("svc", svc_dir.clone()).expect("Unable to create svc dir");
73
74        svc_dir
75            .add_entry(
76                fpartitions::PartitionServiceMarker::SERVICE_NAME,
77                self.partitions_dir.clone(),
78            )
79            .unwrap();
80        let weak = Arc::downgrade(&self);
81        svc_dir
82            .add_entry(
83                fstartup::StartupMarker::PROTOCOL_NAME,
84                vfs::service::host(move |requests| {
85                    let weak = weak.clone();
86                    async move {
87                        if let Some(me) = weak.upgrade() {
88                            let _ = me.handle_start_requests(requests).await;
89                        }
90                    }
91                }),
92            )
93            .unwrap();
94        let weak = Arc::downgrade(&self);
95        svc_dir
96            .add_entry(
97                fpartitions::PartitionsAdminMarker::PROTOCOL_NAME,
98                vfs::service::host(move |requests| {
99                    let weak = weak.clone();
100                    async move {
101                        if let Some(me) = weak.upgrade() {
102                            let _ = me.handle_partitions_admin_requests(requests).await;
103                        }
104                    }
105                }),
106            )
107            .unwrap();
108        let weak = Arc::downgrade(&self);
109        svc_dir
110            .add_entry(
111                fpartitions::PartitionsManagerMarker::PROTOCOL_NAME,
112                vfs::service::host(move |requests| {
113                    let weak = weak.clone();
114                    async move {
115                        if let Some(me) = weak.upgrade() {
116                            let _ = me.handle_partitions_manager_requests(requests).await;
117                        }
118                    }
119                }),
120            )
121            .unwrap();
122        let weak = Arc::downgrade(&self);
123        svc_dir
124            .add_entry(
125                ffs::AdminMarker::PROTOCOL_NAME,
126                vfs::service::host(move |requests| {
127                    let weak = weak.clone();
128                    async move {
129                        if let Some(me) = weak.upgrade() {
130                            let _ = me.handle_admin_requests(requests).await;
131                        }
132                    }
133                }),
134            )
135            .unwrap();
136
137        vfs::directory::serve_on(
138            self.export_dir.clone(),
139            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::PERM_EXECUTABLE,
140            self.scope.clone(),
141            fidl::endpoints::ServerEnd::new(outgoing_dir),
142        );
143
144        if let Some(channel) = lifecycle_channel {
145            let me = self.clone();
146            self.scope.spawn(async move {
147                if let Err(e) = me.handle_lifecycle_requests(channel).await {
148                    log::warn!(error:? = e; "handle_lifecycle_requests");
149                }
150            });
151        }
152
153        self.scope.wait().await;
154
155        Ok(())
156    }
157
158    async fn handle_start_requests(
159        self: Arc<Self>,
160        mut stream: fstartup::StartupRequestStream,
161    ) -> Result<(), Error> {
162        while let Some(request) = stream.try_next().await.context("Reading request")? {
163            log::debug!(request:?; "");
164            match request {
165                fstartup::StartupRequest::Start { device, options: _, responder } => {
166                    responder
167                        .send(
168                            self.start(device.into_proxy())
169                                .await
170                                .map_err(|status| status.into_raw()),
171                        )
172                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
173                }
174                fstartup::StartupRequest::Format { responder, .. } => {
175                    responder
176                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
177                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
178                }
179                fstartup::StartupRequest::Check { responder, .. } => {
180                    responder
181                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
182                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
183                }
184            }
185        }
186        Ok(())
187    }
188
189    async fn start(self: &Arc<Self>, device: fblock::BlockProxy) -> Result<(), zx::Status> {
190        let mut state = self.state.lock().await;
191        if !state.is_stopped() {
192            log::warn!("Device already bound");
193            return Err(zx::Status::ALREADY_BOUND);
194        }
195
196        // TODO(https://fxbug.dev/339491886): It would be better if `start` failed on a malformed
197        // device, rather than hiding this state from fshost.  However, fs_management isn't well set
198        // up to deal with this, because it ties the outgoing directory to a successful return from
199        // `start` (see `ServingMultiVolumeFilesystem`), and fshost resolves the queued requests for
200        // the filesystem only after `start` is successful.  We should refactor fs_management and
201        // fshost to better support this case, which might require changing how queueing works so
202        // there's more flexibility to either resolve queueing when the component starts up (what we
203        // need here), or when `Start` is successful (what a filesystem like Fxfs needs).
204        *state = match GptManager::new(device.clone(), self.partitions_dir.clone()).await {
205            Ok(runner) => State::Running(runner),
206            Err(err) => {
207                log::error!(err:?; "Failed to load GPT.  Reformatting may be required.");
208                State::NeedsFormatting(device)
209            }
210        };
211        Ok(())
212    }
213
214    async fn handle_partitions_manager_requests(
215        self: Arc<Self>,
216        mut stream: fpartitions::PartitionsManagerRequestStream,
217    ) -> Result<(), Error> {
218        while let Some(request) = stream.try_next().await.context("Reading request")? {
219            log::debug!(request:?; "");
220            match request {
221                fpartitions::PartitionsManagerRequest::GetBlockInfo { responder } => {
222                    responder
223                        .send(self.get_block_info().await.map_err(|status| status.into_raw()))
224                        .unwrap_or_else(
225                            |e| log::error!(e:?; "Failed to send GetBlockInfo response"),
226                        );
227                }
228                fpartitions::PartitionsManagerRequest::CreateTransaction { responder } => {
229                    responder
230                        .send(self.create_transaction().await.map_err(|status| status.into_raw()))
231                        .unwrap_or_else(
232                            |e| log::error!(e:?; "Failed to send CreateTransaction response"),
233                        );
234                }
235                fpartitions::PartitionsManagerRequest::CommitTransaction {
236                    transaction,
237                    responder,
238                } => {
239                    responder
240                        .send(
241                            self.commit_transaction(transaction)
242                                .await
243                                .map_err(|status| status.into_raw()),
244                        )
245                        .unwrap_or_else(
246                            |e| log::error!(e:?; "Failed to send CommitTransaction response"),
247                        );
248                }
249                fpartitions::PartitionsManagerRequest::AddPartition { payload, responder } => {
250                    responder
251                        .send(self.add_partition(payload).await.map_err(|status| status.into_raw()))
252                        .unwrap_or_else(
253                            |e| log::error!(e:?; "Failed to send AddPartition response"),
254                        );
255                }
256            }
257        }
258        Ok(())
259    }
260
261    async fn get_block_info(&self) -> Result<(u64, u32), zx::Status> {
262        let state = self.state.lock().await;
263        match &*state {
264            State::Stopped => return Err(zx::Status::BAD_STATE),
265            State::NeedsFormatting(block) => {
266                let info = block
267                    .get_info()
268                    .await
269                    .map_err(|err| {
270                        log::error!(err:?; "get_block_info: failed to query block info");
271                        zx::Status::IO
272                    })?
273                    .map_err(zx::Status::from_raw)?;
274                Ok((info.block_count, info.block_size))
275            }
276            State::Running(gpt) => Ok((gpt.block_count(), gpt.block_size())),
277        }
278    }
279
280    async fn create_transaction(&self) -> Result<zx::EventPair, zx::Status> {
281        let gpt_manager = self.gpt_manager().await?;
282        gpt_manager.create_transaction().await
283    }
284
285    async fn commit_transaction(&self, transaction: zx::EventPair) -> Result<(), zx::Status> {
286        let gpt_manager = self.gpt_manager().await?;
287        gpt_manager.commit_transaction(transaction).await
288    }
289
290    async fn add_partition(
291        &self,
292        request: fpartitions::PartitionsManagerAddPartitionRequest,
293    ) -> Result<(), zx::Status> {
294        let gpt_manager = self.gpt_manager().await?;
295        gpt_manager.add_partition(request).await
296    }
297
298    async fn handle_partitions_admin_requests(
299        self: Arc<Self>,
300        mut stream: fpartitions::PartitionsAdminRequestStream,
301    ) -> Result<(), Error> {
302        while let Some(request) = stream.try_next().await.context("Reading request")? {
303            log::debug!(request:?; "");
304            match request {
305                fpartitions::PartitionsAdminRequest::ResetPartitionTable {
306                    partitions,
307                    responder,
308                } => {
309                    responder
310                        .send(
311                            self.reset_partition_table(partitions)
312                                .await
313                                .map_err(|status| status.into_raw()),
314                        )
315                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
316                }
317            }
318        }
319        Ok(())
320    }
321
322    async fn reset_partition_table(
323        &self,
324        partitions: Vec<fpartitions::PartitionInfo>,
325    ) -> Result<(), zx::Status> {
326        fn convert_partition_info(info: fpartitions::PartitionInfo) -> gpt::PartitionInfo {
327            gpt::PartitionInfo {
328                label: info.name,
329                type_guid: gpt::Guid::from_bytes(info.type_guid.value),
330                instance_guid: gpt::Guid::from_bytes(info.instance_guid.value),
331                start_block: info.start_block,
332                num_blocks: info.num_blocks,
333                flags: info.flags,
334            }
335        }
336        let partitions = partitions.into_iter().map(convert_partition_info).collect::<Vec<_>>();
337
338        let mut state = self.state.lock().await;
339        match &mut *state {
340            State::Stopped => return Err(zx::Status::BAD_STATE),
341            State::NeedsFormatting(block) => {
342                log::info!("reset_partition_table: Reformatting GPT.");
343                let client = Arc::new(RemoteBlockClient::new(&*block).await?);
344
345                log::info!("reset_partition_table: Reformatting GPT...");
346                gpt::Gpt::format(client, partitions).await.map_err(|err| {
347                    log::error!(err:?; "reset_partition_table: failed to init GPT");
348                    zx::Status::IO
349                })?;
350                *state = State::Running(
351                    GptManager::new(block.clone(), self.partitions_dir.clone()).await.map_err(
352                        |err| {
353                            log::error!(err:?; "reset_partition_table: failed to re-launch GPT");
354                            zx::Status::BAD_STATE
355                        },
356                    )?,
357                );
358            }
359            State::Running(gpt) => {
360                log::info!("reset_partition_table: Updating GPT.");
361                gpt.reset_partition_table(partitions).await?;
362            }
363        }
364        Ok(())
365    }
366
367    async fn handle_admin_requests(
368        &self,
369        mut stream: ffs::AdminRequestStream,
370    ) -> Result<(), Error> {
371        if let Some(request) = stream.try_next().await.context("Reading request")? {
372            match request {
373                ffs::AdminRequest::Shutdown { responder } => {
374                    log::info!("Received Admin::Shutdown request");
375                    self.shutdown().await;
376                    responder
377                        .send()
378                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send shutdown response"));
379                    log::info!("Admin shutdown complete");
380                }
381            }
382        }
383        Ok(())
384    }
385
386    async fn handle_lifecycle_requests(&self, lifecycle_channel: zx::Channel) -> Result<(), Error> {
387        let mut stream = flifecycle::LifecycleRequestStream::from_channel(
388            fasync::Channel::from_channel(lifecycle_channel),
389        );
390        match stream.try_next().await.context("Reading request")? {
391            Some(flifecycle::LifecycleRequest::Stop { .. }) => {
392                log::info!("Received Lifecycle::Stop request");
393                self.shutdown().await;
394                log::info!("Lifecycle shutdown complete");
395            }
396            None => {}
397        }
398        Ok(())
399    }
400
401    async fn gpt_manager(&self) -> Result<Arc<GptManager>, zx::Status> {
402        match &*self.state.lock().await {
403            State::Stopped | State::NeedsFormatting(_) => Err(zx::Status::BAD_STATE),
404            State::Running(gpt) => Ok(gpt.clone()),
405        }
406    }
407
408    async fn shutdown(&self) {
409        let mut state = self.state.lock().await;
410        if let State::Running(gpt) = std::mem::take(&mut *state) {
411            gpt.shutdown().await;
412        }
413        self.scope.shutdown();
414    }
415}
416
417#[cfg(test)]
418mod tests {
419    use super::StorageHostService;
420    use block_client::RemoteBlockClient;
421    use fake_block_server::FakeServer;
422    use fidl::endpoints::Proxy as _;
423    use fidl_fuchsia_process_lifecycle::LifecycleMarker;
424    use fuchsia_component::client::connect_to_protocol_at_dir_svc;
425    use futures::FutureExt as _;
426    use gpt::{Gpt, Guid, PartitionInfo};
427    use std::sync::Arc;
428    use {
429        fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
430        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
431        fidl_fuchsia_io as fio, fidl_fuchsia_storage_partitions as fpartitions,
432        fuchsia_async as fasync,
433    };
434
435    async fn setup_server(
436        block_size: u32,
437        block_count: u64,
438        partitions: Vec<PartitionInfo>,
439    ) -> Arc<FakeServer> {
440        let vmo = zx::Vmo::create(block_size as u64 * block_count).unwrap();
441        let server = Arc::new(FakeServer::from_vmo(512, vmo));
442        {
443            let (block_client, block_server) =
444                fidl::endpoints::create_proxy::<fblock::BlockMarker>();
445            let volume_stream = fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(
446                block_server.into_channel(),
447            )
448            .into_stream();
449            let server_clone = server.clone();
450            let _task = fasync::Task::spawn(async move { server_clone.serve(volume_stream).await });
451            let client = Arc::new(RemoteBlockClient::new(block_client).await.unwrap());
452            Gpt::format(client, partitions).await.unwrap();
453        }
454        server
455    }
456
457    #[fuchsia::test]
458    async fn lifecycle() {
459        let (outgoing_dir, outgoing_dir_server) =
460            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
461        let (lifecycle_client, lifecycle_server) =
462            fidl::endpoints::create_proxy::<LifecycleMarker>();
463        let (block_client, block_server) =
464            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
465        let volume_stream =
466            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
467                .into_stream();
468
469        futures::join!(
470            async {
471                // Client
472                let client =
473                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
474                        .unwrap();
475                client
476                    .start(
477                        block_client,
478                        fstartup::StartOptions {
479                            read_only: false,
480                            verbose: false,
481                            fsck_after_every_transaction: false,
482                            write_compression_algorithm:
483                                fstartup::CompressionAlgorithm::ZstdChunked,
484                            write_compression_level: 0,
485                            cache_eviction_policy_override: fstartup::EvictionPolicyOverride::None,
486                            startup_profiling_seconds: 0,
487                        },
488                    )
489                    .await
490                    .expect("FIDL error")
491                    .expect("Start failed");
492                lifecycle_client.stop().expect("Stop failed");
493                fasync::OnSignals::new(
494                    &lifecycle_client.into_channel().expect("into_channel failed"),
495                    zx::Signals::CHANNEL_PEER_CLOSED,
496                )
497                .await
498                .expect("OnSignals failed");
499            },
500            async {
501                // Server
502                let service = StorageHostService::new();
503                service
504                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
505                    .await
506                    .expect("Run failed");
507            },
508            async {
509                // Block device
510                let server = setup_server(
511                    512,
512                    8,
513                    vec![PartitionInfo {
514                        label: "part".to_string(),
515                        type_guid: Guid::from_bytes([0xabu8; 16]),
516                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
517                        start_block: 4,
518                        num_blocks: 1,
519                        flags: 0,
520                    }],
521                )
522                .await;
523                let _ = server.serve(volume_stream).await;
524            }
525            .fuse(),
526        );
527    }
528
529    #[fuchsia::test]
530    async fn admin_shutdown() {
531        let (outgoing_dir, outgoing_dir_server) =
532            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
533        let (block_client, block_server) =
534            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
535        let volume_stream =
536            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
537                .into_stream();
538
539        futures::join!(
540            async {
541                // Client
542                let client =
543                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
544                        .unwrap();
545                let admin_client =
546                    connect_to_protocol_at_dir_svc::<ffs::AdminMarker>(&outgoing_dir).unwrap();
547                client
548                    .start(
549                        block_client,
550                        fstartup::StartOptions {
551                            read_only: false,
552                            verbose: false,
553                            fsck_after_every_transaction: false,
554                            write_compression_algorithm:
555                                fstartup::CompressionAlgorithm::ZstdChunked,
556                            write_compression_level: 0,
557                            cache_eviction_policy_override: fstartup::EvictionPolicyOverride::None,
558                            startup_profiling_seconds: 0,
559                        },
560                    )
561                    .await
562                    .expect("FIDL error")
563                    .expect("Start failed");
564                admin_client.shutdown().await.expect("admin shutdown failed");
565                fasync::OnSignals::new(
566                    &admin_client.into_channel().expect("into_channel failed"),
567                    zx::Signals::CHANNEL_PEER_CLOSED,
568                )
569                .await
570                .expect("OnSignals failed");
571            },
572            async {
573                // Server
574                let service = StorageHostService::new();
575                service.run(outgoing_dir_server.into_channel(), None).await.expect("Run failed");
576            },
577            async {
578                // Block device
579                let server = setup_server(
580                    512,
581                    8,
582                    vec![PartitionInfo {
583                        label: "part".to_string(),
584                        type_guid: Guid::from_bytes([0xabu8; 16]),
585                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
586                        start_block: 4,
587                        num_blocks: 1,
588                        flags: 0,
589                    }],
590                )
591                .await;
592                let _ = server.serve(volume_stream).await;
593            }
594            .fuse(),
595        );
596    }
597
598    #[fuchsia::test]
599    async fn transaction_lifecycle() {
600        let (outgoing_dir, outgoing_dir_server) =
601            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
602        let (lifecycle_client, lifecycle_server) =
603            fidl::endpoints::create_proxy::<LifecycleMarker>();
604        let (block_client, block_server) =
605            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
606        let volume_stream =
607            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
608                .into_stream();
609
610        futures::join!(
611            async {
612                // Client
613                connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
614                    .unwrap()
615                    .start(
616                        block_client,
617                        fstartup::StartOptions {
618                            read_only: false,
619                            verbose: false,
620                            fsck_after_every_transaction: false,
621                            write_compression_algorithm:
622                                fstartup::CompressionAlgorithm::ZstdChunked,
623                            write_compression_level: 0,
624                            cache_eviction_policy_override: fstartup::EvictionPolicyOverride::None,
625                            startup_profiling_seconds: 0,
626                        },
627                    )
628                    .await
629                    .expect("FIDL error")
630                    .expect("Start failed");
631
632                let pm_client = connect_to_protocol_at_dir_svc::<
633                    fpartitions::PartitionsManagerMarker,
634                >(&outgoing_dir)
635                .unwrap();
636                let transaction = pm_client
637                    .create_transaction()
638                    .await
639                    .expect("FIDL error")
640                    .expect("create_transaction failed");
641
642                pm_client
643                    .create_transaction()
644                    .await
645                    .expect("FIDL error")
646                    .expect_err("create_transaction should fail while other txn exists");
647
648                pm_client
649                    .commit_transaction(transaction)
650                    .await
651                    .expect("FIDL error")
652                    .expect("commit_transaction failed");
653
654                {
655                    let _transaction = pm_client
656                        .create_transaction()
657                        .await
658                        .expect("FIDL error")
659                        .expect("create_transaction should succeed after committing txn");
660                }
661
662                pm_client
663                    .create_transaction()
664                    .await
665                    .expect("FIDL error")
666                    .expect("create_transaction should succeed after dropping txn");
667
668                lifecycle_client.stop().expect("Stop failed");
669                fasync::OnSignals::new(
670                    &lifecycle_client.into_channel().expect("into_channel failed"),
671                    zx::Signals::CHANNEL_PEER_CLOSED,
672                )
673                .await
674                .expect("OnSignals failed");
675            },
676            async {
677                // Server
678                let service = StorageHostService::new();
679                service
680                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
681                    .await
682                    .expect("Run failed");
683            },
684            async {
685                // Block device
686                let server = setup_server(
687                    512,
688                    16,
689                    vec![PartitionInfo {
690                        label: "part".to_string(),
691                        type_guid: Guid::from_bytes([0xabu8; 16]),
692                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
693                        start_block: 4,
694                        num_blocks: 1,
695                        flags: 0,
696                    }],
697                )
698                .await;
699                let _ = server.serve(volume_stream).await;
700            }
701            .fuse(),
702        );
703    }
704}