1use fidl_fuchsia_storage_block::{MAX_TRANSFER_UNBOUNDED, VMOID_INVALID};
13use fuchsia_sync::Mutex;
14use futures::channel::oneshot;
15use futures::executor::block_on;
16use std::borrow::Borrow;
17use std::collections::HashMap;
18use std::future::Future;
19use std::hash::{Hash, Hasher};
20use std::mem::MaybeUninit;
21use std::num::NonZero;
22use std::ops::{DerefMut, Range};
23use std::pin::Pin;
24use std::sync::atomic::{AtomicU16, Ordering};
25use std::sync::{Arc, LazyLock};
26use std::task::{Context, Poll, Waker};
27use zx::sys::zx_handle_t;
28use zx::{self as zx, HandleBased as _};
29use {fidl_fuchsia_storage_block as block, fuchsia_async as fasync, storage_trace as trace};
30
31pub use cache::Cache;
32
33pub use block::DeviceFlag as BlockDeviceFlag;
34
35pub use block_protocol::*;
36
37pub mod cache;
38
/// Size, in bytes, of the scratch VMO used to stage reads/writes issued from
/// plain memory slices (see `Common::read_at` / `Common::write_at`).
const TEMP_VMO_SIZE: usize = 65536;

/// Sentinel trace flow id meaning "no caller-supplied id"; `Common::send`
/// generates a fresh flow id when it sees this value.
pub const NO_TRACE_ID: u64 = 0;

pub use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
45
46fn fidl_to_status(error: fidl::Error) -> zx::Status {
47 match error {
48 fidl::Error::ClientChannelClosed { status, .. } => status,
49 _ => zx::Status::INTERNAL,
50 }
51}
52
53fn opcode_str(opcode: u8) -> &'static str {
54 match BlockOpcode::from_primitive(opcode) {
55 Some(BlockOpcode::Read) => "read",
56 Some(BlockOpcode::Write) => "write",
57 Some(BlockOpcode::Flush) => "flush",
58 Some(BlockOpcode::Trim) => "trim",
59 Some(BlockOpcode::CloseVmo) => "close_vmo",
60 None => "unknown",
61 }
62}
63
64fn generate_trace_flow_id(request_id: u32) -> u64 {
67 static SELF_HANDLE: LazyLock<zx_handle_t> =
68 LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
69 *SELF_HANDLE as u64 + (request_id as u64) << 32
70}
71
/// A source buffer for write operations: either a region of a VMO previously
/// attached to the device, or a plain in-memory byte slice (which is staged
/// through an internal scratch VMO).
pub enum BufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a [u8]),
}
76
77impl<'a> BufferSlice<'a> {
78 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
79 BufferSlice::VmoId { vmo_id, offset, length }
80 }
81}
82
83impl<'a> From<&'a [u8]> for BufferSlice<'a> {
84 fn from(buf: &'a [u8]) -> Self {
85 BufferSlice::Memory(buf)
86 }
87}
88
/// A destination buffer for read operations: either a region of an attached
/// VMO, or a plain mutable byte slice (staged through an internal scratch VMO).
pub enum MutableBufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a mut [u8]),
}
93
94impl<'a> MutableBufferSlice<'a> {
95 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
96 MutableBufferSlice::VmoId { vmo_id, offset, length }
97 }
98}
99
100impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
101 fn from(buf: &'a mut [u8]) -> Self {
102 MutableBufferSlice::Memory(buf)
103 }
104}
105
/// Per-request bookkeeping shared between the sender (`ResponseFuture`) and the
/// FIFO poller task.
#[derive(Default)]
struct RequestState {
    // Filled in by the poller when the response arrives (or with CANCELED on
    // termination); read by `ResponseFuture::poll`.
    result: Option<zx::Status>,
    // Waker of the `ResponseFuture` awaiting this request, if it has polled.
    waker: Option<Waker>,
}
111
/// State shared between clients issuing requests and the `FifoPoller` task
/// that drives the block FIFO.
#[derive(Default)]
struct FifoState {
    // The FIFO to the block server. `None` once terminated; used as the
    // "still connected" signal.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    // Monotonically increasing id assigned to each request (wraps at u32::MAX).
    next_request_id: u32,

    // Requests accepted but not yet written to the FIFO.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    // In-flight requests keyed by request id; entries are removed when the
    // corresponding `ResponseFuture` is dropped.
    map: HashMap<u32, RequestState>,

