gpt_component/
service.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::config::Config;
6use crate::gpt::GptManager;
7use anyhow::{Context as _, Error};
8use block_client::RemoteBlockClient;
9use fidl::endpoints::{DiscoverableProtocolMarker as _, RequestStream as _, ServiceMarker as _};
10use futures::lock::Mutex as AsyncMutex;
11use futures::stream::TryStreamExt as _;
12use std::sync::Arc;
13use vfs::directory::helper::DirectlyMutable as _;
14use vfs::execution_scope::ExecutionScope;
15use {
16    fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
17    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_io as fio,
18    fidl_fuchsia_process_lifecycle as flifecycle, fidl_fuchsia_storage_partitions as fpartitions,
19    fuchsia_async as fasync,
20};
21
22pub struct StorageHostService {
23    state: AsyncMutex<State>,
24
25    // The execution scope of the pseudo filesystem.
26    scope: ExecutionScope,
27
28    // The root of the pseudo filesystem for the component.
29    export_dir: Arc<vfs::directory::immutable::Simple>,
30
31    // A directory where partitions are published.
32    partitions_dir: Arc<vfs::directory::immutable::Simple>,
33}
34
35#[derive(Default)]
36enum State {
37    #[default]
38    Stopped,
39    /// The GPT is malformed and needs to be reformatted with ResetPartitionTables before it can be
40    /// used.  The component will publish an empty partitions directory.
41    NeedsFormatting(Config, fblock::BlockProxy),
42    Running(Arc<GptManager>),
43}
44
45impl State {
46    fn is_stopped(&self) -> bool {
47        if let Self::Stopped = self { true } else { false }
48    }
49}
50
51impl StorageHostService {
52    pub fn new() -> Arc<Self> {
53        let export_dir = vfs::directory::immutable::simple();
54        let partitions_dir = vfs::directory::immutable::simple();
55        Arc::new(Self {
56            state: Default::default(),
57            scope: ExecutionScope::new(),
58            export_dir,
59            partitions_dir,
60        })
61    }
62
63    pub async fn run(
64        self: Arc<Self>,
65        outgoing_dir: zx::Channel,
66        lifecycle_channel: Option<zx::Channel>,
67    ) -> Result<(), Error> {
68        let svc_dir = vfs::directory::immutable::simple();
69        self.export_dir.add_entry("svc", svc_dir.clone()).expect("Unable to create svc dir");
70
71        svc_dir
72            .add_entry(
73                fpartitions::PartitionServiceMarker::SERVICE_NAME,
74                self.partitions_dir.clone(),
75            )
76            .unwrap();
77        let weak = Arc::downgrade(&self);
78        svc_dir
79            .add_entry(
80                fstartup::StartupMarker::PROTOCOL_NAME,
81                vfs::service::host(move |requests| {
82                    let weak = weak.clone();
83                    async move {
84                        if let Some(me) = weak.upgrade() {
85                            let _ = me.handle_start_requests(requests).await;
86                        }
87                    }
88                }),
89            )
90            .unwrap();
91        let weak = Arc::downgrade(&self);
92        svc_dir
93            .add_entry(
94                fpartitions::PartitionsAdminMarker::PROTOCOL_NAME,
95                vfs::service::host(move |requests| {
96                    let weak = weak.clone();
97                    async move {
98                        if let Some(me) = weak.upgrade() {
99                            let _ = me.handle_partitions_admin_requests(requests).await;
100                        }
101                    }
102                }),
103            )
104            .unwrap();
105        let weak = Arc::downgrade(&self);
106        svc_dir
107            .add_entry(
108                fpartitions::PartitionsManagerMarker::PROTOCOL_NAME,
109                vfs::service::host(move |requests| {
110                    let weak = weak.clone();
111                    async move {
112                        if let Some(me) = weak.upgrade() {
113                            let _ = me.handle_partitions_manager_requests(requests).await;
114                        }
115                    }
116                }),
117            )
118            .unwrap();
119        let weak = Arc::downgrade(&self);
120        svc_dir
121            .add_entry(
122                ffs::AdminMarker::PROTOCOL_NAME,
123                vfs::service::host(move |requests| {
124                    let weak = weak.clone();
125                    async move {
126                        if let Some(me) = weak.upgrade() {
127                            let _ = me.handle_admin_requests(requests).await;
128                        }
129                    }
130                }),
131            )
132            .unwrap();
133
134        vfs::directory::serve_on(
135            self.export_dir.clone(),
136            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::PERM_EXECUTABLE,
137            self.scope.clone(),
138            fidl::endpoints::ServerEnd::new(outgoing_dir),
139        );
140
141        if let Some(channel) = lifecycle_channel {
142            let me = self.clone();
143            self.scope.spawn(async move {
144                if let Err(e) = me.handle_lifecycle_requests(channel).await {
145                    log::warn!(error:? = e; "handle_lifecycle_requests");
146                }
147            });
148        }
149
150        self.scope.wait().await;
151
152        Ok(())
153    }
154
155    async fn handle_start_requests(
156        self: Arc<Self>,
157        mut stream: fstartup::StartupRequestStream,
158    ) -> Result<(), Error> {
159        while let Some(request) = stream.try_next().await.context("Reading request")? {
160            log::debug!(request:?; "");
161            match request {
162                fstartup::StartupRequest::Start { device, options, responder } => {
163                    responder
164                        .send(
165                            self.start(device.into_proxy(), options.into())
166                                .await
167                                .map_err(|status| status.into_raw()),
168                        )
169                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
170                }
171                fstartup::StartupRequest::Format { responder, .. } => {
172                    responder
173                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
174                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
175                }
176                fstartup::StartupRequest::Check { responder, .. } => {
177                    responder
178                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
179                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
180                }
181            }
182        }
183        Ok(())
184    }
185
186    async fn start(
187        self: &Arc<Self>,
188        device: fblock::BlockProxy,
189        config: Config,
190    ) -> Result<(), zx::Status> {
191        log::info!(config:?; "GPT starting");
192        let mut state = self.state.lock().await;
193        if !state.is_stopped() {
194            log::warn!("Device already bound");
195            return Err(zx::Status::ALREADY_BOUND);
196        }
197
198        // TODO(https://fxbug.dev/339491886): It would be better if `start` failed on a malformed
199        // device, rather than hiding this state from fshost.  However, fs_management isn't well set
200        // up to deal with this, because it ties the outgoing directory to a successful return from
201        // `start` (see `ServingMultiVolumeFilesystem`), and fshost resolves the queued requests for
202        // the filesystem only after `start` is successful.  We should refactor fs_management and
203        // fshost to better support this case, which might require changing how queueing works so
204        // there's more flexibility to either resolve queueing when the component starts up (what we
205        // need here), or when `Start` is successful (what a filesystem like Fxfs needs).
206        *state = match GptManager::new_with_config(
207            device.clone(),
208            self.partitions_dir.clone(),
209            config.clone(),
210        )
211        .await
212        {
213            Ok(runner) => State::Running(runner),
214            Err(err) => {
215                // This is a warning because, as described above, we can't return an error from
216                // here, so there are normal flows that can print this error message that don't
217                // indicate a bug or incorrect behavior.
218                log::warn!(err:?; "Failed to load GPT.  Reformatting may be required.");
219                State::NeedsFormatting(config, device)
220            }
221        };
222        Ok(())
223    }
224
225    async fn handle_partitions_manager_requests(
226        self: Arc<Self>,
227        mut stream: fpartitions::PartitionsManagerRequestStream,
228    ) -> Result<(), Error> {
229        while let Some(request) = stream.try_next().await.context("Reading request")? {
230            log::debug!(request:?; "");
231            match request {
232                fpartitions::PartitionsManagerRequest::GetBlockInfo { responder } => {
233                    responder
234                        .send(self.get_block_info().await.map_err(|status| status.into_raw()))
235                        .unwrap_or_else(
236                            |e| log::error!(e:?; "Failed to send GetBlockInfo response"),
237                        );
238                }
239                fpartitions::PartitionsManagerRequest::CreateTransaction { responder } => {
240                    responder
241                        .send(self.create_transaction().await.map_err(|status| status.into_raw()))
242                        .unwrap_or_else(
243                            |e| log::error!(e:?; "Failed to send CreateTransaction response"),
244                        );
245                }
246                fpartitions::PartitionsManagerRequest::CommitTransaction {
247                    transaction,
248                    responder,
249                } => {
250                    responder
251                        .send(
252                            self.commit_transaction(transaction)
253                                .await
254                                .map_err(|status| status.into_raw()),
255                        )
256                        .unwrap_or_else(
257                            |e| log::error!(e:?; "Failed to send CommitTransaction response"),
258                        );
259                }
260                fpartitions::PartitionsManagerRequest::AddPartition { payload, responder } => {
261                    responder
262                        .send(self.add_partition(payload).await.map_err(|status| status.into_raw()))
263                        .unwrap_or_else(
264                            |e| log::error!(e:?; "Failed to send AddPartition response"),
265                        );
266                }
267            }
268        }
269        Ok(())
270    }
271
272    async fn get_block_info(&self) -> Result<(u64, u32), zx::Status> {
273        let state = self.state.lock().await;
274        match &*state {
275            State::Stopped => return Err(zx::Status::BAD_STATE),
276            State::NeedsFormatting(_, block) => {
277                let info = block
278                    .get_info()
279                    .await
280                    .map_err(|err| {
281                        log::error!(err:?; "get_block_info: failed to query block info");
282                        zx::Status::IO
283                    })?
284                    .map_err(zx::Status::from_raw)?;
285                Ok((info.block_count, info.block_size))
286            }
287            State::Running(gpt) => Ok((gpt.block_count(), gpt.block_size())),
288        }
289    }
290
291    async fn create_transaction(&self) -> Result<zx::EventPair, zx::Status> {
292        let gpt_manager = self.gpt_manager().await?;
293        gpt_manager.create_transaction().await
294    }
295
296    async fn commit_transaction(&self, transaction: zx::EventPair) -> Result<(), zx::Status> {
297        let gpt_manager = self.gpt_manager().await?;
298        gpt_manager.commit_transaction(transaction).await
299    }
300
301    async fn add_partition(
302        &self,
303        request: fpartitions::PartitionsManagerAddPartitionRequest,
304    ) -> Result<(), zx::Status> {
305        let gpt_manager = self.gpt_manager().await?;
306        gpt_manager.add_partition(request).await
307    }
308
309    async fn handle_partitions_admin_requests(
310        self: Arc<Self>,
311        mut stream: fpartitions::PartitionsAdminRequestStream,
312    ) -> Result<(), Error> {
313        while let Some(request) = stream.try_next().await.context("Reading request")? {
314            log::debug!(request:?; "");
315            match request {
316                fpartitions::PartitionsAdminRequest::ResetPartitionTable {
317                    partitions,
318                    responder,
319                } => {
320                    responder
321                        .send(
322                            self.reset_partition_table(partitions)
323                                .await
324                                .map_err(|status| status.into_raw()),
325                        )
326                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
327                }
328            }
329        }
330        Ok(())
331    }
332
333    async fn reset_partition_table(
334        &self,
335        partitions: Vec<fpartitions::PartitionInfo>,
336    ) -> Result<(), zx::Status> {
337        fn convert_partition_info(info: fpartitions::PartitionInfo) -> gpt::PartitionInfo {
338            gpt::PartitionInfo {
339                label: info.name,
340                type_guid: gpt::Guid::from_bytes(info.type_guid.value),
341                instance_guid: gpt::Guid::from_bytes(info.instance_guid.value),
342                start_block: info.start_block,
343                num_blocks: info.num_blocks,
344                flags: info.flags,
345            }
346        }
347        let partitions = partitions.into_iter().map(convert_partition_info).collect::<Vec<_>>();
348
349        let mut state = self.state.lock().await;
350        match &mut *state {
351            State::Stopped => return Err(zx::Status::BAD_STATE),
352            State::NeedsFormatting(config, block) => {
353                log::info!("reset_partition_table: Reformatting GPT.");
354                let client = Arc::new(RemoteBlockClient::new(&*block).await?);
355
356                log::info!("reset_partition_table: Reformatting GPT...");
357                gpt::Gpt::format(client, partitions).await.map_err(|err| {
358                    log::error!(err:?; "reset_partition_table: failed to init GPT");
359                    zx::Status::IO
360                })?;
361                *state = State::Running(
362                    GptManager::new_with_config(
363                        block.clone(),
364                        self.partitions_dir.clone(),
365                        config.clone(),
366                    )
367                    .await
368                    .map_err(|err| {
369                        log::error!(err:?; "reset_partition_table: failed to re-launch GPT");
370                        zx::Status::BAD_STATE
371                    })?,
372                );
373            }
374            State::Running(gpt) => {
375                log::info!("reset_partition_table: Updating GPT.");
376                gpt.reset_partition_table(partitions).await?;
377            }
378        }
379        Ok(())
380    }
381
382    async fn handle_admin_requests(
383        &self,
384        mut stream: ffs::AdminRequestStream,
385    ) -> Result<(), Error> {
386        if let Some(request) = stream.try_next().await.context("Reading request")? {
387            match request {
388                ffs::AdminRequest::Shutdown { responder } => {
389                    log::info!("Received Admin::Shutdown request");
390                    self.shutdown().await;
391                    responder
392                        .send()
393                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send shutdown response"));
394                    log::info!("Admin shutdown complete");
395                }
396            }
397        }
398        Ok(())
399    }
400
401    async fn handle_lifecycle_requests(&self, lifecycle_channel: zx::Channel) -> Result<(), Error> {
402        let mut stream = flifecycle::LifecycleRequestStream::from_channel(
403            fasync::Channel::from_channel(lifecycle_channel),
404        );
405        match stream.try_next().await.context("Reading request")? {
406            Some(flifecycle::LifecycleRequest::Stop { .. }) => {
407                log::info!("Received Lifecycle::Stop request");
408                self.shutdown().await;
409                log::info!("Lifecycle shutdown complete");
410            }
411            None => {}
412        }
413        Ok(())
414    }
415
416    async fn gpt_manager(&self) -> Result<Arc<GptManager>, zx::Status> {
417        match &*self.state.lock().await {
418            State::Stopped | State::NeedsFormatting(_, _) => Err(zx::Status::BAD_STATE),
419            State::Running(gpt) => Ok(gpt.clone()),
420        }
421    }
422
423    async fn shutdown(&self) {
424        let mut state = self.state.lock().await;
425        if let State::Running(gpt) = std::mem::take(&mut *state) {
426            gpt.shutdown().await;
427        }
428        self.scope.shutdown();
429    }
430}
431
432#[cfg(test)]
433mod tests {
434    use super::StorageHostService;
435    use block_client::RemoteBlockClient;
436    use fidl::endpoints::Proxy as _;
437    use fidl_fuchsia_process_lifecycle::LifecycleMarker;
438    use fuchsia_component::client::connect_to_protocol_at_dir_svc;
439    use futures::FutureExt as _;
440    use gpt::{Gpt, Guid, PartitionInfo};
441    use std::sync::Arc;
442    use vmo_backed_block_server::{VmoBackedServer, VmoBackedServerTestingExt as _};
443    use {
444        fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
445        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
446        fidl_fuchsia_io as fio, fidl_fuchsia_storage_partitions as fpartitions,
447        fuchsia_async as fasync,
448    };
449
450    async fn setup_server(
451        block_size: u32,
452        block_count: u64,
453        partitions: Vec<PartitionInfo>,
454    ) -> Arc<VmoBackedServer> {
455        let vmo = zx::Vmo::create(block_size as u64 * block_count).unwrap();
456        let server = Arc::new(VmoBackedServer::from_vmo(512, vmo));
457        {
458            let (block_client, block_server) =
459                fidl::endpoints::create_proxy::<fblock::BlockMarker>();
460            let volume_stream = fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(
461                block_server.into_channel(),
462            )
463            .into_stream();
464            let server_clone = server.clone();
465            let _task = fasync::Task::spawn(async move { server_clone.serve(volume_stream).await });
466            let client = Arc::new(RemoteBlockClient::new(block_client).await.unwrap());
467            Gpt::format(client, partitions).await.unwrap();
468        }
469        server
470    }
471
472    #[fuchsia::test]
473    async fn lifecycle() {
474        let (outgoing_dir, outgoing_dir_server) =
475            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
476        let (lifecycle_client, lifecycle_server) =
477            fidl::endpoints::create_proxy::<LifecycleMarker>();
478        let (block_client, block_server) =
479            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
480        let volume_stream =
481            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
482                .into_stream();
483
484        futures::join!(
485            async {
486                // Client
487                let client =
488                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
489                        .unwrap();
490                client
491                    .start(block_client, &fstartup::StartOptions::default())
492                    .await
493                    .expect("FIDL error")
494                    .expect("Start failed");
495                lifecycle_client.stop().expect("Stop failed");
496                fasync::OnSignals::new(
497                    &lifecycle_client.into_channel().expect("into_channel failed"),
498                    zx::Signals::CHANNEL_PEER_CLOSED,
499                )
500                .await
501                .expect("OnSignals failed");
502            },
503            async {
504                // Server
505                let service = StorageHostService::new();
506                service
507                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
508                    .await
509                    .expect("Run failed");
510            },
511            async {
512                // Block device
513                let server = setup_server(
514                    512,
515                    8,
516                    vec![PartitionInfo {
517                        label: "part".to_string(),
518                        type_guid: Guid::from_bytes([0xabu8; 16]),
519                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
520                        start_block: 4,
521                        num_blocks: 1,
522                        flags: 0,
523                    }],
524                )
525                .await;
526                let _ = server.serve(volume_stream).await;
527            }
528            .fuse(),
529        );
530    }
531
532    #[fuchsia::test]
533    async fn admin_shutdown() {
534        let (outgoing_dir, outgoing_dir_server) =
535            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
536        let (block_client, block_server) =
537            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
538        let volume_stream =
539            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
540                .into_stream();
541
542        futures::join!(
543            async {
544                // Client
545                let client =
546                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
547                        .unwrap();
548                let admin_client =
549                    connect_to_protocol_at_dir_svc::<ffs::AdminMarker>(&outgoing_dir).unwrap();
550                client
551                    .start(block_client, &fstartup::StartOptions::default())
552                    .await
553                    .expect("FIDL error")
554                    .expect("Start failed");
555                admin_client.shutdown().await.expect("admin shutdown failed");
556                fasync::OnSignals::new(
557                    &admin_client.into_channel().expect("into_channel failed"),
558                    zx::Signals::CHANNEL_PEER_CLOSED,
559                )
560                .await
561                .expect("OnSignals failed");
562            },
563            async {
564                // Server
565                let service = StorageHostService::new();
566                service.run(outgoing_dir_server.into_channel(), None).await.expect("Run failed");
567            },
568            async {
569                // Block device
570                let server = setup_server(
571                    512,
572                    8,
573                    vec![PartitionInfo {
574                        label: "part".to_string(),
575                        type_guid: Guid::from_bytes([0xabu8; 16]),
576                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
577                        start_block: 4,
578                        num_blocks: 1,
579                        flags: 0,
580                    }],
581                )
582                .await;
583                let _ = server.serve(volume_stream).await;
584            }
585            .fuse(),
586        );
587    }
588
589    #[fuchsia::test]
590    async fn transaction_lifecycle() {
591        let (outgoing_dir, outgoing_dir_server) =
592            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
593        let (lifecycle_client, lifecycle_server) =
594            fidl::endpoints::create_proxy::<LifecycleMarker>();
595        let (block_client, block_server) =
596            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
597        let volume_stream =
598            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
599                .into_stream();
600
601        futures::join!(
602            async {
603                // Client
604                connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
605                    .unwrap()
606                    .start(block_client, &fstartup::StartOptions::default())
607                    .await
608                    .expect("FIDL error")
609                    .expect("Start failed");
610
611                let pm_client = connect_to_protocol_at_dir_svc::<
612                    fpartitions::PartitionsManagerMarker,
613                >(&outgoing_dir)
614                .unwrap();
615                let transaction = pm_client
616                    .create_transaction()
617                    .await
618                    .expect("FIDL error")
619                    .expect("create_transaction failed");
620
621                pm_client
622                    .create_transaction()
623                    .await
624                    .expect("FIDL error")
625                    .expect_err("create_transaction should fail while other txn exists");
626
627                pm_client
628                    .commit_transaction(transaction)
629                    .await
630                    .expect("FIDL error")
631                    .expect("commit_transaction failed");
632
633                {
634                    let _transaction = pm_client
635                        .create_transaction()
636                        .await
637                        .expect("FIDL error")
638                        .expect("create_transaction should succeed after committing txn");
639                }
640
641                pm_client
642                    .create_transaction()
643                    .await
644                    .expect("FIDL error")
645                    .expect("create_transaction should succeed after dropping txn");
646
647                lifecycle_client.stop().expect("Stop failed");
648                fasync::OnSignals::new(
649                    &lifecycle_client.into_channel().expect("into_channel failed"),
650                    zx::Signals::CHANNEL_PEER_CLOSED,
651                )
652                .await
653                .expect("OnSignals failed");
654            },
655            async {
656                // Server
657                let service = StorageHostService::new();
658                service
659                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
660                    .await
661                    .expect("Run failed");
662            },
663            async {
664                // Block device
665                let server = setup_server(
666                    512,
667                    16,
668                    vec![PartitionInfo {
669                        label: "part".to_string(),
670                        type_guid: Guid::from_bytes([0xabu8; 16]),
671                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
672                        start_block: 4,
673                        num_blocks: 1,
674                        flags: 0,
675                    }],
676                )
677                .await;
678                let _ = server.serve(volume_stream).await;
679            }
680            .fuse(),
681        );
682    }
683}