use fidl::endpoints::ServerEnd;
use fidl_fuchsia_hardware_block::{BlockProxy, MAX_TRANSFER_UNBOUNDED};
use fidl_fuchsia_hardware_block_partition::PartitionProxy;
use fidl_fuchsia_hardware_block_volume::VolumeProxy;
use fuchsia_sync::Mutex;
use futures::channel::oneshot;
use futures::executor::block_on;
use std::collections::HashMap;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::mem::MaybeUninit;
use std::num::NonZero;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{Arc, LazyLock};
use std::task::{Context, Poll, Waker};
use zx::sys::zx_handle_t;
use zx::{self as zx, HandleBased as _};
use {
    fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
    fuchsia_async as fasync, storage_trace as trace,
};

pub use cache::Cache;

pub use block::Flag as BlockFlags;

pub use block_protocol::*;

pub mod cache;

const TEMP_VMO_SIZE: usize = 65536;

/// A trace flow ID of zero means no flow ID was supplied; `send` generates one in that case.
pub const NO_TRACE_ID: u64 = 0;

pub use block_driver::{BlockIoFlag, BlockOpcode};

fn fidl_to_status(error: fidl::Error) -> zx::Status {
    match error {
        fidl::Error::ClientChannelClosed { status, .. } => status,
        _ => zx::Status::INTERNAL,
    }
}

fn opcode_str(opcode: u8) -> &'static str {
    match BlockOpcode::from_primitive(opcode) {
        Some(BlockOpcode::Read) => "read",
        Some(BlockOpcode::Write) => "write",
        Some(BlockOpcode::Flush) => "flush",
        Some(BlockOpcode::Trim) => "trim",
        Some(BlockOpcode::CloseVmo) => "close_vmo",
        None => "unknown",
    }
}

// Combines the raw process handle value with the per-connection request ID so that flow IDs
// from different processes are unlikely to collide.
fn generate_trace_flow_id(request_id: u32) -> u64 {
    static SELF_HANDLE: LazyLock<zx_handle_t> =
        LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
    *SELF_HANDLE as u64 + ((request_id as u64) << 32)
}

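/// A source buffer for writes: either a region of a VMO that has been registered with the device
/// (via a [`VmoId`]), or an in-process memory slice, which is staged through an internal
/// temporary VMO.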
pub enum BufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a [u8]),
}

impl<'a> BufferSlice<'a> {
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        BufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a [u8]> for BufferSlice<'a> {
    fn from(buf: &'a [u8]) -> Self {
        BufferSlice::Memory(buf)
    }
}

pub enum MutableBufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a mut [u8]),
}

impl<'a> MutableBufferSlice<'a> {
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        MutableBufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
    fn from(buf: &'a mut [u8]) -> Self {
        MutableBufferSlice::Memory(buf)
    }
}

#[derive(Default)]
struct RequestState {
    result: Option<zx::Status>,
    waker: Option<Waker>,
}

#[derive(Default)]
struct FifoState {
    // The FIFO, or None if the connection has been terminated.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    // The next request ID to be used.
    next_request_id: u32,

    // Requests waiting to be written to the FIFO.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    // Maps request IDs to the state of each outstanding request.
    map: HashMap<u32, RequestState>,

    // The waker for the FifoPoller task.
    poller_waker: Option<Waker>,

    // If set, the next write request is sent with the PRE_BARRIER flag.
    attach_barrier: bool,
}

impl FifoState {
    fn terminate(&mut self) {
        self.fifo.take();
        for (_, request_state) in self.map.iter_mut() {
            request_state.result.get_or_insert(zx::Status::CANCELED);
            if let Some(waker) = request_state.waker.take() {
                waker.wake();
            }
        }
        if let Some(waker) = self.poller_waker.take() {
            waker.wake();
        }
    }

    // Returns true if polling should be terminated (the connection is gone or errored).
    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
        let fifo = if let Some(fifo) = self.fifo.as_ref() {
            fifo
        } else {
            return true;
        };

        loop {
            let slice = self.queue.as_slices().0;
            if slice.is_empty() {
                return false;
            }
            match fifo.try_write(context, slice) {
                Poll::Ready(Ok(sent)) => {
                    self.queue.drain(0..sent);
                }
                Poll::Ready(Err(_)) => {
                    self.terminate();
                    return true;
                }
                Poll::Pending => {
                    return false;
                }
            }
        }
    }
}

type FifoStateRef = Arc<Mutex<FifoState>>;

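/// A future that resolves when the response for `request_id` arrives (or the connection is
/// terminated). Dropping it removes the request from the map, which is how in-flight requests
/// are cancelled.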
struct ResponseFuture {
    request_id: u32,
    fifo_state: FifoStateRef,
}

impl ResponseFuture {
    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
        ResponseFuture { request_id, fifo_state }
    }
}

impl Future for ResponseFuture {
    type Output = Result<(), zx::Status>;

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.fifo_state.lock();
        let request_state = state.map.get_mut(&self.request_id).unwrap();
        if let Some(result) = request_state.result {
            Poll::Ready(result.into())
        } else {
            request_state.waker.replace(context.waker().clone());
            Poll::Pending
        }
    }
}

impl Drop for ResponseFuture {
    fn drop(&mut self) {
        self.fifo_state.lock().map.remove(&self.request_id).unwrap();
    }
}

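/// A handle to a VMO that has been attached to a block device session. The wrapped ID must be
/// released via [`BlockClient::detach_vmo`] (or taken with `take`/`into_id`) before the `VmoId`
/// is dropped, otherwise the drop assertion fires.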
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);

impl VmoId {
    /// Creates a `VmoId` from a raw ID (IDs are normally vended by `attach_vmo`).
    pub fn new(id: u16) -> Self {
        Self(AtomicU16::new(id))
    }

    /// Invalidates `self` and returns a new `VmoId` with the same underlying ID.
    pub fn take(&self) -> Self {
        Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
    }

    pub fn is_valid(&self) -> bool {
        self.id() != block_driver::BLOCK_VMOID_INVALID
    }

    /// Invalidates `self` and returns the underlying ID.
    #[must_use]
    pub fn into_id(self) -> u16 {
        self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
    }

    pub fn id(&self) -> u16 {
        self.0.load(Ordering::Relaxed)
    }
}

impl PartialEq for VmoId {
    fn eq(&self, other: &Self) -> bool {
        self.id() == other.id()
    }
}

impl Eq for VmoId {}

impl Drop for VmoId {
    fn drop(&mut self) {
        assert_eq!(
            self.0.load(Ordering::Relaxed),
            block_driver::BLOCK_VMOID_INVALID,
            "Did you forget to detach?"
        );
    }
}

impl Hash for VmoId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id().hash(state);
    }
}

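/// A trait for clients of a block device. Offsets and lengths are expressed in bytes and must be
/// multiples of the device's block size. See [`RemoteBlockClient`] for the FIFO-based
/// implementation.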
pub trait BlockClient: Send + Sync {
    /// Attaches the given VMO to the device and returns a [`VmoId`] that can be used with
    /// [`BufferSlice`] and [`MutableBufferSlice`].
    fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;

    /// Detaches the given `vmo_id` from the device.
    fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Reads from the device at `device_offset` into the given buffer slice.
    fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.read_at_with_opts_traced(
            buffer_slice,
            device_offset,
            ReadOptions::default(),
            NO_TRACE_ID,
        )
    }

    fn read_at_with_opts(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.read_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
    }

    fn read_at_with_opts_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
        trace_flow_id: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Writes the given buffer slice to the device at `device_offset`.
    fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.write_at_with_opts_traced(
            buffer_slice,
            device_offset,
            WriteOptions::default(),
            NO_TRACE_ID,
        )
    }

    fn write_at_with_opts(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
    }

    fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Trims the given byte range on the device.
    fn trim(
        &self,
        device_range: Range<u64>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.trim_traced(device_range, NO_TRACE_ID)
    }

    fn trim_traced(
        &self,
        device_range: Range<u64>,
        trace_flow_id: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Attaches a barrier to the next write sent on this connection: that write will carry the
    /// PRE_BARRIER flag.
    fn barrier(&self);

    fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.flush_traced(NO_TRACE_ID)
    }

    /// Sends a flush request to the underlying block device.
    fn flush_traced(
        &self,
        trace_flow_id: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Closes the connection.
    fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Returns the block size of the device.
    fn block_size(&self) -> u32;

    /// Returns the size, in blocks, of the device.
    fn block_count(&self) -> u64;

    /// Returns the maximum number of blocks that can be transferred in a single request, if the
    /// device reports a limit.
    fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;

    /// Returns the flags reported by the device.
    fn block_flags(&self) -> BlockFlags;

    /// Returns true if the connection to the device is still alive.
    fn is_connected(&self) -> bool;
}

struct Common {
    block_size: u32,
    block_count: u64,
    max_transfer_blocks: Option<NonZero<u32>>,
    block_flags: BlockFlags,
    fifo_state: FifoStateRef,
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    temp_vmo_id: VmoId,
}

impl Common {
    fn new(
        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
        info: &block::BlockInfo,
        temp_vmo: zx::Vmo,
        temp_vmo_id: VmoId,
    ) -> Self {
        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
        Self {
            block_size: info.block_size,
            block_count: info.block_count,
            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
                NonZero::new(info.max_transfer_size / info.block_size)
            } else {
                None
            },
            block_flags: info.flags,
            fifo_state,
            temp_vmo: futures::lock::Mutex::new(temp_vmo),
            temp_vmo_id,
        }
    }

    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
        if bytes % self.block_size as u64 != 0 {
            Err(zx::Status::INVALID_ARGS)
        } else {
            Ok(bytes / self.block_size as u64)
        }
    }

    // Assigns a request ID, queues the request for the FifoPoller and waits for the matching
    // response.
    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
        async move {
            let (request_id, trace_flow_id) = {
                let mut state = self.fifo_state.lock();

                let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
                if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
                    && state.attach_barrier
                {
                    flags |= BlockIoFlag::PRE_BARRIER;
                    request.command.flags = flags.bits();
                    state.attach_barrier = false;
                }

                if state.fifo.is_none() {
                    // The FIFO has been closed.
                    return Err(zx::Status::CANCELED);
                }
                trace::duration!(
                    c"storage",
                    c"block_client::send::start",
                    "op" => opcode_str(request.command.opcode)
                );
                let request_id = state.next_request_id;
                state.next_request_id = state.next_request_id.overflowing_add(1).0;
                assert!(
                    state.map.insert(request_id, RequestState::default()).is_none(),
                    "request id in use!"
                );
                request.reqid = request_id;
                if request.trace_flow_id == NO_TRACE_ID {
                    request.trace_flow_id = generate_trace_flow_id(request_id);
                }
                let trace_flow_id = request.trace_flow_id;
                trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
                state.queue.push_back(request);
                if let Some(waker) = state.poller_waker.clone() {
                    state.poll_send_requests(&mut Context::from_waker(&waker));
                }
                (request_id, trace_flow_id)
            };
            ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
            trace::duration!(c"storage", c"block_client::send::end");
            trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
            Ok(())
        }
        .await
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::CloseVmo.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: vmo_id.into_id(),
            ..Default::default()
        })
        .await
    }

    async fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        match buffer_slice {
            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        flags: 0,
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    dun: opts.inline_crypto_options.dun,
                    slot: opts.inline_crypto_options.slot,
                    ..Default::default()
                })
                .await?
            }
            MutableBufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            flags: 0,
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        dun: opts.inline_crypto_options.dun,
                        slot: opts.inline_crypto_options.slot,
                        ..Default::default()
                    })
                    .await?;
                    temp_vmo.read(&mut slice[..to_do], 0)?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &mut slice[to_do..];
                }
            }
        }
        Ok(())
    }

    async fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        let mut flags = BlockIoFlag::empty();

        if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
            flags |= BlockIoFlag::FORCE_ACCESS;
        }

        if opts.flags.contains(WriteFlags::PRE_BARRIER) {
            flags |= BlockIoFlag::PRE_BARRIER;
        }

        match buffer_slice {
            BufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Write.into_primitive(),
                        flags: flags.bits(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    dun: opts.inline_crypto_options.dun,
                    slot: opts.inline_crypto_options.slot,
                    ..Default::default()
                })
                .await?;
            }
            BufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    temp_vmo.write(&slice[..to_do], 0)?;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Write.into_primitive(),
                            flags: flags.bits(),
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        dun: opts.inline_crypto_options.dun,
                        slot: opts.inline_crypto_options.slot,
                        ..Default::default()
                    })
                    .await?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &slice[to_do..];
                }
            }
        }
        Ok(())
    }

    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
        let dev_offset = self.to_blocks(device_range.start)?;
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Trim.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: block_driver::BLOCK_VMOID_INVALID,
            length,
            dev_offset,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Flush.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: block_driver::BLOCK_VMOID_INVALID,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    fn barrier(&self) {
        self.fifo_state.lock().attach_barrier = true;
    }

    fn block_size(&self) -> u32 {
        self.block_size
    }

    fn block_count(&self) -> u64 {
        self.block_count
    }

    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
        self.max_transfer_blocks.clone()
    }

    fn block_flags(&self) -> BlockFlags {
        self.block_flags
    }

    fn is_connected(&self) -> bool {
        self.fifo_state.lock().fifo.is_some()
    }
}

impl Drop for Common {
    fn drop(&mut self) {
        // Consume the ID so the `VmoId` drop assertion doesn't fire; the session is being torn
        // down below, so there is no point detaching it explicitly.
        let _ = self.temp_vmo_id.take().into_id();
        self.fifo_state.lock().terminate();
    }
}

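/// A [`BlockClient`] implementation that talks to a block device over the
/// `fuchsia.hardware.block` FIFO protocol from a `fuchsia_async` context.
///
/// A minimal usage sketch; `proxy` is assumed to be a client proxy for a
/// `fuchsia.hardware.block.Block` device obtained elsewhere (for example from a ramdisk, as in
/// the tests below):
///
/// ```ignore
/// let client = RemoteBlockClient::new(proxy).await?;
/// let vmo = zx::Vmo::create(8192)?;
/// let vmo_id = client.attach_vmo(&vmo).await?;
/// // Read the first 8192 bytes of the device into the VMO.
/// client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 8192), 0).await?;
/// client.detach_vmo(vmo_id).await?;
/// ```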
pub struct RemoteBlockClient {
    session: block::SessionProxy,
    common: Common,
}

pub trait AsBlockProxy {
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;

    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
}

impl<T: AsBlockProxy> AsBlockProxy for &T {
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
        AsBlockProxy::get_info(*self)
    }
    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
        AsBlockProxy::open_session(*self, session)
    }
}

macro_rules! impl_as_block_proxy {
    ($name:ident) => {
        impl AsBlockProxy for $name {
            async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
                $name::get_info(self).await
            }

            fn open_session(
                &self,
                session: ServerEnd<block::SessionMarker>,
            ) -> Result<(), fidl::Error> {
                $name::open_session(self, session)
            }
        }
    };
}

impl_as_block_proxy!(BlockProxy);
impl_as_block_proxy!(PartitionProxy);
impl_as_block_proxy!(VolumeProxy);

impl RemoteBlockClient {
    /// Returns a connection to a remote block device via the given proxy.
    pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
        let info =
            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let (session, server) = fidl::endpoints::create_proxy();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        Self::from_session(info, session).await
    }

    /// Returns a connection to a remote block device via the given session proxy.
    pub async fn from_session(
        info: block::BlockInfo,
        session: block::SessionProxy,
    ) -> Result<Self, zx::Status> {
        let fifo =
            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let fifo = fasync::Fifo::from_fifo(fifo);
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id =
            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);
        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
    }
}

impl BlockClient for RemoteBlockClient {
    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup)
            .await
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.common.detach_vmo(vmo_id).await
    }

    async fn read_at_with_opts_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
    }

    async fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
    }

    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.trim(range, trace_flow_id).await
    }

    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.flush(trace_flow_id).await
    }

    fn barrier(&self) {
        self.common.barrier()
    }

    async fn close(&self) -> Result<(), zx::Status> {
        let () =
            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        Ok(())
    }

    fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
        self.common.max_transfer_blocks()
    }

    fn block_flags(&self) -> BlockFlags {
        self.common.block_flags()
    }

    fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}

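/// A synchronous wrapper around the same machinery as [`RemoteBlockClient`], for callers that are
/// not running on a Fuchsia async executor. Internally it spawns a thread that runs its own
/// executor to service the FIFO, and the public methods block on that work.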
pub struct RemoteBlockClientSync {
    session: block::SessionSynchronousProxy,
    common: Common,
}

impl RemoteBlockClientSync {
    /// Returns a connection to a remote block device via the given client end. This version is
    /// for use in contexts where async is not available.
    pub fn new(
        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
    ) -> Result<Self, zx::Status> {
        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
        let info = remote
            .get_info(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let (client, server) = fidl::endpoints::create_endpoints();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        let session = block::SessionSynchronousProxy::new(client.into_channel());
        let fifo = session
            .get_fifo(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);

        // The FIFO is processed on a separate thread with its own executor so that the
        // synchronous methods below can simply block on the futures.
        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
        std::thread::spawn(move || {
            let mut executor = fasync::LocalExecutor::default();
            let fifo = fasync::Fifo::from_fifo(fifo);
            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
            let fifo_state = common.fifo_state.clone();
            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
            executor.run_singlethreaded(FifoPoller { fifo_state });
        });
        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
    }

    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        block_on(self.common.detach_vmo(vmo_id))
    }

    pub fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.read_at(
            buffer_slice,
            device_offset,
            ReadOptions::default(),
            NO_TRACE_ID,
        ))
    }

    pub fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.write_at(
            buffer_slice,
            device_offset,
            WriteOptions::default(),
            NO_TRACE_ID,
        ))
    }

    pub fn flush(&self) -> Result<(), zx::Status> {
        block_on(self.common.flush(NO_TRACE_ID))
    }

    pub fn close(&self) -> Result<(), zx::Status> {
        let () = self
            .session
            .close(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(())
    }

    pub fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    pub fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    pub fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}

impl Drop for RemoteBlockClientSync {
    fn drop(&mut self) {
        // Ignore errors; there is not much that can be done about them here.
        let _ = self.close();
    }
}

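/// A future that drives the FIFO: it writes queued requests and reads responses, waking the
/// `ResponseFuture` that corresponds to each completed request. It completes only when the FIFO
/// connection is terminated.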
struct FifoPoller {
    fifo_state: FifoStateRef,
}

impl Future for FifoPoller {
    type Output = ();

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock();
        let state = state_lock.deref_mut();

        // Send any queued requests first.
        if state.poll_send_requests(context) {
            return Poll::Ready(());
        }

        // Receive responses. The FIFO must still be present if poll_send_requests returned false.
        let fifo = state.fifo.as_ref().unwrap();
        loop {
            let mut response = MaybeUninit::uninit();
            match fifo.try_read(context, &mut response) {
                Poll::Pending => {
                    state.poller_waker = Some(context.waker().clone());
                    return Poll::Pending;
                }
                Poll::Ready(Ok(_)) => {
                    let response = unsafe { response.assume_init() };
                    let request_id = response.reqid;
                    // The request might have been cancelled (dropped), so the ID may be missing.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Poll::Ready(Err(_)) => {
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{
        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
    };
    use block_protocol::ReadOptions;
    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
    use fidl::endpoints::RequestStream as _;
    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
    use futures::join;
    use futures::stream::StreamExt as _;
    use futures::stream::futures_unordered::FuturesUnordered;
    use ramdevice_client::RamdiskClient;
    use std::borrow::Cow;
    use std::num::NonZero;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};

    const RAMDISK_BLOCK_SIZE: u64 = 1024;
    const RAMDISK_BLOCK_COUNT: u64 = 1024;

    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
            .await
            .expect("RamdiskClient::create failed");
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
        assert_eq!(block_client.block_size(), 1024);
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        (ramdisk, proxy, block_client)
    }

    #[fuchsia::test]
    async fn test_against_ram_disk() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;

        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        vmo.write(b"hello", 5).expect("vmo.write failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .expect("write_at failed");
        block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
            .await
            .expect("read_at failed");
        let mut buf: [u8; 5] = Default::default();
        vmo.read(&mut buf, 1029).expect("vmo.read failed");
        assert_eq!(&buf, b"hello");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_alignment() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
            .await
            .expect_err("expected failure due to bad alignment");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_parallel_io() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client
                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
            );
        }
        futures::future::join_all(reads).await;
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_closed_device() {
        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
            );
        }
        assert!(block_client.is_connected());
        let _ = futures::join!(futures::future::join_all(reads), async {
            ramdisk.destroy().await.expect("ramdisk.destroy failed")
        });

        // Keep issuing reads until one fails, which tells us the device has gone away.
        while block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .is_ok()
        {}

        // It can take a moment for the connection to be dropped after the device goes away.
        while block_client.is_connected() {
            fasync::Timer::new(fasync::MonotonicInstant::after(
                zx::MonotonicDuration::from_millis(500),
            ))
            .await;
        }

        assert_eq!(block_client.is_connected(), false);
        // The detach is expected to fail since the connection is gone.
        let _ = block_client.detach_vmo(vmo_id).await;
    }

    #[fuchsia::test]
    async fn test_cancelled_reads() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        {
            let mut reads = FuturesUnordered::new();
            for _ in 0..1024 {
                reads.push(
                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
                );
            }
            // Wait for only half of the reads; the rest are cancelled when `reads` is dropped.
            for _ in 0..500 {
                reads.next().await;
            }
        }
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_parallel_large_read_and_write_with_memory_succeeds() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let block_client_ref = &block_client;
        let test_one = |offset, len, fill| async move {
            let buf = vec![fill; len];
            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
            // Read back an extra block on either side to check the write didn't overflow.
            block_client_ref
                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
                .await
                .expect("read_at failed");
            assert_eq!(
                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
                &buf[..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
        };
        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
        join!(
            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
        );
    }

    // A fake block server used to exercise client behaviour without a real device.
    // `channel_handler` is invoked for every Session request; returning true marks the request as
    // handled, skipping the default handling below. `fifo_handler` produces the response for each
    // FIFO request.
    struct FakeBlockServer<'a> {
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }

    impl<'a> FakeBlockServer<'a> {
        fn new(
            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
        ) -> FakeBlockServer<'a> {
            FakeBlockServer {
                server_channel: Some(server_channel),
                channel_handler: Box::new(channel_handler),
                fifo_handler: Box::new(fifo_handler),
            }
        }

        async fn run(&mut self) {
            let server = self.server_channel.take().unwrap();

            // Set up a mock server.
            let (server_fifo, client_fifo) =
                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
                    .expect("Fifo::create failed");
            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));

            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
            let fifo_future = Abortable::new(
                async {
                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
                    let (mut reader, mut writer) = fifo.async_io();
                    let mut request = BlockFifoRequest::default();
                    loop {
                        match reader.read_entries(&mut request).await {
                            Ok(1) => {}
                            Err(zx::Status::PEER_CLOSED) => break,
                            Err(e) => panic!("read_entries failed {:?}", e),
                            _ => unreachable!(),
                        };

                        let response = self.fifo_handler.as_ref()(request);
                        writer
                            .write_entries(std::slice::from_ref(&response))
                            .await
                            .expect("write_entries failed");
                    }
                },
                fifo_future_abort_registration,
            );

            let channel_future = async {
                server
                    .into_stream()
                    .for_each_concurrent(None, |request| async {
                        let request = request.expect("unexpected fidl error");

                        match request {
                            block::BlockRequest::GetInfo { responder } => {
                                responder
                                    .send(Ok(&block::BlockInfo {
                                        block_count: 1024,
                                        block_size: 512,
                                        max_transfer_size: 1024 * 1024,
                                        flags: block::Flag::empty(),
                                    }))
                                    .expect("send failed");
                            }
                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
                                let stream = session.into_stream();
                                stream
                                    .for_each(|request| async {
                                        let request = request.expect("unexpected fidl error");
                                        // Give the test a chance to handle the request itself.
                                        if self.channel_handler.as_ref()(&request) {
                                            return;
                                        }
                                        match request {
                                            block::SessionRequest::GetFifo { responder } => {
                                                match maybe_server_fifo.lock().take() {
                                                    Some(fifo) => {
                                                        responder.send(Ok(fifo.downcast()))
                                                    }
                                                    None => responder.send(Err(
                                                        zx::Status::NO_RESOURCES.into_raw(),
                                                    )),
                                                }
                                                .expect("send failed")
                                            }
                                            block::SessionRequest::AttachVmo {
                                                vmo: _,
                                                responder,
                                            } => responder
                                                .send(Ok(&block::VmoId { id: 1 }))
                                                .expect("send failed"),
                                            block::SessionRequest::Close { responder } => {
                                                fifo_future_abort.abort();
                                                responder.send(Ok(())).expect("send failed")
                                            }
                                        }
                                    })
                                    .await
                            }
                            _ => panic!("Unexpected message"),
                        }
                    })
                    .await;
            };

            let _result = join!(fifo_future, channel_future);
        }
    }

    #[fuchsia::test]
    async fn test_block_close_is_called() {
        let close_called = fuchsia_sync::Mutex::new(false);
        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();

        std::thread::spawn(move || {
            let _block_client =
                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
        });

        let channel_handler = |request: &block::SessionRequest| -> bool {
            if let block::SessionRequest::Close { .. } = request {
                *close_called.lock() = true;
            }
            false
        };
        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;

        assert!(*close_called.lock());
    }

    #[fuchsia::test]
    async fn test_block_flush_is_called() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();

        struct Interface {
            flush_called: Arc<AtomicBool>,
        }
        impl block_server::async_interface::Interface for Interface {
            async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
                Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
                    device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
                    max_transfer_blocks: None,
                    block_range: Some(0..1000),
                    type_guid: [0; 16],
                    instance_guid: [0; 16],
                    name: "foo".to_string(),
                    flags: 0,
                })))
            }

            async fn read(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _opts: ReadOptions,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            fn barrier(&self) -> Result<(), zx::Status> {
                unreachable!()
            }

            async fn write(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _opts: WriteOptions,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
                self.flush_called.store(true, Ordering::Relaxed);
                Ok(())
            }

            async fn trim(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }
        }

        let flush_called = Arc::new(AtomicBool::new(false));

        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");

                block_client.flush().await.expect("flush failed");
            },
            async {
                let block_server = BlockServer::new(
                    512,
                    Arc::new(Interface { flush_called: flush_called.clone() }),
                );
                block_server.handle_requests(stream.cast_stream()).await.unwrap();
            }
        );

        assert!(flush_called.load(Ordering::Relaxed));
    }

    #[fuchsia::test]
    async fn test_trace_flow_ids_set() {
        let (proxy, server) = fidl::endpoints::create_proxy();

        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
                block_client.flush().await.expect("flush failed");
            },
            async {
                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
                    if request.trace_flow_id > 0 {
                        *flow_id.lock() = Some(request.trace_flow_id);
                    }
                    BlockFifoResponse {
                        status: zx::Status::OK.into_raw(),
                        reqid: request.reqid,
                        ..Default::default()
                    }
                };
                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
                assert!(flow_id.lock().is_some());
            }
        );
    }
}