    // Waker for the `FifoPoller` task so senders can nudge it to flush `queue`.
    poller_waker: Option<Waker>,

    // When set, the next write request sent gets `PRE_BARRIER` attached
    // (see `Common::barrier`).
    attach_barrier: bool,
}
132
impl FifoState {
    /// Shuts the connection down: drops the FIFO, fails all outstanding
    /// requests with `CANCELED`, and wakes every waiter (including the poller).
    fn terminate(&mut self) {
        self.fifo.take();
        for (_, request_state) in self.map.iter_mut() {
            // get_or_insert: don't overwrite a result that already arrived.
            request_state.result.get_or_insert(zx::Status::CANCELED);
            if let Some(waker) = request_state.waker.take() {
                waker.wake();
            }
        }
        if let Some(waker) = self.poller_waker.take() {
            waker.wake();
        }
    }

    /// Writes as many queued requests to the FIFO as it will accept.
    /// Returns true if polling should cease (connection gone or write error).
    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
        let fifo = if let Some(fifo) = self.fifo.as_ref() {
            fifo
        } else {
            return true;
        };

        loop {
            // Only the first contiguous slice of the deque is written per
            // iteration; the loop picks up the second slice next time around.
            let slice = self.queue.as_slices().0;
            if slice.is_empty() {
                return false;
            }
            match fifo.try_write(context, slice) {
                Poll::Ready(Ok(sent)) => {
                    self.queue.drain(0..sent);
                }
                Poll::Ready(Err(_)) => {
                    self.terminate();
                    return true;
                }
                Poll::Pending => {
                    return false;
                }
            }
        }
    }
}
175
/// Shared, mutex-guarded FIFO state, held by `Common`, the poller task, and
/// every outstanding `ResponseFuture`.
type FifoStateRef = Arc<Mutex<FifoState>>;
177
/// Future that resolves once the response for `request_id` arrives on the
/// FIFO (or the connection terminates). Dropping it deregisters the request.
struct ResponseFuture {
    request_id: u32,
    fifo_state: FifoStateRef,
}
183
184impl ResponseFuture {
185 fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
186 ResponseFuture { request_id, fifo_state }
187 }
188}
189
impl Future for ResponseFuture {
    type Output = Result<(), zx::Status>;

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.fifo_state.lock();
        // The map entry exists for the lifetime of this future (inserted in
        // `Common::send`, removed in our Drop), so unwrap cannot fail.
        let request_state = state.map.get_mut(&self.request_id).unwrap();
        if let Some(result) = request_state.result {
            Poll::Ready(result.into())
        } else {
            // Response not in yet; park our waker for the poller to wake.
            request_state.waker.replace(context.waker().clone());
            Poll::Pending
        }
    }
}
204
impl Drop for ResponseFuture {
    fn drop(&mut self) {
        // Deregister the request so a late response is ignored by the poller.
        let mut state = self.fifo_state.lock();
        state.map.remove(&self.request_id).unwrap();
        update_outstanding_requests_counter(state.map.len());
    }
}
212
/// Handle to a VMO attached to a block device session. The wrapped id must be
/// explicitly surrendered (via `take`/`into_id`, normally through
/// `detach_vmo`) before drop — dropping a still-valid id panics.
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);
217
218impl VmoId {
219 pub fn new(id: u16) -> Self {
221 Self(AtomicU16::new(id))
222 }
223
224 pub fn take(&self) -> Self {
226 Self(AtomicU16::new(self.0.swap(VMOID_INVALID, Ordering::Relaxed)))
227 }
228
229 pub fn is_valid(&self) -> bool {
230 self.id() != VMOID_INVALID
231 }
232
233 #[must_use]
235 pub fn into_id(self) -> u16 {
236 self.0.swap(VMOID_INVALID, Ordering::Relaxed)
237 }
238
239 pub fn id(&self) -> u16 {
240 self.0.load(Ordering::Relaxed)
241 }
242}
243
// Equality/hashing are by the current raw id value.
impl PartialEq for VmoId {
    fn eq(&self, other: &Self) -> bool {
        self.id() == other.id()
    }
}

impl Eq for VmoId {}

impl Drop for VmoId {
    fn drop(&mut self) {
        // A still-valid id here means the VMO was never detached — a leak on
        // the server side — so fail loudly.
        assert_eq!(self.0.load(Ordering::Relaxed), VMOID_INVALID, "Did you forget to detach?");
    }
}

