use crate::gpt::GptManager;
use anyhow::{Context as _, Error};
use block_client::RemoteBlockClient;
use fidl::endpoints::{DiscoverableProtocolMarker as _, RequestStream as _, ServiceMarker as _};
use futures::lock::Mutex as AsyncMutex;
use futures::stream::TryStreamExt as _;
use std::sync::Arc;
use vfs::directory::helper::DirectlyMutable as _;
use vfs::execution_scope::ExecutionScope;
use {
    fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_io as fio,
    fidl_fuchsia_process_lifecycle as flifecycle, fidl_fuchsia_storage_partitions as fpartitions,
    fuchsia_async as fasync,
};

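/// The main service of the storage-host component.  Serves the component's outgoing directory
/// and manages the lifecycle of the underlying [`GptManager`].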
pub struct StorageHostService {
    state: AsyncMutex<State>,

    /// The execution scope for all of the service's background tasks.
    scope: ExecutionScope,

    /// The root of the pseudo-filesystem served as the component's outgoing directory.
    export_dir: Arc<vfs::directory::immutable::Simple>,

    /// The directory in which partitions are published.
    partitions_dir: Arc<vfs::directory::immutable::Simple>,
}

#[derive(Default)]
enum State {
    #[default]
    Stopped,
    /// A block device was provided but no valid GPT was found on it.  The device must be
    /// reformatted (via `PartitionsAdmin.ResetPartitionTable`) before it can be used.
    NeedsFormatting(fblock::BlockProxy),
    Running(Arc<GptManager>),
}

impl State {
    fn is_stopped(&self) -> bool {
        matches!(self, Self::Stopped)
    }
}

impl StorageHostService {
    pub fn new() -> Arc<Self> {
        let export_dir = vfs::directory::immutable::simple();
        let partitions_dir = vfs::directory::immutable::simple();
        Arc::new(Self {
            state: Default::default(),
            scope: ExecutionScope::new(),
            export_dir,
            partitions_dir,
        })
    }

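    /// Runs the service, serving the outgoing directory and (if provided) the lifecycle
    /// channel.  Resolves once the execution scope is shut down, e.g. by an `Admin.Shutdown`
    /// or `Lifecycle.Stop` request.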
    pub async fn run(
        self: Arc<Self>,
        outgoing_dir: zx::Channel,
        lifecycle_channel: Option<zx::Channel>,
    ) -> Result<(), Error> {
        let svc_dir = vfs::directory::immutable::simple();
        self.export_dir.add_entry("svc", svc_dir.clone()).expect("Unable to create svc dir");

        svc_dir
            .add_entry(
                fpartitions::PartitionServiceMarker::SERVICE_NAME,
                self.partitions_dir.clone(),
            )
            .unwrap();
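        // Each protocol handler below captures a weak reference to the service, so the handlers
        // don't keep the service alive after the execution scope has been shut down.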
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fstartup::StartupMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_start_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fpartitions::PartitionsAdminMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_partitions_admin_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fpartitions::PartitionsManagerMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_partitions_manager_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                ffs::AdminMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_admin_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();

        vfs::directory::serve_on(
            self.export_dir.clone(),
            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::PERM_EXECUTABLE,
            self.scope.clone(),
            fidl::endpoints::ServerEnd::new(outgoing_dir),
        );

        if let Some(channel) = lifecycle_channel {
            let me = self.clone();
            self.scope.spawn(async move {
                if let Err(e) = me.handle_lifecycle_requests(channel).await {
                    log::warn!(error:? = e; "handle_lifecycle_requests");
                }
            });
        }

        self.scope.wait().await;

        Ok(())
    }

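    /// Serves the `fuchsia.fs.startup/Startup` protocol.  `Format` and `Check` are not
    /// supported by this service.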
    async fn handle_start_requests(
        self: Arc<Self>,
        mut stream: fstartup::StartupRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fstartup::StartupRequest::Start { device, options: _, responder } => {
                    responder
                        .send(
                            self.start(device.into_proxy())
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
                }
                fstartup::StartupRequest::Format { responder, .. } => {
                    responder
                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Format response"));
                }
                fstartup::StartupRequest::Check { responder, .. } => {
                    responder
                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
                }
            }
        }
        Ok(())
    }

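    /// Attempts to load a GPT from `device`.  If none is found, the service enters
    /// `State::NeedsFormatting`, from which `reset_partition_table` can recover it.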
    async fn start(self: &Arc<Self>, device: fblock::BlockProxy) -> Result<(), zx::Status> {
        let mut state = self.state.lock().await;
        if !state.is_stopped() {
            log::warn!("Device already bound");
            return Err(zx::Status::ALREADY_BOUND);
        }

        *state = match GptManager::new(device.clone(), self.partitions_dir.clone()).await {
            Ok(runner) => State::Running(runner),
            Err(err) => {
                log::error!(err:?; "Failed to load GPT. Reformatting may be required.");
                State::NeedsFormatting(device)
            }
        };
        Ok(())
    }

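    /// Serves the `fuchsia.storage.partitions/PartitionsManager` protocol.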
    async fn handle_partitions_manager_requests(
        self: Arc<Self>,
        mut stream: fpartitions::PartitionsManagerRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fpartitions::PartitionsManagerRequest::GetBlockInfo { responder } => {
                    responder
                        .send(self.get_block_info().await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send GetBlockInfo response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::CreateTransaction { responder } => {
                    responder
                        .send(self.create_transaction().await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send CreateTransaction response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::CommitTransaction {
                    transaction,
                    responder,
                } => {
                    responder
                        .send(
                            self.commit_transaction(transaction)
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send CommitTransaction response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::AddPartition { payload, responder } => {
                    responder
                        .send(self.add_partition(payload).await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send AddPartition response"),
                        );
                }
            }
        }
        Ok(())
    }

    async fn get_block_info(&self) -> Result<(u64, u32), zx::Status> {
        let state = self.state.lock().await;
        match &*state {
            State::Stopped => return Err(zx::Status::BAD_STATE),
            State::NeedsFormatting(block) => {
                let info = block
                    .get_info()
                    .await
                    .map_err(|err| {
                        log::error!(err:?; "get_block_info: failed to query block info");
                        zx::Status::IO
                    })?
                    .map_err(zx::Status::from_raw)?;
                Ok((info.block_count, info.block_size))
            }
            State::Running(gpt) => Ok((gpt.block_count(), gpt.block_size())),
        }
    }

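    // The transaction methods and `add_partition` delegate to the `GptManager`, and fail with
    // `BAD_STATE` if no GPT is running.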
    async fn create_transaction(&self) -> Result<zx::EventPair, zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.create_transaction().await
    }

    async fn commit_transaction(&self, transaction: zx::EventPair) -> Result<(), zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.commit_transaction(transaction).await
    }

    async fn add_partition(
        &self,
        request: fpartitions::PartitionsManagerAddPartitionRequest,
    ) -> Result<(), zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.add_partition(request).await
    }

    async fn handle_partitions_admin_requests(
        self: Arc<Self>,
        mut stream: fpartitions::PartitionsAdminRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fpartitions::PartitionsAdminRequest::ResetPartitionTable {
                    partitions,
                    responder,
                } => {
                    responder
                        .send(
                            self.reset_partition_table(partitions)
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send ResetPartitionTable response"),
                        );
                }
            }
        }
        Ok(())
    }

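    /// Replaces the entire partition table.  If no GPT was found at startup, this formats the
    /// device with a fresh GPT and starts the `GptManager`; otherwise, the live table is updated
    /// in place.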
    async fn reset_partition_table(
        &self,
        partitions: Vec<fpartitions::PartitionInfo>,
    ) -> Result<(), zx::Status> {
        fn convert_partition_info(info: fpartitions::PartitionInfo) -> gpt::PartitionInfo {
            gpt::PartitionInfo {
                label: info.name,
                type_guid: gpt::Guid::from_bytes(info.type_guid.value),
                instance_guid: gpt::Guid::from_bytes(info.instance_guid.value),
                start_block: info.start_block,
                num_blocks: info.num_blocks,
                flags: info.flags,
            }
        }
        let partitions = partitions.into_iter().map(convert_partition_info).collect::<Vec<_>>();

        let mut state = self.state.lock().await;
        match &mut *state {
            State::Stopped => return Err(zx::Status::BAD_STATE),
            State::NeedsFormatting(block) => {
                log::info!("reset_partition_table: Reformatting GPT...");
                let client = Arc::new(RemoteBlockClient::new(&*block).await?);
                gpt::Gpt::format(client, partitions).await.map_err(|err| {
                    log::error!(err:?; "reset_partition_table: failed to init GPT");
                    zx::Status::IO
                })?;
                *state = State::Running(
                    GptManager::new(block.clone(), self.partitions_dir.clone()).await.map_err(
                        |err| {
                            log::error!(err:?; "reset_partition_table: failed to re-launch GPT");
                            zx::Status::BAD_STATE
                        },
                    )?,
                );
            }
            State::Running(gpt) => {
                log::info!("reset_partition_table: Updating GPT.");
                gpt.reset_partition_table(partitions).await?;
            }
        }
        Ok(())
    }

    async fn handle_admin_requests(
        &self,
        mut stream: ffs::AdminRequestStream,
    ) -> Result<(), Error> {
        if let Some(request) = stream.try_next().await.context("Reading request")? {
            match request {
                ffs::AdminRequest::Shutdown { responder } => {
                    log::info!("Received Admin::Shutdown request");
                    self.shutdown().await;
                    responder
                        .send()
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send shutdown response"));
                    log::info!("Admin shutdown complete");
                }
            }
        }
        Ok(())
    }

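    /// Serves the `fuchsia.process.lifecycle/Lifecycle` protocol, handling the `Stop` request
    /// from component manager.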
    async fn handle_lifecycle_requests(&self, lifecycle_channel: zx::Channel) -> Result<(), Error> {
        let mut stream = flifecycle::LifecycleRequestStream::from_channel(
            fasync::Channel::from_channel(lifecycle_channel),
        );
        match stream.try_next().await.context("Reading request")? {
            Some(flifecycle::LifecycleRequest::Stop { .. }) => {
                log::info!("Received Lifecycle::Stop request");
                self.shutdown().await;
                log::info!("Lifecycle shutdown complete");
            }
            None => {}
        }
        Ok(())
    }

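    /// Returns the running [`GptManager`], or `BAD_STATE` if the service isn't running.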
    async fn gpt_manager(&self) -> Result<Arc<GptManager>, zx::Status> {
        match &*self.state.lock().await {
            State::Stopped | State::NeedsFormatting(_) => Err(zx::Status::BAD_STATE),
            State::Running(gpt) => Ok(gpt.clone()),
        }
    }

    async fn shutdown(&self) {
        let mut state = self.state.lock().await;
        if let State::Running(gpt) = std::mem::take(&mut *state) {
            gpt.shutdown().await;
        }
        self.scope.shutdown();
    }
}

#[cfg(test)]
mod tests {
    use super::StorageHostService;
    use block_client::RemoteBlockClient;
    use fake_block_server::FakeServer;
    use fidl::endpoints::Proxy as _;
    use fidl_fuchsia_process_lifecycle::LifecycleMarker;
    use fuchsia_component::client::connect_to_protocol_at_dir_svc;
    use futures::FutureExt as _;
    use gpt::{Gpt, Guid, PartitionInfo};
    use std::sync::Arc;
    use {
        fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
        fidl_fuchsia_io as fio, fidl_fuchsia_storage_partitions as fpartitions,
        fuchsia_async as fasync,
    };

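    /// Creates a `FakeServer`-backed block device with the given geometry, pre-formatted with
    /// the given partition table.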
    async fn setup_server(
        block_size: u32,
        block_count: u64,
        partitions: Vec<PartitionInfo>,
    ) -> Arc<FakeServer> {
        let vmo = zx::Vmo::create(block_size as u64 * block_count).unwrap();
        let server = Arc::new(FakeServer::from_vmo(block_size, vmo));
        {
            let (block_client, block_server) =
                fidl::endpoints::create_proxy::<fblock::BlockMarker>();
            let volume_stream = fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(
                block_server.into_channel(),
            )
            .into_stream();
            let server_clone = server.clone();
            let _task = fasync::Task::spawn(async move { server_clone.serve(volume_stream).await });
            let client = Arc::new(RemoteBlockClient::new(block_client).await.unwrap());
            Gpt::format(client, partitions).await.unwrap();
        }
        server
    }

    #[fuchsia::test]
    async fn lifecycle() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                let client =
                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                        .unwrap();
                client
                    .start(
                        block_client,
                        fstartup::StartOptions {
                            read_only: false,
                            verbose: false,
                            fsck_after_every_transaction: false,
                            write_compression_algorithm:
                                fstartup::CompressionAlgorithm::ZstdChunked,
                            write_compression_level: 0,
                            cache_eviction_policy_override: fstartup::EvictionPolicyOverride::None,
                            startup_profiling_seconds: 0,
                        },
                    )
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");
                lifecycle_client.stop().expect("Stop failed");
                fasync::OnSignals::new(
                    &lifecycle_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                let service = StorageHostService::new();
                service
                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Run failed");
            },
            async {
                let server = setup_server(
                    512,
                    8,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }

    #[fuchsia::test]
    async fn admin_shutdown() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                let client =
                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                        .unwrap();
                let admin_client =
                    connect_to_protocol_at_dir_svc::<ffs::AdminMarker>(&outgoing_dir).unwrap();
                client
                    .start(
                        block_client,
                        fstartup::StartOptions {
                            read_only: false,
                            verbose: false,
                            fsck_after_every_transaction: false,
                            write_compression_algorithm:
                                fstartup::CompressionAlgorithm::ZstdChunked,
                            write_compression_level: 0,
                            cache_eviction_policy_override: fstartup::EvictionPolicyOverride::None,
                            startup_profiling_seconds: 0,
                        },
                    )
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");
                admin_client.shutdown().await.expect("admin shutdown failed");
                fasync::OnSignals::new(
                    &admin_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                let service = StorageHostService::new();
                service.run(outgoing_dir_server.into_channel(), None).await.expect("Run failed");
            },
            async {
                let server = setup_server(
                    512,
                    8,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }

    #[fuchsia::test]
    async fn transaction_lifecycle() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                    .unwrap()
                    .start(
                        block_client,
                        fstartup::StartOptions {
                            read_only: false,
                            verbose: false,
                            fsck_after_every_transaction: false,
                            write_compression_algorithm:
                                fstartup::CompressionAlgorithm::ZstdChunked,
                            write_compression_level: 0,
                            cache_eviction_policy_override: fstartup::EvictionPolicyOverride::None,
                            startup_profiling_seconds: 0,
                        },
                    )
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");

                let pm_client = connect_to_protocol_at_dir_svc::<
                    fpartitions::PartitionsManagerMarker,
                >(&outgoing_dir)
                .unwrap();
                let transaction = pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect("create_transaction failed");

                pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect_err("create_transaction should fail while other txn exists");

                pm_client
                    .commit_transaction(transaction)
                    .await
                    .expect("FIDL error")
                    .expect("commit_transaction failed");

                {
                    let _transaction = pm_client
                        .create_transaction()
                        .await
                        .expect("FIDL error")
                        .expect("create_transaction should succeed after committing txn");
                }

                pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect("create_transaction should succeed after dropping txn");

                lifecycle_client.stop().expect("Stop failed");
                fasync::OnSignals::new(
                    &lifecycle_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                let service = StorageHostService::new();
                service
                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Run failed");
            },
            async {
                let server = setup_server(
                    512,
                    16,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }
}