use crate::config::Config;
use crate::gpt::GptManager;
use anyhow::{Context as _, Error};
use block_client::RemoteBlockClient;
use fidl::endpoints::{DiscoverableProtocolMarker as _, RequestStream as _, ServiceMarker as _};
use futures::lock::Mutex as AsyncMutex;
use futures::stream::TryStreamExt as _;
use std::sync::Arc;
use vfs::directory::helper::DirectlyMutable as _;
use vfs::execution_scope::ExecutionScope;
use {
    fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup, fidl_fuchsia_io as fio,
    fidl_fuchsia_process_lifecycle as flifecycle, fidl_fuchsia_storage_block as fblock,
    fidl_fuchsia_storage_partitions as fpartitions, fuchsia_async as fasync,
};

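/// Serves the component's outgoing directory and owns the lifecycle of the underlying
/// [`GptManager`].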
pub struct StorageHostService {
    state: AsyncMutex<State>,

    /// Execution scope for background tasks and the outgoing directory server.
    scope: ExecutionScope,

    /// The root of the pseudo-filesystem served as the component's outgoing directory.
    export_dir: Arc<vfs::directory::immutable::Simple>,

    /// The directory in which partitions are published (as a FIDL service directory).
    partitions_dir: Arc<vfs::directory::immutable::Simple>,
}

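/// The operational state of the service: not yet started, started against a device with no
/// valid GPT, or running a [`GptManager`].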
#[derive(Default)]
enum State {
    #[default]
    Stopped,
    /// The device had no valid GPT; a `PartitionsAdmin.ResetPartitionTable` call can format it.
    NeedsFormatting(Config, fblock::BlockProxy),
    /// The GPT was loaded successfully and is being served.
    Running(Arc<GptManager>),
}

impl State {
    fn is_stopped(&self) -> bool {
        matches!(self, Self::Stopped)
    }
}

impl StorageHostService {
    pub fn new() -> Arc<Self> {
        let export_dir = vfs::directory::immutable::simple();
        let partitions_dir = vfs::directory::immutable::simple();
        Arc::new(Self {
            state: Default::default(),
            scope: ExecutionScope::new(),
            export_dir,
            partitions_dir,
        })
    }

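    /// Runs the service: publishes the Startup, PartitionsAdmin, PartitionsManager, and Admin
    /// protocols (plus the partition service directory) in `outgoing_dir`, optionally handles
    /// the process lifecycle channel, and blocks until the execution scope is shut down.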
    pub async fn run(
        self: Arc<Self>,
        outgoing_dir: zx::Channel,
        lifecycle_channel: Option<zx::Channel>,
    ) -> Result<(), Error> {
        let svc_dir = vfs::directory::immutable::simple();
        self.export_dir.add_entry("svc", svc_dir.clone()).expect("Unable to create svc dir");

        svc_dir
            .add_entry(
                fpartitions::PartitionServiceMarker::SERVICE_NAME,
                self.partitions_dir.clone(),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fstartup::StartupMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_start_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fpartitions::PartitionsAdminMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_partitions_admin_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fpartitions::PartitionsManagerMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_partitions_manager_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                ffs::AdminMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_admin_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();

        vfs::directory::serve_on(
            self.export_dir.clone(),
            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::PERM_EXECUTABLE,
            self.scope.clone(),
            fidl::endpoints::ServerEnd::new(outgoing_dir),
        );

        if let Some(channel) = lifecycle_channel {
            let me = self.clone();
            self.scope.spawn(async move {
                if let Err(e) = me.handle_lifecycle_requests(channel).await {
                    log::warn!(error:? = e; "handle_lifecycle_requests");
                }
            });
        }

        self.scope.wait().await;

        Ok(())
    }

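    /// Handles requests on the `fuchsia.fs.startup.Startup` protocol. Only `Start` is
    /// supported; `Format` and `Check` return `NOT_SUPPORTED`.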
    async fn handle_start_requests(
        self: Arc<Self>,
        mut stream: fstartup::StartupRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fstartup::StartupRequest::Start { device, options, responder } => {
                    responder
                        .send(
                            self.start(device.into_proxy(), options.into())
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
                }
                fstartup::StartupRequest::Format { responder, .. } => {
                    responder
                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Format response"));
                }
                fstartup::StartupRequest::Check { responder, .. } => {
                    responder
                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
                }
            }
        }
        Ok(())
    }

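    /// Binds to `device` and attempts to load a GPT from it. On failure, the device is retained
    /// so that a later `PartitionsAdmin.ResetPartitionTable` call can reformat it.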
    async fn start(
        self: &Arc<Self>,
        device: fblock::BlockProxy,
        config: Config,
    ) -> Result<(), zx::Status> {
        log::info!(config:?; "GPT starting");
        let mut state = self.state.lock().await;
        if !state.is_stopped() {
            log::warn!("Device already bound");
            return Err(zx::Status::ALREADY_BOUND);
        }

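        // Try to load the GPT. If that fails, park in NeedsFormatting rather than failing the
        // Start call outright, so the device can still be formatted via ResetPartitionTable.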
        *state = match GptManager::new_with_config(
            device.clone(),
            self.partitions_dir.clone(),
            config.clone(),
        )
        .await
        {
            Ok(runner) => State::Running(runner),
            Err(err) => {
                log::warn!(err:?; "Failed to load GPT. Reformatting may be required.");
                State::NeedsFormatting(config, device)
            }
        };
        Ok(())
    }

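    /// Handles requests on the `fuchsia.storage.partitions.PartitionsManager` protocol.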
    async fn handle_partitions_manager_requests(
        self: Arc<Self>,
        mut stream: fpartitions::PartitionsManagerRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fpartitions::PartitionsManagerRequest::GetBlockInfo { responder } => {
                    responder
                        .send(self.get_block_info().await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send GetBlockInfo response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::CreateTransaction { responder } => {
                    responder
                        .send(self.create_transaction().await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send CreateTransaction response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::CommitTransaction {
                    transaction,
                    responder,
                } => {
                    responder
                        .send(
                            self.commit_transaction(transaction)
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send CommitTransaction response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::AddPartition { payload, responder } => {
                    responder
                        .send(self.add_partition(payload).await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send AddPartition response"),
                        );
                }
            }
        }
        Ok(())
    }

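    /// Returns `(block_count, block_size)` for the underlying device. Unlike the other
    /// partition operations, this also works in the NeedsFormatting state, by querying the
    /// block device directly.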
    async fn get_block_info(&self) -> Result<(u64, u32), zx::Status> {
        let state = self.state.lock().await;
        match &*state {
            State::Stopped => return Err(zx::Status::BAD_STATE),
            State::NeedsFormatting(_, block) => {
                let info = block
                    .get_info()
                    .await
                    .map_err(|err| {
                        log::error!(err:?; "get_block_info: failed to query block info");
                        zx::Status::IO
                    })?
                    .map_err(zx::Status::from_raw)?;
                Ok((info.block_count, info.block_size))
            }
            State::Running(gpt) => Ok((gpt.block_count(), gpt.block_size())),
        }
    }

    async fn create_transaction(&self) -> Result<zx::EventPair, zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.create_transaction().await
    }

    async fn commit_transaction(&self, transaction: zx::EventPair) -> Result<(), zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.commit_transaction(transaction).await
    }

    async fn add_partition(
        &self,
        request: fpartitions::PartitionsManagerAddPartitionRequest,
    ) -> Result<(), zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.add_partition(request).await
    }

    async fn handle_partitions_admin_requests(
        self: Arc<Self>,
        mut stream: fpartitions::PartitionsAdminRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fpartitions::PartitionsAdminRequest::ResetPartitionTable {
                    partitions,
                    responder,
                } => {
                    responder
                        .send(
                            self.reset_partition_table(partitions)
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send ResetPartitionTable response"),
                        );
                }
            }
        }
        Ok(())
    }

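    /// Replaces the partition table with `partitions`. In the NeedsFormatting state this writes
    /// a fresh GPT to the device and launches the GPT manager; in the Running state it rewrites
    /// the table in place.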
    async fn reset_partition_table(
        &self,
        partitions: Vec<fpartitions::PartitionInfo>,
    ) -> Result<(), zx::Status> {
        fn convert_partition_info(info: fpartitions::PartitionInfo) -> gpt::PartitionInfo {
            gpt::PartitionInfo {
                label: info.name,
                type_guid: gpt::Guid::from_bytes(info.type_guid.value),
                instance_guid: gpt::Guid::from_bytes(info.instance_guid.value),
                start_block: info.start_block,
                num_blocks: info.num_blocks,
                flags: info.flags,
            }
        }
        let partitions = partitions.into_iter().map(convert_partition_info).collect::<Vec<_>>();

        let mut state = self.state.lock().await;
        match &mut *state {
            State::Stopped => return Err(zx::Status::BAD_STATE),
            State::NeedsFormatting(config, block) => {
                log::info!("reset_partition_table: Reformatting GPT...");
                let client = Arc::new(RemoteBlockClient::new(block.clone()).await?);
                gpt::Gpt::format(client, partitions).await.map_err(|err| {
                    log::error!(err:?; "reset_partition_table: failed to init GPT");
                    zx::Status::IO
                })?;
                *state = State::Running(
                    GptManager::new_with_config(
                        block.clone(),
                        self.partitions_dir.clone(),
                        config.clone(),
                    )
                    .await
                    .map_err(|err| {
                        log::error!(err:?; "reset_partition_table: failed to re-launch GPT");
                        zx::Status::BAD_STATE
                    })?,
                );
            }
            State::Running(gpt) => {
                log::info!("reset_partition_table: Updating GPT.");
                gpt.reset_partition_table(partitions).await?;
            }
        }
        Ok(())
    }

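    /// Handles the `fuchsia.fs.Admin` protocol. At most one request is processed, since
    /// `Shutdown` tears the service down.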
    async fn handle_admin_requests(
        &self,
        mut stream: ffs::AdminRequestStream,
    ) -> Result<(), Error> {
        if let Some(request) = stream.try_next().await.context("Reading request")? {
            match request {
                ffs::AdminRequest::Shutdown { responder } => {
                    log::info!("Received Admin::Shutdown request");
                    self.shutdown().await;
                    responder
                        .send()
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send shutdown response"));
                    log::info!("Admin shutdown complete");
                }
            }
        }
        Ok(())
    }

    async fn handle_lifecycle_requests(&self, lifecycle_channel: zx::Channel) -> Result<(), Error> {
        let mut stream = flifecycle::LifecycleRequestStream::from_channel(
            fasync::Channel::from_channel(lifecycle_channel),
        );
        match stream.try_next().await.context("Reading request")? {
            Some(flifecycle::LifecycleRequest::Stop { .. }) => {
                log::info!("Received Lifecycle::Stop request");
                self.shutdown().await;
                log::info!("Lifecycle shutdown complete");
            }
            None => {}
        }
        Ok(())
    }

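    /// Returns the running [`GptManager`], or `BAD_STATE` if the service isn't fully started.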
    async fn gpt_manager(&self) -> Result<Arc<GptManager>, zx::Status> {
        match &*self.state.lock().await {
            State::Stopped | State::NeedsFormatting(_, _) => Err(zx::Status::BAD_STATE),
            State::Running(gpt) => Ok(gpt.clone()),
        }
    }

    async fn shutdown(&self) {
        let mut state = self.state.lock().await;
        if let State::Running(gpt) = std::mem::take(&mut *state) {
            gpt.shutdown().await;
        }
        self.scope.shutdown();
    }
}

#[cfg(test)]
mod tests {
    use super::StorageHostService;
    use block_client::RemoteBlockClient;
    use fidl::endpoints::Proxy as _;
    use fidl_fuchsia_process_lifecycle::LifecycleMarker;
    use fuchsia_component::client::connect_to_protocol_at_dir_svc;
    use futures::FutureExt as _;
    use gpt::{Gpt, Guid, PartitionInfo};
    use std::sync::Arc;
    use vmo_backed_block_server::{VmoBackedServer, VmoBackedServerTestingExt as _};
    use {
        fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup, fidl_fuchsia_io as fio,
        fidl_fuchsia_storage_block as fblock, fidl_fuchsia_storage_partitions as fpartitions,
        fuchsia_async as fasync,
    };

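    /// Creates a VMO-backed block server with the given geometry and formats a GPT containing
    /// `partitions` onto it.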
    async fn setup_server(
        block_size: u32,
        block_count: u64,
        partitions: Vec<PartitionInfo>,
    ) -> Arc<VmoBackedServer> {
        let vmo = zx::Vmo::create(block_size as u64 * block_count).unwrap();
        let server = Arc::new(VmoBackedServer::from_vmo(block_size, vmo));
        {
            // Temporarily serve the block device so the GPT can be formatted; the task is
            // dropped (cancelled) once formatting completes.
            let (block_client, block_server) =
                fidl::endpoints::create_proxy::<fblock::BlockMarker>();
            let volume_stream = fidl::endpoints::ServerEnd::<fblock::BlockMarker>::from(
                block_server.into_channel(),
            )
            .into_stream();
            let server_clone = server.clone();
            let _task = fasync::Task::spawn(async move { server_clone.serve(volume_stream).await });
            let client = Arc::new(RemoteBlockClient::new(block_client).await.unwrap());
            Gpt::format(client, partitions).await.unwrap();
        }
        server
    }

    #[fuchsia::test]
    async fn lifecycle() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fblock::BlockMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                // Client task: start the service, then stop it via the Lifecycle protocol.
                let client =
                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                        .unwrap();
                client
                    .start(block_client, &fstartup::StartOptions::default())
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");
                lifecycle_client.stop().expect("Stop failed");
                fasync::OnSignals::new(
                    &lifecycle_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                // Service task: runs until shutdown.
                let service = StorageHostService::new();
                service
                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Run failed");
            },
            async {
                // Block server task: serves the formatted fake device.
                let server = setup_server(
                    512,
                    8,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }

    #[fuchsia::test]
    async fn admin_shutdown() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fblock::BlockMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                let client =
                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                        .unwrap();
                let admin_client =
                    connect_to_protocol_at_dir_svc::<ffs::AdminMarker>(&outgoing_dir).unwrap();
                client
                    .start(block_client, &fstartup::StartOptions::default())
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");
                admin_client.shutdown().await.expect("admin shutdown failed");
                fasync::OnSignals::new(
                    &admin_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                let service = StorageHostService::new();
                service.run(outgoing_dir_server.into_channel(), None).await.expect("Run failed");
            },
            async {
                let server = setup_server(
                    512,
                    8,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }

    #[fuchsia::test]
    async fn transaction_lifecycle() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fblock::BlockMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                    .unwrap()
                    .start(block_client, &fstartup::StartOptions::default())
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");

                let pm_client = connect_to_protocol_at_dir_svc::<
                    fpartitions::PartitionsManagerMarker,
                >(&outgoing_dir)
                .unwrap();
                let transaction = pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect("create_transaction failed");

                pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect_err("create_transaction should fail while other txn exists");

                pm_client
                    .commit_transaction(transaction)
                    .await
                    .expect("FIDL error")
                    .expect("commit_transaction failed");

                {
                    let _transaction = pm_client
                        .create_transaction()
                        .await
                        .expect("FIDL error")
                        .expect("create_transaction should succeed after committing txn");
                }

                pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect("create_transaction should succeed after dropping txn");

                lifecycle_client.stop().expect("Stop failed");
                fasync::OnSignals::new(
                    &lifecycle_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                let service = StorageHostService::new();
                service
                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Run failed");
            },
            async {
                let server = setup_server(
                    512,
                    16,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }
}