impl Hash for VmoId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id().hash(state);
    }
}
263
264pub trait BlockClient: Send + Sync {
268 fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;
270
271 fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;
273
274 fn read_at(
276 &self,
277 buffer_slice: MutableBufferSlice<'_>,
278 device_offset: u64,
279 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
280 self.read_at_with_opts_traced(buffer_slice, device_offset, ReadOptions::default(), 0)
281 }
282
283 fn read_at_with_opts(
284 &self,
285 buffer_slice: MutableBufferSlice<'_>,
286 device_offset: u64,
287 opts: ReadOptions,
288 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
289 self.read_at_with_opts_traced(buffer_slice, device_offset, opts, 0)
290 }
291
292 fn read_at_with_opts_traced(
293 &self,
294 buffer_slice: MutableBufferSlice<'_>,
295 device_offset: u64,
296 opts: ReadOptions,
297 trace_flow_id: u64,
298 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
299
300 fn write_at(
302 &self,
303 buffer_slice: BufferSlice<'_>,
304 device_offset: u64,
305 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
306 self.write_at_with_opts_traced(
307 buffer_slice,
308 device_offset,
309 WriteOptions::default(),
310 NO_TRACE_ID,
311 )
312 }
313
314 fn write_at_with_opts(
315 &self,
316 buffer_slice: BufferSlice<'_>,
317 device_offset: u64,
318 opts: WriteOptions,
319 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
320 self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
321 }
322
323 fn write_at_with_opts_traced(
324 &self,
325 buffer_slice: BufferSlice<'_>,
326 device_offset: u64,
327 opts: WriteOptions,
328 trace_flow_id: u64,
329 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
330
331 fn trim(
333 &self,
334 device_range: Range<u64>,
335 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
336 self.trim_traced(device_range, NO_TRACE_ID)
337 }
338
339 fn trim_traced(
340 &self,
341 device_range: Range<u64>,
342 trace_flow_id: u64,
343 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
344
345 fn barrier(&self);
350
351 fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
352 self.flush_traced(NO_TRACE_ID)
353 }
354
355 fn flush_traced(
357 &self,
358 trace_flow_id: u64,
359 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
360
361 fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
363
364 fn block_size(&self) -> u32;
366
367 fn block_count(&self) -> u64;
369
370 fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
372
373 fn block_flags(&self) -> BlockDeviceFlag;
375
376 fn is_connected(&self) -> bool;
378}
379
/// State shared by the async and sync client implementations: cached device
/// geometry plus the FIFO machinery and the scratch VMO used for
/// memory-backed transfers.
struct Common {
    block_size: u32,
    block_count: u64,
    max_transfer_blocks: Option<NonZero<u32>>,
    block_flags: BlockDeviceFlag,
    fifo_state: FifoStateRef,
    // async Mutex: held across awaits while staging memory-slice transfers.
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    temp_vmo_id: VmoId,
}
389
impl Common {
    /// Builds the shared client state and spawns a background `FifoPoller`
    /// task to service the request/response FIFO.
    fn new(
        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
        info: &block::BlockInfo,
        temp_vmo: zx::Vmo,
        temp_vmo_id: VmoId,
    ) -> Self {
        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
        Self {
            block_size: info.block_size,
            block_count: info.block_count,
            // Convert the device's byte limit to whole blocks;
            // MAX_TRANSFER_UNBOUNDED means "no limit".
            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
                NonZero::new(info.max_transfer_size / info.block_size)
            } else {
                None
            },
            block_flags: info.flags,
            fifo_state,
            temp_vmo: futures::lock::Mutex::new(temp_vmo),
            temp_vmo_id,
        }
    }

    /// Converts a byte count/offset to blocks; fails with `INVALID_ARGS`
    /// unless it is block-aligned.
    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
        if bytes % self.block_size as u64 != 0 {
            Err(zx::Status::INVALID_ARGS)
        } else {
            Ok(bytes / self.block_size as u64)
        }
    }

    /// Queues `request` on the FIFO and awaits its response.
    ///
    /// Assigns the request id and (if absent) a trace flow id, registers the
    /// request in the shared map, then waits on a `ResponseFuture`.
    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
        let (request_id, trace_flow_id) = {
            let mut state = self.fifo_state.lock();

            // A pending `barrier()` call attaches PRE_BARRIER to the next
            // write that goes out, then clears itself.
            let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
            if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
                && state.attach_barrier
            {
                flags |= BlockIoFlag::PRE_BARRIER;
                request.command.flags = flags.bits();
                state.attach_barrier = false;
            }

            // Connection already torn down.
            if state.fifo.is_none() {
                return Err(zx::Status::CANCELED);
            }
            trace::duration!(
                "storage",
                "block_client::send::start",
                "op" => opcode_str(request.command.opcode),
                "len" => request.length * self.block_size
            );
            let request_id = state.next_request_id;
            state.next_request_id = state.next_request_id.overflowing_add(1).0;
            // Wrap-around collision with a still-outstanding request would be
            // a protocol-corrupting bug, hence the assert.
            assert!(
                state.map.insert(request_id, RequestState::default()).is_none(),
                "request id in use!"
            );
            update_outstanding_requests_counter(state.map.len());
            request.reqid = request_id;
            if request.trace_flow_id == NO_TRACE_ID {
                request.trace_flow_id = generate_trace_flow_id(request_id);
            }
            let trace_flow_id = request.trace_flow_id;
            trace::flow_begin!("storage", "block_client::send", trace_flow_id.into());
            state.queue.push_back(request);
            // Try to flush immediately using the poller's waker so the poller
            // is re-woken if the FIFO backs up.
            if let Some(waker) = state.poller_waker.clone() {
                state.poll_send_requests(&mut Context::from_waker(&waker));
            }
            (request_id, trace_flow_id)
        };
        // Lock released before awaiting the response.
        ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
        trace::duration!("storage", "block_client::send::end");
        trace::flow_end!("storage", "block_client::send", trace_flow_id.into());
        Ok(())
    }

    /// Sends a CloseVmo request for `vmo_id`, consuming the id.
    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::CloseVmo.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: vmo_id.into_id(),
            ..Default::default()
        })
        .await
    }

    /// Reads into either an attached VMO region or, for memory slices, stages
    /// through `temp_vmo` in chunks of at most `TEMP_VMO_SIZE` bytes.
    async fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        let mut flags = BlockIoFlag::empty();

        if opts.inline_crypto.is_enabled {
            flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
        }

        match buffer_slice {
            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        flags: flags.bits(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    dun: opts.inline_crypto.dun,
                    slot: opts.inline_crypto.slot,
                    ..Default::default()
                })
                .await?
            }
            MutableBufferSlice::Memory(mut slice) => {
                // Serializes all memory-slice transfers on this client: the
                // scratch VMO is a single shared staging buffer.
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            flags: flags.bits(),
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        dun: opts.inline_crypto.dun,
                        slot: opts.inline_crypto.slot,
                        ..Default::default()
                    })
                    .await?;
                    // Copy the chunk out of the scratch VMO into the caller's
                    // slice, then advance.
                    temp_vmo.read(&mut slice[..to_do], 0)?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &mut slice[to_do..];
                }
            }
        }
        Ok(())
    }

    /// Writes from either an attached VMO region or, for memory slices,
    /// stages through `temp_vmo` in chunks (mirror of `read_at`).
    async fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        let mut flags = BlockIoFlag::empty();

        if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
            flags |= BlockIoFlag::FORCE_ACCESS;
        }

        if opts.flags.contains(WriteFlags::PRE_BARRIER) {
            flags |= BlockIoFlag::PRE_BARRIER;
        }

        if opts.inline_crypto.is_enabled {
            flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
        }

        match buffer_slice {
            BufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Write.into_primitive(),
                        flags: flags.bits(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    dun: opts.inline_crypto.dun,
                    slot: opts.inline_crypto.slot,
                    ..Default::default()
                })
                .await?;
            }
            BufferSlice::Memory(mut slice) => {
                // Same staging scheme as the read path, but the copy into the
                // scratch VMO happens before the FIFO request goes out.
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    temp_vmo.write(&slice[..to_do], 0)?;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Write.into_primitive(),
                            flags: flags.bits(),
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        dun: opts.inline_crypto.dun,
                        slot: opts.inline_crypto.slot,
                        ..Default::default()
                    })
                    .await?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &slice[to_do..];
                }
            }
        }
        Ok(())
    }

    /// Sends a Trim request covering `device_range` (byte range, must be
    /// block-aligned).
    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
        let dev_offset = self.to_blocks(device_range.start)?;
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Trim.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: VMOID_INVALID,
            length,
            dev_offset,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    /// Sends a Flush request.
    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Flush.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: VMOID_INVALID,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    /// Arms a one-shot barrier: the next write sent via `send` gets
    /// PRE_BARRIER attached.
    fn barrier(&self) {
        self.fifo_state.lock().attach_barrier = true;
    }

    fn block_size(&self) -> u32 {
        self.block_size
    }

    fn block_count(&self) -> u64 {
        self.block_count
    }

    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
        self.max_transfer_blocks.clone()
    }

    fn block_flags(&self) -> BlockDeviceFlag {
        self.block_flags
    }

    /// Connected as long as the FIFO has not been torn down.
    fn is_connected(&self) -> bool {
        self.fifo_state.lock().fifo.is_some()
    }
}
686
impl Drop for Common {
    fn drop(&mut self) {
        // Surrender the scratch VMO id without sending CloseVmo — terminating
        // the FIFO below tears the session down anyway, and VmoId panics on
        // drop if left valid.
        let _ = self.temp_vmo_id.take().into_id();
        self.fifo_state.lock().terminate();
    }
}
695
/// Asynchronous `BlockClient` backed by a `fuchsia.storage.block` session and
/// its request FIFO.
pub struct RemoteBlockClient {
    session: block::SessionProxy,
    common: Common,
}
701
impl RemoteBlockClient {
    /// Connects a new client over `remote`: queries device info and opens a
    /// fresh session.
    pub async fn new(remote: impl Borrow<block::BlockProxy>) -> Result<Self, zx::Status> {
        let remote = remote.borrow();
        let info =
            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let (session, server) = fidl::endpoints::create_proxy();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        Self::from_session(info, session).await
    }

