use crate::config::Config;
use crate::gpt::GptManager;
use anyhow::{Context as _, Error};
use block_client::RemoteBlockClient;
use fidl::endpoints::{DiscoverableProtocolMarker as _, RequestStream as _, ServiceMarker as _};
use futures::lock::Mutex as AsyncMutex;
use futures::stream::TryStreamExt as _;
use std::sync::Arc;
use vfs::directory::helper::DirectlyMutable as _;
use vfs::execution_scope::ExecutionScope;
use {
    fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_io as fio,
    fidl_fuchsia_process_lifecycle as flifecycle, fidl_fuchsia_storage_partitions as fpartitions,
    fuchsia_async as fasync,
};

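/// Top-level service for the component: owns the outgoing directory and runs the GPT once a
/// block device is bound via `fuchsia.fs.startup.Startup/Start`.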
pub struct StorageHostService {
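    /// Current state of the service; guarded by an async mutex since transitions span awaits.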
    state: AsyncMutex<State>,

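    /// Execution scope for spawned tasks and served connections; shutting it down ends `run`.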
    scope: ExecutionScope,

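    /// Root of the outgoing directory, served to the component framework in `run`.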
    export_dir: Arc<vfs::directory::immutable::Simple>,

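    /// Directory of per-partition services, populated by the `GptManager` once it is running.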
    partitions_dir: Arc<vfs::directory::immutable::Simple>,
}

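/// The service starts `Stopped`. `Startup.Start` moves it to `Running`, or to
/// `NeedsFormatting` when no valid GPT is found, in which case
/// `PartitionsAdmin.ResetPartitionTable` can format the device and start the GPT.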
#[derive(Default)]
enum State {
    #[default]
    Stopped,
    NeedsFormatting(Config, fblock::BlockProxy),
    Running(Arc<GptManager>),
}

impl State {
    fn is_stopped(&self) -> bool {
        matches!(self, Self::Stopped)
    }
}

impl StorageHostService {
    pub fn new() -> Arc<Self> {
        let export_dir = vfs::directory::immutable::simple();
        let partitions_dir = vfs::directory::immutable::simple();
        Arc::new(Self {
            state: Default::default(),
            scope: ExecutionScope::new(),
            export_dir,
            partitions_dir,
        })
    }

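    /// Serves the outgoing directory on `outgoing_dir` and, when a lifecycle channel is
    /// provided, listens for `Lifecycle.Stop`. Resolves once the execution scope is drained.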
    pub async fn run(
        self: Arc<Self>,
        outgoing_dir: zx::Channel,
        lifecycle_channel: Option<zx::Channel>,
    ) -> Result<(), Error> {
        let svc_dir = vfs::directory::immutable::simple();
        self.export_dir.add_entry("svc", svc_dir.clone()).expect("Unable to create svc dir");

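        // Register the partition service directory and the discoverable protocols under /svc.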
        svc_dir
            .add_entry(
                fpartitions::PartitionServiceMarker::SERVICE_NAME,
                self.partitions_dir.clone(),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fstartup::StartupMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_start_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fpartitions::PartitionsAdminMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_partitions_admin_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                fpartitions::PartitionsManagerMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_partitions_manager_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();
        let weak = Arc::downgrade(&self);
        svc_dir
            .add_entry(
                ffs::AdminMarker::PROTOCOL_NAME,
                vfs::service::host(move |requests| {
                    let weak = weak.clone();
                    async move {
                        if let Some(me) = weak.upgrade() {
                            let _ = me.handle_admin_requests(requests).await;
                        }
                    }
                }),
            )
            .unwrap();

        vfs::directory::serve_on(
            self.export_dir.clone(),
            fio::PERM_READABLE | fio::PERM_WRITABLE | fio::PERM_EXECUTABLE,
            self.scope.clone(),
            fidl::endpoints::ServerEnd::new(outgoing_dir),
        );

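        // The lifecycle channel is the component framework's way of requesting an orderly stop.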
        if let Some(channel) = lifecycle_channel {
            let me = self.clone();
            self.scope.spawn(async move {
                if let Err(e) = me.handle_lifecycle_requests(channel).await {
                    log::warn!(error:? = e; "handle_lifecycle_requests");
                }
            });
        }

        self.scope.wait().await;

        Ok(())
    }

    async fn handle_start_requests(
        self: Arc<Self>,
        mut stream: fstartup::StartupRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fstartup::StartupRequest::Start { device, options, responder } => {
                    responder
                        .send(
                            self.start(device.into_proxy(), options.into())
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Start response"));
                }
                fstartup::StartupRequest::Format { responder, .. } => {
                    responder
                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Format response"));
                }
                fstartup::StartupRequest::Check { responder, .. } => {
                    responder
                        .send(Err(zx::Status::NOT_SUPPORTED.into_raw()))
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send Check response"));
                }
            }
        }
        Ok(())
    }

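    /// Binds `device` and attempts to load its GPT. On failure the device is held in
    /// `NeedsFormatting` so a client can reformat it via `ResetPartitionTable`.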
    async fn start(
        self: &Arc<Self>,
        device: fblock::BlockProxy,
        config: Config,
    ) -> Result<(), zx::Status> {
        log::info!(config:?; "GPT starting");
        let mut state = self.state.lock().await;
        if !state.is_stopped() {
            log::warn!("Device already bound");
            return Err(zx::Status::ALREADY_BOUND);
        }

        *state = match GptManager::new_with_config(
            device.clone(),
            self.partitions_dir.clone(),
            config.clone(),
        )
        .await
        {
            Ok(runner) => State::Running(runner),
            Err(err) => {
                log::warn!(err:?; "Failed to load GPT. Reformatting may be required.");
                State::NeedsFormatting(config, device)
            }
        };
        Ok(())
    }

    async fn handle_partitions_manager_requests(
        self: Arc<Self>,
        mut stream: fpartitions::PartitionsManagerRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fpartitions::PartitionsManagerRequest::GetBlockInfo { responder } => {
                    responder
                        .send(self.get_block_info().await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send GetBlockInfo response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::CreateTransaction { responder } => {
                    responder
                        .send(self.create_transaction().await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send CreateTransaction response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::CommitTransaction {
                    transaction,
                    responder,
                } => {
                    responder
                        .send(
                            self.commit_transaction(transaction)
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send CommitTransaction response"),
                        );
                }
                fpartitions::PartitionsManagerRequest::AddPartition { payload, responder } => {
                    responder
                        .send(self.add_partition(payload).await.map_err(|status| status.into_raw()))
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send AddPartition response"),
                        );
                }
            }
        }
        Ok(())
    }

    async fn get_block_info(&self) -> Result<(u64, u32), zx::Status> {
        let state = self.state.lock().await;
        match &*state {
            State::Stopped => return Err(zx::Status::BAD_STATE),
            State::NeedsFormatting(_, block) => {
                let info = block
                    .get_info()
                    .await
                    .map_err(|err| {
                        log::error!(err:?; "get_block_info: failed to query block info");
                        zx::Status::IO
                    })?
                    .map_err(zx::Status::from_raw)?;
                Ok((info.block_count, info.block_size))
            }
            State::Running(gpt) => Ok((gpt.block_count(), gpt.block_size())),
        }
    }

    async fn create_transaction(&self) -> Result<zx::EventPair, zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.create_transaction().await
    }

    async fn commit_transaction(&self, transaction: zx::EventPair) -> Result<(), zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.commit_transaction(transaction).await
    }

    async fn add_partition(
        &self,
        request: fpartitions::PartitionsManagerAddPartitionRequest,
    ) -> Result<(), zx::Status> {
        let gpt_manager = self.gpt_manager().await?;
        gpt_manager.add_partition(request).await
    }

    async fn handle_partitions_admin_requests(
        self: Arc<Self>,
        mut stream: fpartitions::PartitionsAdminRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await.context("Reading request")? {
            log::debug!(request:?; "");
            match request {
                fpartitions::PartitionsAdminRequest::ResetPartitionTable {
                    partitions,
                    responder,
                } => {
                    responder
                        .send(
                            self.reset_partition_table(partitions)
                                .await
                                .map_err(|status| status.into_raw()),
                        )
                        .unwrap_or_else(
                            |e| log::error!(e:?; "Failed to send ResetPartitionTable response"),
                        );
                }
            }
        }
        Ok(())
    }

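    /// Replaces the entire partition table. If the device was awaiting formatting, this writes
    /// a fresh GPT and starts the `GptManager`; if the GPT is already running, the table is
    /// rewritten in place.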
    async fn reset_partition_table(
        &self,
        partitions: Vec<fpartitions::PartitionInfo>,
    ) -> Result<(), zx::Status> {
        fn convert_partition_info(info: fpartitions::PartitionInfo) -> gpt::PartitionInfo {
            gpt::PartitionInfo {
                label: info.name,
                type_guid: gpt::Guid::from_bytes(info.type_guid.value),
                instance_guid: gpt::Guid::from_bytes(info.instance_guid.value),
                start_block: info.start_block,
                num_blocks: info.num_blocks,
                flags: info.flags,
            }
        }
        let partitions = partitions.into_iter().map(convert_partition_info).collect::<Vec<_>>();

        let mut state = self.state.lock().await;
        match &mut *state {
            State::Stopped => return Err(zx::Status::BAD_STATE),
            State::NeedsFormatting(config, block) => {
                log::info!("reset_partition_table: Reformatting GPT...");
                let client = Arc::new(RemoteBlockClient::new(&*block).await?);
                gpt::Gpt::format(client, partitions).await.map_err(|err| {
                    log::error!(err:?; "reset_partition_table: failed to init GPT");
                    zx::Status::IO
                })?;
                *state = State::Running(
                    GptManager::new_with_config(
                        block.clone(),
                        self.partitions_dir.clone(),
                        config.clone(),
                    )
                    .await
                    .map_err(|err| {
                        log::error!(err:?; "reset_partition_table: failed to re-launch GPT");
                        zx::Status::BAD_STATE
                    })?,
                );
            }
            State::Running(gpt) => {
                log::info!("reset_partition_table: Updating GPT.");
                gpt.reset_partition_table(partitions).await?;
            }
        }
        Ok(())
    }

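    /// Handles `fuchsia.fs.Admin`. `Shutdown` tears the service down before replying, so by the
    /// time the caller gets a response the GPT has fully stopped.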
    async fn handle_admin_requests(
        &self,
        mut stream: ffs::AdminRequestStream,
    ) -> Result<(), Error> {
        if let Some(request) = stream.try_next().await.context("Reading request")? {
            match request {
                ffs::AdminRequest::Shutdown { responder } => {
                    log::info!("Received Admin::Shutdown request");
                    self.shutdown().await;
                    responder
                        .send()
                        .unwrap_or_else(|e| log::error!(e:?; "Failed to send shutdown response"));
                    log::info!("Admin shutdown complete");
                }
            }
        }
        Ok(())
    }

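    /// Handles the `fuchsia.process.lifecycle.Lifecycle` channel; a `Stop` request triggers the
    /// same teardown path as `Admin.Shutdown`.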
    async fn handle_lifecycle_requests(&self, lifecycle_channel: zx::Channel) -> Result<(), Error> {
        let mut stream = flifecycle::LifecycleRequestStream::from_channel(
            fasync::Channel::from_channel(lifecycle_channel),
        );
        match stream.try_next().await.context("Reading request")? {
            Some(flifecycle::LifecycleRequest::Stop { .. }) => {
                log::info!("Received Lifecycle::Stop request");
                self.shutdown().await;
                log::info!("Lifecycle shutdown complete");
            }
            None => {}
        }
        Ok(())
    }

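    /// Returns the running `GptManager`, or `BAD_STATE` if the GPT is stopped or unformatted.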
    async fn gpt_manager(&self) -> Result<Arc<GptManager>, zx::Status> {
        match &*self.state.lock().await {
            State::Stopped | State::NeedsFormatting(_, _) => Err(zx::Status::BAD_STATE),
            State::Running(gpt) => Ok(gpt.clone()),
        }
    }

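    /// Stops the GPT (if running) and shuts down the execution scope, which unblocks `run`.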
    async fn shutdown(&self) {
        let mut state = self.state.lock().await;
        if let State::Running(gpt) = std::mem::take(&mut *state) {
            gpt.shutdown().await;
        }
        self.scope.shutdown();
    }
}

#[cfg(test)]
mod tests {
    use super::StorageHostService;
    use block_client::RemoteBlockClient;
    use fidl::endpoints::Proxy as _;
    use fidl_fuchsia_process_lifecycle::LifecycleMarker;
    use fuchsia_component::client::connect_to_protocol_at_dir_svc;
    use futures::FutureExt as _;
    use gpt::{Gpt, Guid, PartitionInfo};
    use std::sync::Arc;
    use vmo_backed_block_server::{VmoBackedServer, VmoBackedServerTestingExt as _};
    use {
        fidl_fuchsia_fs as ffs, fidl_fuchsia_fs_startup as fstartup,
        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
        fidl_fuchsia_io as fio, fidl_fuchsia_storage_partitions as fpartitions,
        fuchsia_async as fasync,
    };

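    /// Creates a VMO-backed fake block device of `block_count` blocks, formats a GPT with the
    /// given partitions onto it, and returns the server so tests can serve Volume connections.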
    async fn setup_server(
        block_size: u32,
        block_count: u64,
        partitions: Vec<PartitionInfo>,
    ) -> Arc<VmoBackedServer> {
        let vmo = zx::Vmo::create(block_size as u64 * block_count).unwrap();
        let server = Arc::new(VmoBackedServer::from_vmo(block_size, vmo));
        {
            let (block_client, block_server) =
                fidl::endpoints::create_proxy::<fblock::BlockMarker>();
            let volume_stream = fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(
                block_server.into_channel(),
            )
            .into_stream();
            let server_clone = server.clone();
            let _task = fasync::Task::spawn(async move { server_clone.serve(volume_stream).await });
            let client = Arc::new(RemoteBlockClient::new(block_client).await.unwrap());
            Gpt::format(client, partitions).await.unwrap();
        }
        server
    }

    #[fuchsia::test]
    async fn lifecycle() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
                .into_stream();

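        // Drive the client actions, the service under test, and the fake block server together.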
        futures::join!(
            async {
                let client =
                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                        .unwrap();
                client
                    .start(block_client, &fstartup::StartOptions::default())
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");
                lifecycle_client.stop().expect("Stop failed");
                fasync::OnSignals::new(
                    &lifecycle_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                let service = StorageHostService::new();
                service
                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Run failed");
            },
            async {
                let server = setup_server(
                    512,
                    8,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }

    #[fuchsia::test]
    async fn admin_shutdown() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                let client =
                    connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                        .unwrap();
                let admin_client =
                    connect_to_protocol_at_dir_svc::<ffs::AdminMarker>(&outgoing_dir).unwrap();
                client
                    .start(block_client, &fstartup::StartOptions::default())
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");
                admin_client.shutdown().await.expect("admin shutdown failed");
                fasync::OnSignals::new(
                    &admin_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                let service = StorageHostService::new();
                service.run(outgoing_dir_server.into_channel(), None).await.expect("Run failed");
            },
            async {
                let server = setup_server(
                    512,
                    8,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }

    #[fuchsia::test]
    async fn transaction_lifecycle() {
        let (outgoing_dir, outgoing_dir_server) =
            fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let (lifecycle_client, lifecycle_server) =
            fidl::endpoints::create_proxy::<LifecycleMarker>();
        let (block_client, block_server) =
            fidl::endpoints::create_endpoints::<fblock::BlockMarker>();
        let volume_stream =
            fidl::endpoints::ServerEnd::<fvolume::VolumeMarker>::from(block_server.into_channel())
                .into_stream();

        futures::join!(
            async {
                connect_to_protocol_at_dir_svc::<fstartup::StartupMarker>(&outgoing_dir)
                    .unwrap()
                    .start(block_client, &fstartup::StartOptions::default())
                    .await
                    .expect("FIDL error")
                    .expect("Start failed");

                let pm_client = connect_to_protocol_at_dir_svc::<
                    fpartitions::PartitionsManagerMarker,
                >(&outgoing_dir)
                .unwrap();
                let transaction = pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect("create_transaction failed");

                pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect_err("create_transaction should fail while other txn exists");

                pm_client
                    .commit_transaction(transaction)
                    .await
                    .expect("FIDL error")
                    .expect("commit_transaction failed");

                {
                    let _transaction = pm_client
                        .create_transaction()
                        .await
                        .expect("FIDL error")
                        .expect("create_transaction should succeed after committing txn");
                }

                pm_client
                    .create_transaction()
                    .await
                    .expect("FIDL error")
                    .expect("create_transaction should succeed after dropping txn");

                lifecycle_client.stop().expect("Stop failed");
                fasync::OnSignals::new(
                    &lifecycle_client.into_channel().expect("into_channel failed"),
                    zx::Signals::CHANNEL_PEER_CLOSED,
                )
                .await
                .expect("OnSignals failed");
            },
            async {
                let service = StorageHostService::new();
                service
                    .run(outgoing_dir_server.into_channel(), Some(lifecycle_server.into_channel()))
                    .await
                    .expect("Run failed");
            },
            async {
                let server = setup_server(
                    512,
                    16,
                    vec![PartitionInfo {
                        label: "part".to_string(),
                        type_guid: Guid::from_bytes([0xabu8; 16]),
                        instance_guid: Guid::from_bytes([0xcdu8; 16]),
                        start_block: 4,
                        num_blocks: 1,
                        flags: 0,
                    }],
                )
                .await;
                let _ = server.serve(volume_stream).await;
            }
            .fuse(),
        );
    }
}