1use fidl::endpoints::ServerEnd;
13use fidl_fuchsia_hardware_block::{BlockProxy, MAX_TRANSFER_UNBOUNDED};
14use fidl_fuchsia_hardware_block_partition::PartitionProxy;
15use fidl_fuchsia_hardware_block_volume::VolumeProxy;
16use fuchsia_sync::Mutex;
17use futures::channel::oneshot;
18use futures::executor::block_on;
19use lazy_static::lazy_static;
20use std::collections::HashMap;
21use std::future::Future;
22use std::hash::{Hash, Hasher};
23use std::mem::MaybeUninit;
24use std::num::NonZero;
25use std::ops::{DerefMut, Range};
26use std::pin::Pin;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU16, Ordering};
29use std::task::{Context, Poll, Waker};
30use zx::sys::zx_handle_t;
31use zx::{self as zx, HandleBased as _};
32use {
33 fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
34 fuchsia_async as fasync, storage_trace as trace,
35};
36
37pub use cache::Cache;
38
39pub use block::Flag as BlockFlags;
40
41pub use block_protocol::*;
42
43pub mod cache;
44
45const TEMP_VMO_SIZE: usize = 65536;
46
47pub const NO_TRACE_ID: u64 = 0;
49
50pub use block_driver::{BlockIoFlag, BlockOpcode};
51
52fn fidl_to_status(error: fidl::Error) -> zx::Status {
53 match error {
54 fidl::Error::ClientChannelClosed { status, .. } => status,
55 _ => zx::Status::INTERNAL,
56 }
57}
58
59fn opcode_str(opcode: u8) -> &'static str {
60 match BlockOpcode::from_primitive(opcode) {
61 Some(BlockOpcode::Read) => "read",
62 Some(BlockOpcode::Write) => "write",
63 Some(BlockOpcode::Flush) => "flush",
64 Some(BlockOpcode::Trim) => "trim",
65 Some(BlockOpcode::CloseVmo) => "close_vmo",
66 None => "unknown",
67 }
68}
69
70fn generate_trace_flow_id(request_id: u32) -> u64 {
73 lazy_static! {
74 static ref SELF_HANDLE: zx_handle_t = fuchsia_runtime::process_self().raw_handle();
75 };
76 *SELF_HANDLE as u64 + (request_id as u64) << 32
77}
78
79pub enum BufferSlice<'a> {
80 VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
81 Memory(&'a [u8]),
82}
83
84impl<'a> BufferSlice<'a> {
85 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
86 BufferSlice::VmoId { vmo_id, offset, length }
87 }
88}
89
90impl<'a> From<&'a [u8]> for BufferSlice<'a> {
91 fn from(buf: &'a [u8]) -> Self {
92 BufferSlice::Memory(buf)
93 }
94}
95
96pub enum MutableBufferSlice<'a> {
97 VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
98 Memory(&'a mut [u8]),
99}
100
101impl<'a> MutableBufferSlice<'a> {
102 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
103 MutableBufferSlice::VmoId { vmo_id, offset, length }
104 }
105}
106
107impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
108 fn from(buf: &'a mut [u8]) -> Self {
109 MutableBufferSlice::Memory(buf)
110 }
111}
112
113#[derive(Default)]
114struct RequestState {
115 result: Option<zx::Status>,
116 waker: Option<Waker>,
117}
118
119#[derive(Default)]
120struct FifoState {
121 fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,
123
124 next_request_id: u32,
126
127 queue: std::collections::VecDeque<BlockFifoRequest>,
129
130 map: HashMap<u32, RequestState>,
132
133 poller_waker: Option<Waker>,
135
136 attach_barrier: bool,
138}
139
140impl FifoState {
141 fn terminate(&mut self) {
142 self.fifo.take();
143 for (_, request_state) in self.map.iter_mut() {
144 request_state.result.get_or_insert(zx::Status::CANCELED);
145 if let Some(waker) = request_state.waker.take() {
146 waker.wake();
147 }
148 }
149 if let Some(waker) = self.poller_waker.take() {
150 waker.wake();
151 }
152 }
153
154 fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
156 let fifo = if let Some(fifo) = self.fifo.as_ref() {
157 fifo
158 } else {
159 return true;
160 };
161
162 loop {
163 let slice = self.queue.as_slices().0;
164 if slice.is_empty() {
165 return false;
166 }
167 match fifo.try_write(context, slice) {
168 Poll::Ready(Ok(sent)) => {
169 self.queue.drain(0..sent);
170 }
171 Poll::Ready(Err(_)) => {
172 self.terminate();
173 return true;
174 }
175 Poll::Pending => {
176 return false;
177 }
178 }
179 }
180 }
181}
182
183type FifoStateRef = Arc<Mutex<FifoState>>;
184
185struct ResponseFuture {
187 request_id: u32,
188 fifo_state: FifoStateRef,
189}
190
191impl ResponseFuture {
192 fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
193 ResponseFuture { request_id, fifo_state }
194 }
195}
196
197impl Future for ResponseFuture {
198 type Output = Result<(), zx::Status>;
199
200 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
201 let mut state = self.fifo_state.lock();
202 let request_state = state.map.get_mut(&self.request_id).unwrap();
203 if let Some(result) = request_state.result {
204 Poll::Ready(result.into())
205 } else {
206 request_state.waker.replace(context.waker().clone());
207 Poll::Pending
208 }
209 }
210}
211
212impl Drop for ResponseFuture {
213 fn drop(&mut self) {
214 self.fifo_state.lock().map.remove(&self.request_id).unwrap();
215 }
216}
217
218#[derive(Debug)]
220#[must_use]
221pub struct VmoId(AtomicU16);
222
223impl VmoId {
224 pub fn new(id: u16) -> Self {
226 Self(AtomicU16::new(id))
227 }
228
229 pub fn take(&self) -> Self {
231 Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
232 }
233
234 pub fn is_valid(&self) -> bool {
235 self.id() != block_driver::BLOCK_VMOID_INVALID
236 }
237
238 #[must_use]
240 pub fn into_id(self) -> u16 {
241 self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
242 }
243
244 pub fn id(&self) -> u16 {
245 self.0.load(Ordering::Relaxed)
246 }
247}
248
249impl PartialEq for VmoId {
250 fn eq(&self, other: &Self) -> bool {
251 self.id() == other.id()
252 }
253}
254
255impl Eq for VmoId {}
256
257impl Drop for VmoId {
258 fn drop(&mut self) {
259 assert_eq!(
260 self.0.load(Ordering::Relaxed),
261 block_driver::BLOCK_VMOID_INVALID,
262 "Did you forget to detach?"
263 );
264 }
265}
266
267impl Hash for VmoId {
268 fn hash<H: Hasher>(&self, state: &mut H) {
269 self.id().hash(state);
270 }
271}
272
273pub trait BlockClient: Send + Sync {
277 fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;
279
280 fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;
282
283 fn read_at(
285 &self,
286 buffer_slice: MutableBufferSlice<'_>,
287 device_offset: u64,
288 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
289 self.read_at_with_opts_traced(buffer_slice, device_offset, ReadOptions::default(), 0)
290 }
291
292 fn read_at_with_opts(
293 &self,
294 buffer_slice: MutableBufferSlice<'_>,
295 device_offset: u64,
296 opts: ReadOptions,
297 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
298 self.read_at_with_opts_traced(buffer_slice, device_offset, opts, 0)
299 }
300
301 fn read_at_with_opts_traced(
302 &self,
303 buffer_slice: MutableBufferSlice<'_>,
304 device_offset: u64,
305 opts: ReadOptions,
306 trace_flow_id: u64,
307 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
308
309 fn write_at(
311 &self,
312 buffer_slice: BufferSlice<'_>,
313 device_offset: u64,
314 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
315 self.write_at_with_opts_traced(
316 buffer_slice,
317 device_offset,
318 WriteOptions::default(),
319 NO_TRACE_ID,
320 )
321 }
322
323 fn write_at_with_opts(
324 &self,
325 buffer_slice: BufferSlice<'_>,
326 device_offset: u64,
327 opts: WriteOptions,
328 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
329 self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
330 }
331
332 fn write_at_with_opts_traced(
333 &self,
334 buffer_slice: BufferSlice<'_>,
335 device_offset: u64,
336 opts: WriteOptions,
337 trace_flow_id: u64,
338 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
339
340 fn trim(
342 &self,
343 device_range: Range<u64>,
344 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
345 self.trim_traced(device_range, NO_TRACE_ID)
346 }
347
348 fn trim_traced(
349 &self,
350 device_range: Range<u64>,
351 trace_flow_id: u64,
352 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
353
354 fn barrier(&self);
359
360 fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
361 self.flush_traced(NO_TRACE_ID)
362 }
363
364 fn flush_traced(
366 &self,
367 trace_flow_id: u64,
368 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
369
370 fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
372
373 fn block_size(&self) -> u32;
375
376 fn block_count(&self) -> u64;
378
379 fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
381
382 fn block_flags(&self) -> BlockFlags;
384
385 fn is_connected(&self) -> bool;
387}
388
389struct Common {
390 block_size: u32,
391 block_count: u64,
392 max_transfer_blocks: Option<NonZero<u32>>,
393 block_flags: BlockFlags,
394 fifo_state: FifoStateRef,
395 temp_vmo: futures::lock::Mutex<zx::Vmo>,
396 temp_vmo_id: VmoId,
397}
398
399impl Common {
400 fn new(
401 fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
402 info: &block::BlockInfo,
403 temp_vmo: zx::Vmo,
404 temp_vmo_id: VmoId,
405 ) -> Self {
406 let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
407 fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
408 Self {
409 block_size: info.block_size,
410 block_count: info.block_count,
411 max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
412 NonZero::new(info.max_transfer_size / info.block_size)
413 } else {
414 None
415 },
416 block_flags: info.flags,
417 fifo_state,
418 temp_vmo: futures::lock::Mutex::new(temp_vmo),
419 temp_vmo_id,
420 }
421 }
422
423 fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
424 if bytes % self.block_size as u64 != 0 {
425 Err(zx::Status::INVALID_ARGS)
426 } else {
427 Ok(bytes / self.block_size as u64)
428 }
429 }
430
431 async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
433 async move {
434 let (request_id, trace_flow_id) = {
435 let mut state = self.fifo_state.lock();
436
437 let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
438 if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
439 && state.attach_barrier
440 {
441 flags |= BlockIoFlag::PRE_BARRIER;
442 request.command.flags = flags.bits();
443 state.attach_barrier = false;
444 }
445
446 if state.fifo.is_none() {
447 return Err(zx::Status::CANCELED);
449 }
450 trace::duration!(
451 c"storage",
452 c"block_client::send::start",
453 "op" => opcode_str(request.command.opcode)
454 );
455 let request_id = state.next_request_id;
456 state.next_request_id = state.next_request_id.overflowing_add(1).0;
457 assert!(
458 state.map.insert(request_id, RequestState::default()).is_none(),
459 "request id in use!"
460 );
461 request.reqid = request_id;
462 if request.trace_flow_id == NO_TRACE_ID {
463 request.trace_flow_id = generate_trace_flow_id(request_id);
464 }
465 let trace_flow_id = request.trace_flow_id;
466 trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
467 state.queue.push_back(request);
468 if let Some(waker) = state.poller_waker.clone() {
469 state.poll_send_requests(&mut Context::from_waker(&waker));
470 }
471 (request_id, trace_flow_id)
472 };
473 ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
474 trace::duration!(c"storage", c"block_client::send::end");
475 trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
476 Ok(())
477 }
478 .await
479 }
480
481 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
482 self.send(BlockFifoRequest {
483 command: BlockFifoCommand {
484 opcode: BlockOpcode::CloseVmo.into_primitive(),
485 flags: 0,
486 ..Default::default()
487 },
488 vmoid: vmo_id.into_id(),
489 ..Default::default()
490 })
491 .await
492 }
493
494 async fn read_at(
495 &self,
496 buffer_slice: MutableBufferSlice<'_>,
497 device_offset: u64,
498 opts: ReadOptions,
499 trace_flow_id: u64,
500 ) -> Result<(), zx::Status> {
501 match buffer_slice {
502 MutableBufferSlice::VmoId { vmo_id, offset, length } => {
503 self.send(BlockFifoRequest {
504 command: BlockFifoCommand {
505 opcode: BlockOpcode::Read.into_primitive(),
506 flags: 0,
507 ..Default::default()
508 },
509 vmoid: vmo_id.id(),
510 length: self
511 .to_blocks(length)?
512 .try_into()
513 .map_err(|_| zx::Status::INVALID_ARGS)?,
514 vmo_offset: self.to_blocks(offset)?,
515 dev_offset: self.to_blocks(device_offset)?,
516 trace_flow_id,
517 dun: opts.inline_crypto_options.dun,
518 slot: opts.inline_crypto_options.slot,
519 ..Default::default()
520 })
521 .await?
522 }
523 MutableBufferSlice::Memory(mut slice) => {
524 let temp_vmo = self.temp_vmo.lock().await;
525 let mut device_block = self.to_blocks(device_offset)?;
526 loop {
527 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
528 let block_count = self.to_blocks(to_do as u64)? as u32;
529 self.send(BlockFifoRequest {
530 command: BlockFifoCommand {
531 opcode: BlockOpcode::Read.into_primitive(),
532 flags: 0,
533 ..Default::default()
534 },
535 vmoid: self.temp_vmo_id.id(),
536 length: block_count,
537 vmo_offset: 0,
538 dev_offset: device_block,
539 trace_flow_id,
540 dun: opts.inline_crypto_options.dun,
541 slot: opts.inline_crypto_options.slot,
542 ..Default::default()
543 })
544 .await?;
545 temp_vmo.read(&mut slice[..to_do], 0)?;
546 if to_do == slice.len() {
547 break;
548 }
549 device_block += block_count as u64;
550 slice = &mut slice[to_do..];
551 }
552 }
553 }
554 Ok(())
555 }
556
557 async fn write_at(
558 &self,
559 buffer_slice: BufferSlice<'_>,
560 device_offset: u64,
561 opts: WriteOptions,
562 trace_flow_id: u64,
563 ) -> Result<(), zx::Status> {
564 let mut flags = BlockIoFlag::empty();
565
566 if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
567 flags |= BlockIoFlag::FORCE_ACCESS;
568 }
569
570 if opts.flags.contains(WriteFlags::PRE_BARRIER) {
571 flags |= BlockIoFlag::PRE_BARRIER;
572 }
573
574 match buffer_slice {
575 BufferSlice::VmoId { vmo_id, offset, length } => {
576 self.send(BlockFifoRequest {
577 command: BlockFifoCommand {
578 opcode: BlockOpcode::Write.into_primitive(),
579 flags: flags.bits(),
580 ..Default::default()
581 },
582 vmoid: vmo_id.id(),
583 length: self
584 .to_blocks(length)?
585 .try_into()
586 .map_err(|_| zx::Status::INVALID_ARGS)?,
587 vmo_offset: self.to_blocks(offset)?,
588 dev_offset: self.to_blocks(device_offset)?,
589 trace_flow_id,
590 dun: opts.inline_crypto_options.dun,
591 slot: opts.inline_crypto_options.slot,
592 ..Default::default()
593 })
594 .await?;
595 }
596 BufferSlice::Memory(mut slice) => {
597 let temp_vmo = self.temp_vmo.lock().await;
598 let mut device_block = self.to_blocks(device_offset)?;
599 loop {
600 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
601 let block_count = self.to_blocks(to_do as u64)? as u32;
602 temp_vmo.write(&slice[..to_do], 0)?;
603 self.send(BlockFifoRequest {
604 command: BlockFifoCommand {
605 opcode: BlockOpcode::Write.into_primitive(),
606 flags: flags.bits(),
607 ..Default::default()
608 },
609 vmoid: self.temp_vmo_id.id(),
610 length: block_count,
611 vmo_offset: 0,
612 dev_offset: device_block,
613 trace_flow_id,
614 dun: opts.inline_crypto_options.dun,
615 slot: opts.inline_crypto_options.slot,
616 ..Default::default()
617 })
618 .await?;
619 if to_do == slice.len() {
620 break;
621 }
622 device_block += block_count as u64;
623 slice = &slice[to_do..];
624 }
625 }
626 }
627 Ok(())
628 }
629
630 async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
631 let length = self.to_blocks(device_range.end - device_range.start)? as u32;
632 let dev_offset = self.to_blocks(device_range.start)?;
633 self.send(BlockFifoRequest {
634 command: BlockFifoCommand {
635 opcode: BlockOpcode::Trim.into_primitive(),
636 flags: 0,
637 ..Default::default()
638 },
639 vmoid: block_driver::BLOCK_VMOID_INVALID,
640 length,
641 dev_offset,
642 trace_flow_id,
643 ..Default::default()
644 })
645 .await
646 }
647
648 async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
649 self.send(BlockFifoRequest {
650 command: BlockFifoCommand {
651 opcode: BlockOpcode::Flush.into_primitive(),
652 flags: 0,
653 ..Default::default()
654 },
655 vmoid: block_driver::BLOCK_VMOID_INVALID,
656 trace_flow_id,
657 ..Default::default()
658 })
659 .await
660 }
661
662 fn barrier(&self) {
663 self.fifo_state.lock().attach_barrier = true;
664 }
665
666 fn block_size(&self) -> u32 {
667 self.block_size
668 }
669
670 fn block_count(&self) -> u64 {
671 self.block_count
672 }
673
674 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
675 self.max_transfer_blocks.clone()
676 }
677
678 fn block_flags(&self) -> BlockFlags {
679 self.block_flags
680 }
681
682 fn is_connected(&self) -> bool {
683 self.fifo_state.lock().fifo.is_some()
684 }
685}
686
687impl Drop for Common {
688 fn drop(&mut self) {
689 let _ = self.temp_vmo_id.take().into_id();
692 self.fifo_state.lock().terminate();
693 }
694}
695
696pub struct RemoteBlockClient {
698 session: block::SessionProxy,
699 common: Common,
700}
701
702pub trait AsBlockProxy {
703 fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;
704
705 fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
706}
707
708impl<T: AsBlockProxy> AsBlockProxy for &T {
709 fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
710 AsBlockProxy::get_info(*self)
711 }
712 fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
713 AsBlockProxy::open_session(*self, session)
714 }
715}
716
717macro_rules! impl_as_block_proxy {
718 ($name:ident) => {
719 impl AsBlockProxy for $name {
720 async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
721 $name::get_info(self).await
722 }
723
724 fn open_session(
725 &self,
726 session: ServerEnd<block::SessionMarker>,
727 ) -> Result<(), fidl::Error> {
728 $name::open_session(self, session)
729 }
730 }
731 };
732}
733
734impl_as_block_proxy!(BlockProxy);
735impl_as_block_proxy!(PartitionProxy);
736impl_as_block_proxy!(VolumeProxy);
737
738impl RemoteBlockClient {
739 pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
741 let info =
742 remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
743 let (session, server) = fidl::endpoints::create_proxy();
744 let () = remote.open_session(server).map_err(fidl_to_status)?;
745 Self::from_session(info, session).await
746 }
747
748 pub async fn from_session(
749 info: block::BlockInfo,
750 session: block::SessionProxy,
751 ) -> Result<Self, zx::Status> {
752 let fifo =
753 session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
754 let fifo = fasync::Fifo::from_fifo(fifo);
755 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
756 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
757 let vmo_id =
758 session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
759 let vmo_id = VmoId::new(vmo_id.id);
760 Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
761 }
762}
763
764impl BlockClient for RemoteBlockClient {
765 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
766 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
767 let vmo_id = self
768 .session
769 .attach_vmo(dup)
770 .await
771 .map_err(fidl_to_status)?
772 .map_err(zx::Status::from_raw)?;
773 Ok(VmoId::new(vmo_id.id))
774 }
775
776 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
777 self.common.detach_vmo(vmo_id).await
778 }
779
780 async fn read_at_with_opts_traced(
781 &self,
782 buffer_slice: MutableBufferSlice<'_>,
783 device_offset: u64,
784 opts: ReadOptions,
785 trace_flow_id: u64,
786 ) -> Result<(), zx::Status> {
787 self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
788 }
789
790 async fn write_at_with_opts_traced(
791 &self,
792 buffer_slice: BufferSlice<'_>,
793 device_offset: u64,
794 opts: WriteOptions,
795 trace_flow_id: u64,
796 ) -> Result<(), zx::Status> {
797 self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
798 }
799
800 async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
801 self.common.trim(range, trace_flow_id).await
802 }
803
804 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
805 self.common.flush(trace_flow_id).await
806 }
807
808 fn barrier(&self) {
809 self.common.barrier()
810 }
811
812 async fn close(&self) -> Result<(), zx::Status> {
813 let () =
814 self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
815 Ok(())
816 }
817
818 fn block_size(&self) -> u32 {
819 self.common.block_size()
820 }
821
822 fn block_count(&self) -> u64 {
823 self.common.block_count()
824 }
825
826 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
827 self.common.max_transfer_blocks()
828 }
829
830 fn block_flags(&self) -> BlockFlags {
831 self.common.block_flags()
832 }
833
834 fn is_connected(&self) -> bool {
835 self.common.is_connected()
836 }
837}
838
839pub struct RemoteBlockClientSync {
840 session: block::SessionSynchronousProxy,
841 common: Common,
842}
843
844impl RemoteBlockClientSync {
845 pub fn new(
849 client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
850 ) -> Result<Self, zx::Status> {
851 let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
852 let info = remote
853 .get_info(zx::MonotonicInstant::INFINITE)
854 .map_err(fidl_to_status)?
855 .map_err(zx::Status::from_raw)?;
856 let (client, server) = fidl::endpoints::create_endpoints();
857 let () = remote.open_session(server).map_err(fidl_to_status)?;
858 let session = block::SessionSynchronousProxy::new(client.into_channel());
859 let fifo = session
860 .get_fifo(zx::MonotonicInstant::INFINITE)
861 .map_err(fidl_to_status)?
862 .map_err(zx::Status::from_raw)?;
863 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
864 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
865 let vmo_id = session
866 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
867 .map_err(fidl_to_status)?
868 .map_err(zx::Status::from_raw)?;
869 let vmo_id = VmoId::new(vmo_id.id);
870
871 let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
874 std::thread::spawn(move || {
875 let mut executor = fasync::LocalExecutor::new();
876 let fifo = fasync::Fifo::from_fifo(fifo);
877 let common = Common::new(fifo, &info, temp_vmo, vmo_id);
878 let fifo_state = common.fifo_state.clone();
879 let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
880 executor.run_singlethreaded(FifoPoller { fifo_state });
881 });
882 block_on(receiver).map_err(|_| zx::Status::CANCELED)?
883 }
884
885 pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
886 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
887 let vmo_id = self
888 .session
889 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
890 .map_err(fidl_to_status)?
891 .map_err(zx::Status::from_raw)?;
892 Ok(VmoId::new(vmo_id.id))
893 }
894
895 pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
896 block_on(self.common.detach_vmo(vmo_id))
897 }
898
899 pub fn read_at(
900 &self,
901 buffer_slice: MutableBufferSlice<'_>,
902 device_offset: u64,
903 ) -> Result<(), zx::Status> {
904 block_on(self.common.read_at(
905 buffer_slice,
906 device_offset,
907 ReadOptions::default(),
908 NO_TRACE_ID,
909 ))
910 }
911
912 pub fn write_at(
913 &self,
914 buffer_slice: BufferSlice<'_>,
915 device_offset: u64,
916 ) -> Result<(), zx::Status> {
917 block_on(self.common.write_at(
918 buffer_slice,
919 device_offset,
920 WriteOptions::default(),
921 NO_TRACE_ID,
922 ))
923 }
924
925 pub fn flush(&self) -> Result<(), zx::Status> {
926 block_on(self.common.flush(NO_TRACE_ID))
927 }
928
929 pub fn close(&self) -> Result<(), zx::Status> {
930 let () = self
931 .session
932 .close(zx::MonotonicInstant::INFINITE)
933 .map_err(fidl_to_status)?
934 .map_err(zx::Status::from_raw)?;
935 Ok(())
936 }
937
938 pub fn block_size(&self) -> u32 {
939 self.common.block_size()
940 }
941
942 pub fn block_count(&self) -> u64 {
943 self.common.block_count()
944 }
945
946 pub fn is_connected(&self) -> bool {
947 self.common.is_connected()
948 }
949}
950
951impl Drop for RemoteBlockClientSync {
952 fn drop(&mut self) {
953 let _ = self.close();
955 }
956}
957
958struct FifoPoller {
960 fifo_state: FifoStateRef,
961}
962
963impl Future for FifoPoller {
964 type Output = ();
965
966 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
967 let mut state_lock = self.fifo_state.lock();
968 let state = state_lock.deref_mut(); if state.poll_send_requests(context) {
972 return Poll::Ready(());
973 }
974
975 let fifo = state.fifo.as_ref().unwrap(); loop {
978 let mut response = MaybeUninit::uninit();
979 match fifo.try_read(context, &mut response) {
980 Poll::Pending => {
981 state.poller_waker = Some(context.waker().clone());
982 return Poll::Pending;
983 }
984 Poll::Ready(Ok(_)) => {
985 let response = unsafe { response.assume_init() };
986 let request_id = response.reqid;
987 if let Some(request_state) = state.map.get_mut(&request_id) {
989 request_state.result.replace(zx::Status::from_raw(response.status));
990 if let Some(waker) = request_state.waker.take() {
991 waker.wake();
992 }
993 }
994 }
995 Poll::Ready(Err(_)) => {
996 state.terminate();
997 return Poll::Ready(());
998 }
999 }
1000 }
1001 }
1002}
1003
1004#[cfg(test)]
1005mod tests {
1006 use super::{
1007 BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
1008 RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
1009 };
1010 use block_server::{BlockServer, DeviceInfo, PartitionInfo};
1011 use fidl::endpoints::RequestStream as _;
1012 use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
1013 use futures::join;
1014 use futures::stream::StreamExt as _;
1015 use futures::stream::futures_unordered::FuturesUnordered;
1016 use ramdevice_client::RamdiskClient;
1017 use std::borrow::Cow;
1018 use std::num::NonZero;
1019 use std::sync::Arc;
1020 use std::sync::atomic::{AtomicBool, Ordering};
1021 use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};
1022
1023 const RAMDISK_BLOCK_SIZE: u64 = 1024;
1024 const RAMDISK_BLOCK_COUNT: u64 = 1024;
1025
1026 pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
1027 let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
1028 .await
1029 .expect("RamdiskClient::create failed");
1030 let client_end = ramdisk.open().expect("ramdisk.open failed");
1031 let proxy = client_end.into_proxy();
1032 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1033 assert_eq!(block_client.block_size(), 1024);
1034 let client_end = ramdisk.open().expect("ramdisk.open failed");
1035 let proxy = client_end.into_proxy();
1036 (ramdisk, proxy, block_client)
1037 }
1038
1039 #[fuchsia::test]
1040 async fn test_against_ram_disk() {
1041 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1042
1043 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1044 vmo.write(b"hello", 5).expect("vmo.write failed");
1045 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1046 block_client
1047 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1048 .await
1049 .expect("write_at failed");
1050 block_client
1051 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1052 .await
1053 .expect("read_at failed");
1054 let mut buf: [u8; 5] = Default::default();
1055 vmo.read(&mut buf, 1029).expect("vmo.read failed");
1056 assert_eq!(&buf, b"hello");
1057 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1058 }
1059
1060 #[fuchsia::test]
1061 async fn test_alignment() {
1062 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1063 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1064 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1065 block_client
1066 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1067 .await
1068 .expect_err("expected failure due to bad alignment");
1069 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1070 }
1071
1072 #[fuchsia::test]
1073 async fn test_parallel_io() {
1074 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1075 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1076 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1077 let mut reads = Vec::new();
1078 for _ in 0..1024 {
1079 reads.push(
1080 block_client
1081 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1082 .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1083 );
1084 }
1085 futures::future::join_all(reads).await;
1086 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1087 }
1088
1089 #[fuchsia::test]
1090 async fn test_closed_device() {
1091 let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1092 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1093 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1094 let mut reads = Vec::new();
1095 for _ in 0..1024 {
1096 reads.push(
1097 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1098 );
1099 }
1100 assert!(block_client.is_connected());
1101 let _ = futures::join!(futures::future::join_all(reads), async {
1102 ramdisk.destroy().await.expect("ramdisk.destroy failed")
1103 });
1104 while block_client
1106 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1107 .await
1108 .is_ok()
1109 {}
1110
1111 while block_client.is_connected() {
1114 fasync::Timer::new(fasync::MonotonicInstant::after(
1116 zx::MonotonicDuration::from_millis(500),
1117 ))
1118 .await;
1119 }
1120
1121 assert_eq!(block_client.is_connected(), false);
1123 let _ = block_client.detach_vmo(vmo_id).await;
1124 }
1125
1126 #[fuchsia::test]
1127 async fn test_cancelled_reads() {
1128 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1129 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1130 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1131 {
1132 let mut reads = FuturesUnordered::new();
1133 for _ in 0..1024 {
1134 reads.push(
1135 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1136 );
1137 }
1138 for _ in 0..500 {
1140 reads.next().await;
1141 }
1142 }
1143 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1144 }
1145
1146 #[fuchsia::test]
1147 async fn test_parallel_large_read_and_write_with_memory_succeds() {
1148 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1149 let block_client_ref = &block_client;
1150 let test_one = |offset, len, fill| async move {
1151 let buf = vec![fill; len];
1152 block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1153 let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1155 block_client_ref
1156 .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1157 .await
1158 .expect("read_at failed");
1159 assert_eq!(
1160 &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1161 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1162 );
1163 assert_eq!(
1164 &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1165 &buf[..]
1166 );
1167 assert_eq!(
1168 &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1169 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1170 );
1171 };
1172 const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1173 join!(
1174 test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1175 test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1176 );
1177 }
1178
1179 struct FakeBlockServer<'a> {
1183 server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
1184 channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
1185 fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
1186 }
1187
1188 impl<'a> FakeBlockServer<'a> {
1189 fn new(
1201 server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1202 channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1203 fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1204 ) -> FakeBlockServer<'a> {
1205 FakeBlockServer {
1206 server_channel: Some(server_channel),
1207 channel_handler: Box::new(channel_handler),
1208 fifo_handler: Box::new(fifo_handler),
1209 }
1210 }
1211
1212 async fn run(&mut self) {
1214 let server = self.server_channel.take().unwrap();
1215
1216 let (server_fifo, client_fifo) =
1218 zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1219 .expect("Fifo::create failed");
1220 let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1221
1222 let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1223 let fifo_future = Abortable::new(
1224 async {
1225 let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1226 let (mut reader, mut writer) = fifo.async_io();
1227 let mut request = BlockFifoRequest::default();
1228 loop {
1229 match reader.read_entries(&mut request).await {
1230 Ok(1) => {}
1231 Err(zx::Status::PEER_CLOSED) => break,
1232 Err(e) => panic!("read_entry failed {:?}", e),
1233 _ => unreachable!(),
1234 };
1235
1236 let response = self.fifo_handler.as_ref()(request);
1237 writer
1238 .write_entries(std::slice::from_ref(&response))
1239 .await
1240 .expect("write_entries failed");
1241 }
1242 },
1243 fifo_future_abort_registration,
1244 );
1245
1246 let channel_future = async {
1247 server
1248 .into_stream()
1249 .for_each_concurrent(None, |request| async {
1250 let request = request.expect("unexpected fidl error");
1251
1252 match request {
1253 block::BlockRequest::GetInfo { responder } => {
1254 responder
1255 .send(Ok(&block::BlockInfo {
1256 block_count: 1024,
1257 block_size: 512,
1258 max_transfer_size: 1024 * 1024,
1259 flags: block::Flag::empty(),
1260 }))
1261 .expect("send failed");
1262 }
1263 block::BlockRequest::OpenSession { session, control_handle: _ } => {
1264 let stream = session.into_stream();
1265 stream
1266 .for_each(|request| async {
1267 let request = request.expect("unexpected fidl error");
1268 if self.channel_handler.as_ref()(&request) {
1271 return;
1272 }
1273 match request {
1274 block::SessionRequest::GetFifo { responder } => {
1275 match maybe_server_fifo.lock().take() {
1276 Some(fifo) => {
1277 responder.send(Ok(fifo.downcast()))
1278 }
1279 None => responder.send(Err(
1280 zx::Status::NO_RESOURCES.into_raw(),
1281 )),
1282 }
1283 .expect("send failed")
1284 }
1285 block::SessionRequest::AttachVmo {
1286 vmo: _,
1287 responder,
1288 } => responder
1289 .send(Ok(&block::VmoId { id: 1 }))
1290 .expect("send failed"),
1291 block::SessionRequest::Close { responder } => {
1292 fifo_future_abort.abort();
1293 responder.send(Ok(())).expect("send failed")
1294 }
1295 }
1296 })
1297 .await
1298 }
1299 _ => panic!("Unexpected message"),
1300 }
1301 })
1302 .await;
1303 };
1304
1305 let _result = join!(fifo_future, channel_future);
1306 }
1308 }
1309
1310 #[fuchsia::test]
1311 async fn test_block_close_is_called() {
1312 let close_called = fuchsia_sync::Mutex::new(false);
1313 let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1314
1315 std::thread::spawn(move || {
1316 let _block_client =
1317 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1318 });
1320
1321 let channel_handler = |request: &block::SessionRequest| -> bool {
1322 if let block::SessionRequest::Close { .. } = request {
1323 *close_called.lock() = true;
1324 }
1325 false
1326 };
1327 FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1328
1329 assert!(*close_called.lock());
1331 }
1332
1333 #[fuchsia::test]
1334 async fn test_block_flush_is_called() {
1335 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1336
1337 struct Interface {
1338 flush_called: Arc<AtomicBool>,
1339 }
1340 impl block_server::async_interface::Interface for Interface {
1341 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1342 Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1343 device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
1344 max_transfer_blocks: None,
1345 block_range: Some(0..1000),
1346 type_guid: [0; 16],
1347 instance_guid: [0; 16],
1348 name: "foo".to_string(),
1349 flags: 0,
1350 })))
1351 }
1352
1353 async fn read(
1354 &self,
1355 _device_block_offset: u64,
1356 _block_count: u32,
1357 _vmo: &Arc<zx::Vmo>,
1358 _vmo_offset: u64,
1359 _trace_flow_id: Option<NonZero<u64>>,
1360 ) -> Result<(), zx::Status> {
1361 unreachable!();
1362 }
1363
1364 fn barrier(&self) -> Result<(), zx::Status> {
1365 unreachable!()
1366 }
1367
1368 async fn write(
1369 &self,
1370 _device_block_offset: u64,
1371 _block_count: u32,
1372 _vmo: &Arc<zx::Vmo>,
1373 _vmo_offset: u64,
1374 _write_opts: WriteOptions,
1375 _trace_flow_id: Option<NonZero<u64>>,
1376 ) -> Result<(), zx::Status> {
1377 unreachable!();
1378 }
1379
1380 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1381 self.flush_called.store(true, Ordering::Relaxed);
1382 Ok(())
1383 }
1384
1385 async fn trim(
1386 &self,
1387 _device_block_offset: u64,
1388 _block_count: u32,
1389 _trace_flow_id: Option<NonZero<u64>>,
1390 ) -> Result<(), zx::Status> {
1391 unreachable!();
1392 }
1393 }
1394
1395 let flush_called = Arc::new(AtomicBool::new(false));
1396
1397 futures::join!(
1398 async {
1399 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1400
1401 block_client.flush().await.expect("flush failed");
1402 },
1403 async {
1404 let block_server = BlockServer::new(
1405 512,
1406 Arc::new(Interface { flush_called: flush_called.clone() }),
1407 );
1408 block_server.handle_requests(stream.cast_stream()).await.unwrap();
1409 }
1410 );
1411
1412 assert!(flush_called.load(Ordering::Relaxed));
1413 }
1414
1415 #[fuchsia::test]
1416 async fn test_trace_flow_ids_set() {
1417 let (proxy, server) = fidl::endpoints::create_proxy();
1418
1419 futures::join!(
1420 async {
1421 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1422 block_client.flush().await.expect("flush failed");
1423 },
1424 async {
1425 let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1426 let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1427 if request.trace_flow_id > 0 {
1428 *flow_id.lock() = Some(request.trace_flow_id);
1429 }
1430 BlockFifoResponse {
1431 status: zx::Status::OK.into_raw(),
1432 reqid: request.reqid,
1433 ..Default::default()
1434 }
1435 };
1436 FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1437 assert!(flow_id.lock().is_some());
1439 }
1440 );
1441 }
1442}