    /// Builds a client from an already-open session: fetches the FIFO and
    /// attaches the scratch VMO used for memory-backed transfers.
    pub async fn from_session(
        info: block::BlockInfo,
        session: block::SessionProxy,
    ) -> Result<Self, zx::Status> {
        let fifo =
            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let fifo = fasync::Fifo::from_fifo(fifo);
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id =
            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);
        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
    }
}
728
// All I/O operations delegate to `Common`; session management (attach/close)
// goes over the FIDL session channel directly.
impl BlockClient for RemoteBlockClient {
    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        // The server keeps its own duplicate of the VMO handle.
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup)
            .await
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.common.detach_vmo(vmo_id).await
    }

    async fn read_at_with_opts_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
    }

    async fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
    }

    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.trim(range, trace_flow_id).await
    }

    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.flush(trace_flow_id).await
    }

    fn barrier(&self) {
        self.common.barrier()
    }

    async fn close(&self) -> Result<(), zx::Status> {
        let () =
            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        Ok(())
    }

    fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
        self.common.max_transfer_blocks()
    }

    fn block_flags(&self) -> BlockDeviceFlag {
        self.common.block_flags()
    }

    fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}
803
/// Blocking variant of the client for non-async callers; FIFO servicing runs
/// on a dedicated background thread (see `new`).
pub struct RemoteBlockClientSync {
    session: block::SessionSynchronousProxy,
    common: Common,
}
808
impl RemoteBlockClientSync {
    /// Connects synchronously: performs the handshake over synchronous FIDL
    /// proxies, then spawns a thread running a local executor that drives the
    /// `FifoPoller` for the lifetime of the connection.
    pub fn new(
        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
    ) -> Result<Self, zx::Status> {
        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
        let info = remote
            .get_info(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let (client, server) = fidl::endpoints::create_endpoints();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        let session = block::SessionSynchronousProxy::new(client.into_channel());
        let fifo = session
            .get_fifo(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);

        // `Common::new` spawns onto an executor, so it must run on the poller
        // thread; the constructed client is handed back over a oneshot.
        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
        std::thread::spawn(move || {
            let mut executor = fasync::LocalExecutor::default();
            let fifo = fasync::Fifo::from_fifo(fifo);
            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
            let fifo_state = common.fifo_state.clone();
            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
            // Runs until the FIFO terminates.
            executor.run_singlethreaded(FifoPoller { fifo_state });
        });
        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
    }

    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        block_on(self.common.detach_vmo(vmo_id))
    }

    /// Blocking read; see `BlockClient::read_at`.
    pub fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.read_at(
            buffer_slice,
            device_offset,
            ReadOptions::default(),
            NO_TRACE_ID,
        ))
    }

    /// Blocking write; see `BlockClient::write_at`.
    pub fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.write_at(
            buffer_slice,
            device_offset,
            WriteOptions::default(),
            NO_TRACE_ID,
        ))
    }

    /// Blocking flush; see `BlockClient::flush`.
    pub fn flush(&self) -> Result<(), zx::Status> {
        block_on(self.common.flush(NO_TRACE_ID))
    }

    /// Closes the session channel.
    pub fn close(&self) -> Result<(), zx::Status> {
        let () = self
            .session
            .close(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(())
    }

    pub fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    pub fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    pub fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}
915
impl Drop for RemoteBlockClientSync {
    fn drop(&mut self) {
        // Best-effort close so the poller thread's FIFO errors out and exits.
        let _ = self.close();
    }
}
922
/// Long-running future that writes queued requests to the FIFO and dispatches
/// responses to their waiting `ResponseFuture`s; completes on termination.
struct FifoPoller {
    fifo_state: FifoStateRef,
}
927
impl Future for FifoPoller {
    type Output = ();

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock();
        // Flush the outgoing queue first; `poll_send_requests` returns true
        // once the connection is gone, at which point this task is done.
        let state = state_lock.deref_mut(); if state.poll_send_requests(context) {
            return Poll::Ready(());
        }

        // poll_send_requests returned false, so the fifo is still present.
        let fifo = state.fifo.as_ref().unwrap(); loop {
            let mut response = MaybeUninit::uninit();
            match fifo.try_read(context, &mut response) {
                Poll::Pending => {
                    // Park so senders can wake us when they enqueue requests.
                    state.poller_waker = Some(context.waker().clone());
                    return Poll::Pending;
                }
                Poll::Ready(Ok(_)) => {
                    // SAFETY: try_read returned Ok, so the read populated
                    // `response` fully.
                    let response = unsafe { response.assume_init() };
                    let request_id = response.reqid;
                    // Entry may be absent if the ResponseFuture was dropped
                    // (request cancelled); the response is then discarded.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Poll::Ready(Err(_)) => {
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
    }
}
968
/// Emits a trace counter recording how many block FIFO requests are in flight.
fn update_outstanding_requests_counter(outstanding: usize) {
    trace::counter!("storage", "block-requests", 0, "outstanding" => outstanding);
}
972
973#[cfg(test)]
974mod tests {
975 use super::{
976 BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
977 RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
978 };
979 use block_protocol::ReadOptions;
980 use block_server::{BlockServer, DeviceInfo, PartitionInfo};
981 use fidl::endpoints::RequestStream as _;
982 use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
983 use futures::join;
984 use futures::stream::StreamExt as _;
985 use futures::stream::futures_unordered::FuturesUnordered;
986 use ramdevice_client::RamdiskClient;
987 use std::borrow::Cow;
988 use std::num::NonZero;
989 use std::sync::Arc;
990 use std::sync::atomic::{AtomicBool, Ordering};
991 use {fidl_fuchsia_storage_block as block, fuchsia_async as fasync};
992
993 const RAMDISK_BLOCK_SIZE: u64 = 1024;
994 const RAMDISK_BLOCK_COUNT: u64 = 1024;
995
    // Creates a ramdisk and a connected `RemoteBlockClient`, plus a second
    // Block proxy for tests that need direct channel access.
    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
            .await
            .expect("RamdiskClient::create failed");
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
        assert_eq!(block_client.block_size(), 1024);
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        (ramdisk, proxy, block_client)
    }
1008
    // Round-trips data through attached VMOs: writes one block, reads it back
    // into a different VMO offset, and verifies the payload.
    #[fuchsia::test]
    async fn test_against_ram_disk() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;

        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        vmo.write(b"hello", 5).expect("vmo.write failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .expect("write_at failed");
        block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
            .await
            .expect("read_at failed");
        let mut buf: [u8; 5] = Default::default();
        vmo.read(&mut buf, 1029).expect("vmo.read failed");
        assert_eq!(&buf, b"hello");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
1029
    // A device offset that is not block-aligned must be rejected.
    #[fuchsia::test]
    async fn test_alignment() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
            .await
            .expect_err("expected failure due to bad alignment");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
1041
    // Issues 1024 concurrent reads to exercise request-id allocation and the
    // FIFO backpressure path.
    #[fuchsia::test]
    async fn test_parallel_io() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client
                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
            );
        }
        futures::future::join_all(reads).await;
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
1058
    // Destroys the ramdisk while reads are in flight and checks the client
    // eventually reports disconnection and fails new requests.
    #[fuchsia::test]
    async fn test_closed_device() {
        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
            );
        }
        assert!(block_client.is_connected());
        let _ = futures::join!(futures::future::join_all(reads), async {
            ramdisk.destroy().await.expect("ramdisk.destroy failed")
        });
        // Destruction races with in-flight I/O: keep issuing reads until one
        // fails, which signals the client observed the closure.
        while block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .is_ok()
        {}

        // The poller notices termination asynchronously; poll until it does.
        while block_client.is_connected() {
            fasync::Timer::new(fasync::MonotonicInstant::after(
                zx::MonotonicDuration::from_millis(500),
            ))
            .await;
        }

        assert_eq!(block_client.is_connected(), false);
        // detach_vmo will fail since the connection is gone; ignore it.
        let _ = block_client.detach_vmo(vmo_id).await;
    }
1095
    // Drops roughly half the read futures before completion to exercise the
    // cancellation path (ResponseFuture::drop removing map entries).
    #[fuchsia::test]
    async fn test_cancelled_reads() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        {
            let mut reads = FuturesUnordered::new();
            for _ in 0..1024 {
                reads.push(
                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
                );
            }
            // Await only 500 of the 1024; the rest are dropped with the block.
            for _ in 0..500 {
                reads.next().await;
            }
        }
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
1115
    // Two overlapping-in-time (but disjoint-in-range) memory-slice transfers
    // larger than TEMP_VMO_SIZE, verifying staging-VMO chunking and that the
    // shared scratch buffer doesn't corrupt concurrent transfers.
    #[fuchsia::test]
    async fn test_parallel_large_read_and_write_with_memory_succeds() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let block_client_ref = &block_client;
        let test_one = |offset, len, fill| async move {
            let buf = vec![fill; len];
            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
            // Read back with one extra block either side to check for
            // overwrites outside the target range.
            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
            block_client_ref
                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
                .await
                .expect("read_at failed");
            assert_eq!(
                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
                &buf[..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
        };
        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
        join!(
            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
        );
    }
1148
    // Minimal in-process fake of the Block protocol: `channel_handler` may
    // intercept session requests (returning true to swallow them) and
    // `fifo_handler` produces the response for each FIFO request.
    struct FakeBlockServer<'a> {
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }
1157
1158 impl<'a> FakeBlockServer<'a> {
        // Constructs the fake server; handlers may be closures borrowing test
        // state for lifetime 'a.
        fn new(
            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
        ) -> FakeBlockServer<'a> {
            FakeBlockServer {
                server_channel: Some(server_channel),
                channel_handler: Box::new(channel_handler),
                fifo_handler: Box::new(fifo_handler),
            }
        }
1181
1182 async fn run(&mut self) {
1184 let server = self.server_channel.take().unwrap();
1185
1186 let (server_fifo, client_fifo) =
1188 zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1189 .expect("Fifo::create failed");
1190 let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1191
1192 let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1193 let fifo_future = Abortable::new(
1194 async {
1195 let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1196 let (mut reader, mut writer) = fifo.async_io();
1197 let mut request = BlockFifoRequest::default();
1198 loop {
1199 match reader.read_entries(&mut request).await {
1200 Ok(1) => {}
1201 Err(zx::Status::PEER_CLOSED) => break,
1202 Err(e) => panic!("read_entry failed {:?}", e),
1203 _ => unreachable!(),
1204 };
1205
1206 let response = self.fifo_handler.as_ref()(request);
1207 writer
1208 .write_entries(std::slice::from_ref(&response))
1209 .await
1210 .expect("write_entries failed");
1211 }
1212 },
1213 fifo_future_abort_registration,
1214 );
1215
1216 let channel_future = async {
1217 server
1218 .into_stream()
1219 .for_each_concurrent(None, |request| async {
1220 let request = request.expect("unexpected fidl error");
1221
1222 match request {
1223 block::BlockRequest::GetInfo { responder } => {
1224 responder
1225 .send(Ok(&block::BlockInfo {
1226 block_count: 1024,
1227 block_size: 512,
1228 max_transfer_size: 1024 * 1024,
1229 flags: block::DeviceFlag::empty(),
1230 }))
1231 .expect("send failed");
1232 }
1233 block::BlockRequest::OpenSession { session, control_handle: _ } => {
1234 let stream = session.into_stream();
1235 stream
1236 .for_each(|request| async {
1237 let request = request.expect("unexpected fidl error");
1238 if self.channel_handler.as_ref()(&request) {
1241 return;
1242 }
1243 match request {
1244 block::SessionRequest::GetFifo { responder } => {
1245 match maybe_server_fifo.lock().take() {
1246 Some(fifo) => {
1247 responder.send(Ok(fifo.downcast()))
1248 }
1249 None => responder.send(Err(
1250 zx::Status::NO_RESOURCES.into_raw(),
1251 )),
1252 }
1253 .expect("send failed")
1254 }
1255 block::SessionRequest::AttachVmo {
1256 vmo: _,
1257 responder,
1258 } => responder
1259 .send(Ok(&block::VmoId { id: 1 }))
1260 .expect("send failed"),
1261 block::SessionRequest::Close { responder } => {
1262 fifo_future_abort.abort();
1263 responder.send(Ok(())).expect("send failed")
1264 }
1265 }
1266 })
1267 .await
1268 }
1269 _ => panic!("Unexpected message"),
1270 }
1271 })
1272 .await;
1273 };
1274
1275 let _result = join!(fifo_future, channel_future);
1276 }
1278 }
1279
1280 #[fuchsia::test]
1281 async fn test_block_close_is_called() {
1282 let close_called = fuchsia_sync::Mutex::new(false);
1283 let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1284
1285 std::thread::spawn(move || {
1286 let _block_client =
1287 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1288 });
1290
1291 let channel_handler = |request: &block::SessionRequest| -> bool {
1292 if let block::SessionRequest::Close { .. } = request {
1293 *close_called.lock() = true;
1294 }
1295 false
1296 };
1297 FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1298
1299 assert!(*close_called.lock());
1301 }
1302
    #[fuchsia::test]
    async fn test_block_flush_is_called() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();

        // Server-side interface that records whether `flush` was invoked; every other
        // operation is unexpected in this test and panics.
        struct Interface {
            flush_called: Arc<AtomicBool>,
        }
        impl block_server::async_interface::Interface for Interface {
            fn get_info(&self) -> Cow<'_, DeviceInfo> {
                // Minimal fixed partition info so the client can initialize.
                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
                    device_flags: fidl_fuchsia_storage_block::DeviceFlag::empty(),
                    max_transfer_blocks: None,
                    block_range: Some(0..1000),
                    type_guid: [0; 16],
                    instance_guid: [0; 16],
                    name: "foo".to_string(),
                    flags: 0,
                }))
            }

            async fn read(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _opts: ReadOptions,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            async fn write(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _opts: WriteOptions,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
                // The only operation the test exercises.
                self.flush_called.store(true, Ordering::Relaxed);
                Ok(())
            }

            async fn trim(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }
        }

        let flush_called = Arc::new(AtomicBool::new(false));

        // Run client and server concurrently: the client issues a single flush, and
        // the server run ends when the client side of the stream is dropped.
        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");

                block_client.flush().await.expect("flush failed");
            },
            async {
                let block_server = BlockServer::new(
                    512,
                    Arc::new(Interface { flush_called: flush_called.clone() }),
                );
                block_server.handle_requests(stream.cast_stream()).await.unwrap();
            }
        );

        assert!(flush_called.load(Ordering::Relaxed));
    }
1381
1382 #[fuchsia::test]
1383 async fn test_trace_flow_ids_set() {
1384 let (proxy, server) = fidl::endpoints::create_proxy();
1385
1386 futures::join!(
1387 async {
1388 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1389 block_client.flush().await.expect("flush failed");
1390 },
1391 async {
1392 let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1393 let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1394 if request.trace_flow_id > 0 {
1395 *flow_id.lock() = Some(request.trace_flow_id);
1396 }
1397 BlockFifoResponse {
1398 status: zx::Status::OK.into_raw(),
1399 reqid: request.reqid,
1400 ..Default::default()
1401 }
1402 };
1403 FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1404 assert!(flow_id.lock().is_some());
1406 }
1407 );
1408 }
1409}