1use fidl_fuchsia_storage_block as block;
13use fidl_fuchsia_storage_block::{MAX_TRANSFER_UNBOUNDED, VMOID_INVALID};
14use fuchsia_async as fasync;
15use fuchsia_sync::Mutex;
16use futures::channel::oneshot;
17use futures::executor::block_on;
18use std::borrow::Borrow;
19use std::collections::HashMap;
20use std::future::Future;
21use std::hash::{Hash, Hasher};
22use std::mem::MaybeUninit;
23use std::num::NonZero;
24use std::ops::{DerefMut, Range};
25use std::pin::Pin;
26use std::sync::atomic::{AtomicU16, Ordering};
27use std::sync::{Arc, LazyLock};
28use std::task::{Context, Poll, Waker};
29use storage_trace as trace;
30use zx::sys::zx_handle_t;
31
32pub use cache::Cache;
33
34pub use block::DeviceFlag as BlockDeviceFlag;
35
36pub use block_protocol::*;
37
38pub mod cache;
39
/// Size, in bytes, of the scratch VMO each client creates and attaches to its session.  Transfers
/// to or from plain memory slices are staged through that VMO in chunks of at most this size.
const TEMP_VMO_SIZE: usize = 65536;

/// Trace flow ID meaning "none supplied".  A request queued with this value is given a generated
/// flow ID before it is sent.
pub const NO_TRACE_ID: u64 = 0;
44
45pub use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
46
47fn fidl_to_status(error: fidl::Error) -> zx::Status {
48 match error {
49 fidl::Error::ClientChannelClosed { status, .. } => status,
50 _ => zx::Status::INTERNAL,
51 }
52}
53
54fn opcode_str(opcode: u8) -> &'static str {
55 match BlockOpcode::from_primitive(opcode) {
56 Some(BlockOpcode::Read) => "read",
57 Some(BlockOpcode::Write) => "write",
58 Some(BlockOpcode::Flush) => "flush",
59 Some(BlockOpcode::Trim) => "trim",
60 Some(BlockOpcode::CloseVmo) => "close_vmo",
61 None => "unknown",
62 }
63}
64
65fn generate_trace_flow_id(request_id: u32) -> u64 {
68 static SELF_HANDLE: LazyLock<zx_handle_t> =
69 LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
70 *SELF_HANDLE as u64 + (request_id as u64) << 32
71}
72
73pub enum BufferSlice<'a> {
74 VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
75 Memory(&'a [u8]),
76}
77
78impl<'a> BufferSlice<'a> {
79 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
80 BufferSlice::VmoId { vmo_id, offset, length }
81 }
82}
83
84impl<'a> From<&'a [u8]> for BufferSlice<'a> {
85 fn from(buf: &'a [u8]) -> Self {
86 BufferSlice::Memory(buf)
87 }
88}
89
90pub enum MutableBufferSlice<'a> {
91 VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
92 Memory(&'a mut [u8]),
93}
94
95impl<'a> MutableBufferSlice<'a> {
96 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
97 MutableBufferSlice::VmoId { vmo_id, offset, length }
98 }
99}
100
101impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
102 fn from(buf: &'a mut [u8]) -> Self {
103 MutableBufferSlice::Memory(buf)
104 }
105}
106
/// Bookkeeping for a single in-flight request; stored in `FifoState::map` under its request ID.
#[derive(Default)]
struct RequestState {
    /// Completion status.  Stays `None` until a response for the request is read from the FIFO
    /// or the FIFO is terminated (in which case it becomes `CANCELED`).
    result: Option<zx::Status>,
    /// Waker registered by the `ResponseFuture` waiting on this request, if it has been polled
    /// and found no result yet.
    waker: Option<Waker>,
}
112
/// State shared (behind a mutex) between request senders, response futures and the FIFO poller.
#[derive(Default)]
struct FifoState {
    /// The session FIFO.  Set to `None` by `terminate`, after which no more requests are accepted.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    /// ID assigned to the next request; incremented (with wrap-around) for every request sent.
    next_request_id: u32,

    /// Requests that have been accepted but not yet written to the FIFO.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    /// Per-request completion state, keyed by request ID.  Entries are removed when the
    /// corresponding `ResponseFuture` is dropped.
    map: HashMap<u32, RequestState>,

    /// Waker of the poller task, recorded when it last returned `Pending` while reading.
    poller_waker: Option<Waker>,

    /// When set, the next write request sent gets the `PRE_BARRIER` flag and this is cleared.
    attach_barrier: bool,
}
133
134impl FifoState {
135 fn terminate(&mut self) {
136 self.fifo.take();
137 for (_, request_state) in self.map.iter_mut() {
138 request_state.result.get_or_insert(zx::Status::CANCELED);
139 if let Some(waker) = request_state.waker.take() {
140 waker.wake();
141 }
142 }
143 if let Some(waker) = self.poller_waker.take() {
144 waker.wake();
145 }
146 }
147
148 fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
150 let fifo = if let Some(fifo) = self.fifo.as_ref() {
151 fifo
152 } else {
153 return true;
154 };
155
156 loop {
157 let slice = self.queue.as_slices().0;
158 if slice.is_empty() {
159 return false;
160 }
161 match fifo.try_write(context, slice) {
162 Poll::Ready(Ok(sent)) => {
163 self.queue.drain(0..sent);
164 }
165 Poll::Ready(Err(_)) => {
166 self.terminate();
167 return true;
168 }
169 Poll::Pending => {
170 return false;
171 }
172 }
173 }
174 }
175}
176
177type FifoStateRef = Arc<Mutex<FifoState>>;
178
179struct ResponseFuture {
181 request_id: u32,
182 fifo_state: FifoStateRef,
183}
184
185impl ResponseFuture {
186 fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
187 ResponseFuture { request_id, fifo_state }
188 }
189}
190
191impl Future for ResponseFuture {
192 type Output = Result<(), zx::Status>;
193
194 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
195 let mut state = self.fifo_state.lock();
196 let request_state = state.map.get_mut(&self.request_id).unwrap();
197 if let Some(result) = request_state.result {
198 Poll::Ready(result.into())
199 } else {
200 request_state.waker.replace(context.waker().clone());
201 Poll::Pending
202 }
203 }
204}
205
206impl Drop for ResponseFuture {
207 fn drop(&mut self) {
208 let mut state = self.fifo_state.lock();
209 state.map.remove(&self.request_id).unwrap();
210 update_outstanding_requests_counter(state.map.len());
211 }
212}
213
214#[derive(Debug)]
216#[must_use]
217pub struct VmoId(AtomicU16);
218
219impl VmoId {
220 pub fn new(id: u16) -> Self {
222 Self(AtomicU16::new(id))
223 }
224
225 pub fn take(&self) -> Self {
227 Self(AtomicU16::new(self.0.swap(VMOID_INVALID, Ordering::Relaxed)))
228 }
229
230 pub fn is_valid(&self) -> bool {
231 self.id() != VMOID_INVALID
232 }
233
234 #[must_use]
236 pub fn into_id(self) -> u16 {
237 self.0.swap(VMOID_INVALID, Ordering::Relaxed)
238 }
239
240 pub fn id(&self) -> u16 {
241 self.0.load(Ordering::Relaxed)
242 }
243}
244
245impl PartialEq for VmoId {
246 fn eq(&self, other: &Self) -> bool {
247 self.id() == other.id()
248 }
249}
250
251impl Eq for VmoId {}
252
253impl Drop for VmoId {
254 fn drop(&mut self) {
255 assert_eq!(self.0.load(Ordering::Relaxed), VMOID_INVALID, "Did you forget to detach?");
256 }
257}
258
259impl Hash for VmoId {
260 fn hash<H: Hasher>(&self, state: &mut H) {
261 self.id().hash(state);
262 }
263}
264
265pub trait BlockClient: Send + Sync {
269 fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;
271
272 fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;
274
275 fn read_at(
277 &self,
278 buffer_slice: MutableBufferSlice<'_>,
279 device_offset: u64,
280 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
281 self.read_at_with_opts_traced(buffer_slice, device_offset, ReadOptions::default(), 0)
282 }
283
284 fn read_at_with_opts(
285 &self,
286 buffer_slice: MutableBufferSlice<'_>,
287 device_offset: u64,
288 opts: ReadOptions,
289 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
290 self.read_at_with_opts_traced(buffer_slice, device_offset, opts, 0)
291 }
292
293 fn read_at_with_opts_traced(
294 &self,
295 buffer_slice: MutableBufferSlice<'_>,
296 device_offset: u64,
297 opts: ReadOptions,
298 trace_flow_id: u64,
299 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
300
301 fn write_at(
303 &self,
304 buffer_slice: BufferSlice<'_>,
305 device_offset: u64,
306 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
307 self.write_at_with_opts_traced(
308 buffer_slice,
309 device_offset,
310 WriteOptions::default(),
311 NO_TRACE_ID,
312 )
313 }
314
315 fn write_at_with_opts(
316 &self,
317 buffer_slice: BufferSlice<'_>,
318 device_offset: u64,
319 opts: WriteOptions,
320 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
321 self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
322 }
323
324 fn write_at_with_opts_traced(
325 &self,
326 buffer_slice: BufferSlice<'_>,
327 device_offset: u64,
328 opts: WriteOptions,
329 trace_flow_id: u64,
330 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
331
332 fn trim(
334 &self,
335 device_range: Range<u64>,
336 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
337 self.trim_traced(device_range, NO_TRACE_ID)
338 }
339
340 fn trim_traced(
341 &self,
342 device_range: Range<u64>,
343 trace_flow_id: u64,
344 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
345
346 fn barrier(&self);
351
352 fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
353 self.flush_traced(NO_TRACE_ID)
354 }
355
356 fn flush_traced(
358 &self,
359 trace_flow_id: u64,
360 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
361
362 fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
364
365 fn block_size(&self) -> u32;
367
368 fn block_count(&self) -> u64;
370
371 fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
373
374 fn block_flags(&self) -> BlockDeviceFlag;
376
377 fn is_connected(&self) -> bool;
379}
380
/// State and logic shared by the asynchronous and synchronous clients.
struct Common {
    /// Block size in bytes, as reported by the device.
    block_size: u32,
    /// Number of blocks, as reported by the device.
    block_count: u64,
    /// Maximum blocks per transfer; `None` if the device reports an unbounded transfer size (or
    /// the reported size is smaller than one block).
    max_transfer_blocks: Option<NonZero<u32>>,
    /// Device flags, as reported by the device.
    block_flags: BlockDeviceFlag,
    /// FIFO state shared with the poller and with outstanding response futures.
    fifo_state: FifoStateRef,
    /// Scratch VMO used to stage memory-slice transfers.  The async mutex serializes its users.
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    /// ID under which `temp_vmo` is attached to the session.
    temp_vmo_id: VmoId,
}
390
391impl Common {
392 fn new(
393 fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
394 info: &block::BlockInfo,
395 temp_vmo: zx::Vmo,
396 temp_vmo_id: VmoId,
397 ) -> Self {
398 let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
399 fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
400 Self {
401 block_size: info.block_size,
402 block_count: info.block_count,
403 max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
404 NonZero::new(info.max_transfer_size / info.block_size)
405 } else {
406 None
407 },
408 block_flags: info.flags,
409 fifo_state,
410 temp_vmo: futures::lock::Mutex::new(temp_vmo),
411 temp_vmo_id,
412 }
413 }
414
415 fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
416 if bytes % self.block_size as u64 != 0 {
417 Err(zx::Status::INVALID_ARGS)
418 } else {
419 Ok(bytes / self.block_size as u64)
420 }
421 }
422
423 async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
425 let (request_id, trace_flow_id) = {
426 let mut state = self.fifo_state.lock();
427
428 let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
429 if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
430 && state.attach_barrier
431 {
432 flags |= BlockIoFlag::PRE_BARRIER;
433 request.command.flags = flags.bits();
434 state.attach_barrier = false;
435 }
436
437 if state.fifo.is_none() {
438 return Err(zx::Status::CANCELED);
440 }
441 trace::duration!(
442 "storage",
443 "block_client::send::start",
444 "op" => opcode_str(request.command.opcode),
445 "len" => request.length * self.block_size
446 );
447 let request_id = state.next_request_id;
448 state.next_request_id = state.next_request_id.overflowing_add(1).0;
449 assert!(
450 state.map.insert(request_id, RequestState::default()).is_none(),
451 "request id in use!"
452 );
453 update_outstanding_requests_counter(state.map.len());
454 request.reqid = request_id;
455 if request.trace_flow_id == NO_TRACE_ID {
456 request.trace_flow_id = generate_trace_flow_id(request_id);
457 }
458 let trace_flow_id = request.trace_flow_id;
459 trace::flow_begin!("storage", "block_client::send", trace_flow_id.into());
460 state.queue.push_back(request);
461 if let Some(waker) = state.poller_waker.clone() {
462 state.poll_send_requests(&mut Context::from_waker(&waker));
463 }
464 (request_id, trace_flow_id)
465 };
466 ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
467 trace::duration!("storage", "block_client::send::end");
468 trace::flow_end!("storage", "block_client::send", trace_flow_id.into());
469 Ok(())
470 }
471
472 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
473 self.send(BlockFifoRequest {
474 command: BlockFifoCommand {
475 opcode: BlockOpcode::CloseVmo.into_primitive(),
476 flags: 0,
477 ..Default::default()
478 },
479 vmoid: vmo_id.into_id(),
480 ..Default::default()
481 })
482 .await
483 }
484
485 async fn read_at(
486 &self,
487 buffer_slice: MutableBufferSlice<'_>,
488 device_offset: u64,
489 opts: ReadOptions,
490 trace_flow_id: u64,
491 ) -> Result<(), zx::Status> {
492 let mut flags = BlockIoFlag::empty();
493
494 if opts.inline_crypto.is_enabled {
495 flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
496 }
497
498 match buffer_slice {
499 MutableBufferSlice::VmoId { vmo_id, offset, length } => {
500 self.send(BlockFifoRequest {
501 command: BlockFifoCommand {
502 opcode: BlockOpcode::Read.into_primitive(),
503 flags: flags.bits(),
504 ..Default::default()
505 },
506 vmoid: vmo_id.id(),
507 length: self
508 .to_blocks(length)?
509 .try_into()
510 .map_err(|_| zx::Status::INVALID_ARGS)?,
511 vmo_offset: self.to_blocks(offset)?,
512 dev_offset: self.to_blocks(device_offset)?,
513 trace_flow_id,
514 dun: opts.inline_crypto.dun,
515 slot: opts.inline_crypto.slot,
516 ..Default::default()
517 })
518 .await?
519 }
520 MutableBufferSlice::Memory(mut slice) => {
521 let temp_vmo = self.temp_vmo.lock().await;
522 let mut device_block = self.to_blocks(device_offset)?;
523 loop {
524 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
525 let block_count = self.to_blocks(to_do as u64)? as u32;
526 self.send(BlockFifoRequest {
527 command: BlockFifoCommand {
528 opcode: BlockOpcode::Read.into_primitive(),
529 flags: flags.bits(),
530 ..Default::default()
531 },
532 vmoid: self.temp_vmo_id.id(),
533 length: block_count,
534 vmo_offset: 0,
535 dev_offset: device_block,
536 trace_flow_id,
537 dun: opts.inline_crypto.dun,
538 slot: opts.inline_crypto.slot,
539 ..Default::default()
540 })
541 .await?;
542 temp_vmo.read(&mut slice[..to_do], 0)?;
543 if to_do == slice.len() {
544 break;
545 }
546 device_block += block_count as u64;
547 slice = &mut slice[to_do..];
548 }
549 }
550 }
551 Ok(())
552 }
553
554 async fn write_at(
555 &self,
556 buffer_slice: BufferSlice<'_>,
557 device_offset: u64,
558 opts: WriteOptions,
559 trace_flow_id: u64,
560 ) -> Result<(), zx::Status> {
561 let mut flags = BlockIoFlag::empty();
562
563 if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
564 flags |= BlockIoFlag::FORCE_ACCESS;
565 }
566
567 if opts.flags.contains(WriteFlags::PRE_BARRIER) {
568 flags |= BlockIoFlag::PRE_BARRIER;
569 }
570
571 if opts.inline_crypto.is_enabled {
572 flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
573 }
574
575 match buffer_slice {
576 BufferSlice::VmoId { vmo_id, offset, length } => {
577 self.send(BlockFifoRequest {
578 command: BlockFifoCommand {
579 opcode: BlockOpcode::Write.into_primitive(),
580 flags: flags.bits(),
581 ..Default::default()
582 },
583 vmoid: vmo_id.id(),
584 length: self
585 .to_blocks(length)?
586 .try_into()
587 .map_err(|_| zx::Status::INVALID_ARGS)?,
588 vmo_offset: self.to_blocks(offset)?,
589 dev_offset: self.to_blocks(device_offset)?,
590 trace_flow_id,
591 dun: opts.inline_crypto.dun,
592 slot: opts.inline_crypto.slot,
593 ..Default::default()
594 })
595 .await?;
596 }
597 BufferSlice::Memory(mut slice) => {
598 let temp_vmo = self.temp_vmo.lock().await;
599 let mut device_block = self.to_blocks(device_offset)?;
600 loop {
601 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
602 let block_count = self.to_blocks(to_do as u64)? as u32;
603 temp_vmo.write(&slice[..to_do], 0)?;
604 self.send(BlockFifoRequest {
605 command: BlockFifoCommand {
606 opcode: BlockOpcode::Write.into_primitive(),
607 flags: flags.bits(),
608 ..Default::default()
609 },
610 vmoid: self.temp_vmo_id.id(),
611 length: block_count,
612 vmo_offset: 0,
613 dev_offset: device_block,
614 trace_flow_id,
615 dun: opts.inline_crypto.dun,
616 slot: opts.inline_crypto.slot,
617 ..Default::default()
618 })
619 .await?;
620 if to_do == slice.len() {
621 break;
622 }
623 device_block += block_count as u64;
624 slice = &slice[to_do..];
625 }
626 }
627 }
628 Ok(())
629 }
630
631 async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
632 let length = self.to_blocks(device_range.end - device_range.start)? as u32;
633 let dev_offset = self.to_blocks(device_range.start)?;
634 self.send(BlockFifoRequest {
635 command: BlockFifoCommand {
636 opcode: BlockOpcode::Trim.into_primitive(),
637 flags: 0,
638 ..Default::default()
639 },
640 vmoid: VMOID_INVALID,
641 length,
642 dev_offset,
643 trace_flow_id,
644 ..Default::default()
645 })
646 .await
647 }
648
649 async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
650 self.send(BlockFifoRequest {
651 command: BlockFifoCommand {
652 opcode: BlockOpcode::Flush.into_primitive(),
653 flags: 0,
654 ..Default::default()
655 },
656 vmoid: VMOID_INVALID,
657 trace_flow_id,
658 ..Default::default()
659 })
660 .await
661 }
662
663 fn barrier(&self) {
664 self.fifo_state.lock().attach_barrier = true;
665 }
666
667 fn block_size(&self) -> u32 {
668 self.block_size
669 }
670
671 fn block_count(&self) -> u64 {
672 self.block_count
673 }
674
675 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
676 self.max_transfer_blocks.clone()
677 }
678
679 fn block_flags(&self) -> BlockDeviceFlag {
680 self.block_flags
681 }
682
683 fn is_connected(&self) -> bool {
684 self.fifo_state.lock().fifo.is_some()
685 }
686}
687
688impl Drop for Common {
689 fn drop(&mut self) {
690 let _ = self.temp_vmo_id.take().into_id();
693 self.fifo_state.lock().terminate();
694 }
695}
696
697pub struct RemoteBlockClient {
699 session: block::SessionProxy,
700 common: Common,
701}
702
703impl RemoteBlockClient {
704 pub async fn new(remote: impl Borrow<block::BlockProxy>) -> Result<Self, zx::Status> {
706 let remote = remote.borrow();
707 let info =
708 remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
709 let (session, server) = fidl::endpoints::create_proxy();
710 let () = remote.open_session(server).map_err(fidl_to_status)?;
711 Self::from_session(info, session).await
712 }
713
714 pub async fn from_session(
715 info: block::BlockInfo,
716 session: block::SessionProxy,
717 ) -> Result<Self, zx::Status> {
718 const SCRATCH_VMO_NAME: zx::Name = zx::Name::new_lossy("block-client-scratch-vmo");
719 let fifo =
720 session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
721 let fifo = fasync::Fifo::from_fifo(fifo);
722 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
723 temp_vmo.set_name(&SCRATCH_VMO_NAME)?;
724 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
725 let vmo_id =
726 session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
727 let vmo_id = VmoId::new(vmo_id.id);
728 Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
729 }
730}
731
732impl BlockClient for RemoteBlockClient {
733 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
734 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
735 let vmo_id = self
736 .session
737 .attach_vmo(dup)
738 .await
739 .map_err(fidl_to_status)?
740 .map_err(zx::Status::from_raw)?;
741 Ok(VmoId::new(vmo_id.id))
742 }
743
744 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
745 self.common.detach_vmo(vmo_id).await
746 }
747
748 async fn read_at_with_opts_traced(
749 &self,
750 buffer_slice: MutableBufferSlice<'_>,
751 device_offset: u64,
752 opts: ReadOptions,
753 trace_flow_id: u64,
754 ) -> Result<(), zx::Status> {
755 self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
756 }
757
758 async fn write_at_with_opts_traced(
759 &self,
760 buffer_slice: BufferSlice<'_>,
761 device_offset: u64,
762 opts: WriteOptions,
763 trace_flow_id: u64,
764 ) -> Result<(), zx::Status> {
765 self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
766 }
767
768 async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
769 self.common.trim(range, trace_flow_id).await
770 }
771
772 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
773 self.common.flush(trace_flow_id).await
774 }
775
776 fn barrier(&self) {
777 self.common.barrier()
778 }
779
780 async fn close(&self) -> Result<(), zx::Status> {
781 let () =
782 self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
783 Ok(())
784 }
785
786 fn block_size(&self) -> u32 {
787 self.common.block_size()
788 }
789
790 fn block_count(&self) -> u64 {
791 self.common.block_count()
792 }
793
794 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
795 self.common.max_transfer_blocks()
796 }
797
798 fn block_flags(&self) -> BlockDeviceFlag {
799 self.common.block_flags()
800 }
801
802 fn is_connected(&self) -> bool {
803 self.common.is_connected()
804 }
805}
806
807pub struct RemoteBlockClientSync {
808 session: block::SessionSynchronousProxy,
809 common: Common,
810}
811
812impl RemoteBlockClientSync {
813 pub fn new(
817 client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
818 ) -> Result<Self, zx::Status> {
819 let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
820 let info = remote
821 .get_info(zx::MonotonicInstant::INFINITE)
822 .map_err(fidl_to_status)?
823 .map_err(zx::Status::from_raw)?;
824 let (client, server) = fidl::endpoints::create_endpoints();
825 let () = remote.open_session(server).map_err(fidl_to_status)?;
826 let session = block::SessionSynchronousProxy::new(client.into_channel());
827 let fifo = session
828 .get_fifo(zx::MonotonicInstant::INFINITE)
829 .map_err(fidl_to_status)?
830 .map_err(zx::Status::from_raw)?;
831 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
832 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
833 let vmo_id = session
834 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
835 .map_err(fidl_to_status)?
836 .map_err(zx::Status::from_raw)?;
837 let vmo_id = VmoId::new(vmo_id.id);
838
839 let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
842 std::thread::spawn(move || {
843 let mut executor = fasync::LocalExecutor::default();
844 let fifo = fasync::Fifo::from_fifo(fifo);
845 let common = Common::new(fifo, &info, temp_vmo, vmo_id);
846 let fifo_state = common.fifo_state.clone();
847 let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
848 executor.run_singlethreaded(FifoPoller { fifo_state });
849 });
850 block_on(receiver).map_err(|_| zx::Status::CANCELED)?
851 }
852
853 pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
854 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
855 let vmo_id = self
856 .session
857 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
858 .map_err(fidl_to_status)?
859 .map_err(zx::Status::from_raw)?;
860 Ok(VmoId::new(vmo_id.id))
861 }
862
863 pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
864 block_on(self.common.detach_vmo(vmo_id))
865 }
866
867 pub fn read_at(
868 &self,
869 buffer_slice: MutableBufferSlice<'_>,
870 device_offset: u64,
871 ) -> Result<(), zx::Status> {
872 block_on(self.common.read_at(
873 buffer_slice,
874 device_offset,
875 ReadOptions::default(),
876 NO_TRACE_ID,
877 ))
878 }
879
880 pub fn write_at(
881 &self,
882 buffer_slice: BufferSlice<'_>,
883 device_offset: u64,
884 ) -> Result<(), zx::Status> {
885 block_on(self.common.write_at(
886 buffer_slice,
887 device_offset,
888 WriteOptions::default(),
889 NO_TRACE_ID,
890 ))
891 }
892
893 pub fn flush(&self) -> Result<(), zx::Status> {
894 block_on(self.common.flush(NO_TRACE_ID))
895 }
896
897 pub fn close(&self) -> Result<(), zx::Status> {
898 let () = self
899 .session
900 .close(zx::MonotonicInstant::INFINITE)
901 .map_err(fidl_to_status)?
902 .map_err(zx::Status::from_raw)?;
903 Ok(())
904 }
905
906 pub fn block_size(&self) -> u32 {
907 self.common.block_size()
908 }
909
910 pub fn block_count(&self) -> u64 {
911 self.common.block_count()
912 }
913
914 pub fn is_connected(&self) -> bool {
915 self.common.is_connected()
916 }
917}
918
impl Drop for RemoteBlockClientSync {
    /// Closes the session on a best-effort basis; any error from `close` is ignored.
    fn drop(&mut self) {
        let _ = self.close();
    }
}
925
/// Future that drives the session FIFO: it pushes queued requests out and dispatches responses
/// to the requests waiting on them.  It completes once the FIFO has been terminated.
struct FifoPoller {
    fifo_state: FifoStateRef,
}

impl Future for FifoPoller {
    type Output = ();

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock();
        let state = state_lock.deref_mut();

        // Send whatever is queued first.  `true` means the FIFO is gone, so there is nothing
        // left to poll.
        if state.poll_send_requests(context) {
            return Poll::Ready(());
        }

        // `poll_send_requests` only returns false while the FIFO is still present.
        let fifo = state.fifo.as_ref().unwrap();

        loop {
            let mut response = MaybeUninit::uninit();
            match fifo.try_read(context, &mut response) {
                Poll::Pending => {
                    // Remember our waker so that `send` can push new requests on our behalf.
                    state.poller_waker = Some(context.waker().clone());
                    return Poll::Pending;
                }
                Poll::Ready(Ok(_)) => {
                    // SAFETY: relies on a successful `try_read` having written `response`.
                    // NOTE(review): confirm against the `fasync::Fifo::try_read` contract.
                    let response = unsafe { response.assume_init() };
                    let request_id = response.reqid;
                    // Responses for IDs no longer in the map (e.g. the waiting future was
                    // dropped) are ignored.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Poll::Ready(Err(_)) => {
                    // A read error ends the connection: cancel everything outstanding.
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
    }
}
971
/// Publishes the number of outstanding requests as the "block-requests" trace counter in the
/// "storage" category.
fn update_outstanding_requests_counter(outstanding: usize) {
    trace::counter!("storage", "block-requests", 0, "outstanding" => outstanding);
}
975
976#[cfg(test)]
977mod tests {
978 use super::{
979 BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
980 RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
981 };
982 use block_protocol::ReadOptions;
983 use block_server::{BlockServer, DeviceInfo, PartitionInfo};
984 use fidl::endpoints::RequestStream as _;
985 use fidl_fuchsia_storage_block as block;
986 use fuchsia_async as fasync;
987 use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
988 use futures::join;
989 use futures::stream::StreamExt as _;
990 use futures::stream::futures_unordered::FuturesUnordered;
991 use ramdevice_client::RamdiskClient;
992 use std::borrow::Cow;
993 use std::num::NonZero;
994 use std::sync::Arc;
995 use std::sync::atomic::{AtomicBool, Ordering};
996
997 const RAMDISK_BLOCK_SIZE: u64 = 1024;
998 const RAMDISK_BLOCK_COUNT: u64 = 1024;
999
1000 pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
1001 let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
1002 .await
1003 .expect("RamdiskClient::create failed");
1004 let client_end = ramdisk.open().expect("ramdisk.open failed");
1005 let proxy = client_end.into_proxy();
1006 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1007 assert_eq!(block_client.block_size(), 1024);
1008 let client_end = ramdisk.open().expect("ramdisk.open failed");
1009 let proxy = client_end.into_proxy();
1010 (ramdisk, proxy, block_client)
1011 }
1012
1013 #[fuchsia::test]
1014 async fn test_against_ram_disk() {
1015 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1016
1017 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1018 vmo.write(b"hello", 5).expect("vmo.write failed");
1019 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1020 block_client
1021 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1022 .await
1023 .expect("write_at failed");
1024 block_client
1025 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1026 .await
1027 .expect("read_at failed");
1028 let mut buf: [u8; 5] = Default::default();
1029 vmo.read(&mut buf, 1029).expect("vmo.read failed");
1030 assert_eq!(&buf, b"hello");
1031 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1032 }
1033
1034 #[fuchsia::test]
1035 async fn test_alignment() {
1036 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1037 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1038 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1039 block_client
1040 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1041 .await
1042 .expect_err("expected failure due to bad alignment");
1043 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1044 }
1045
1046 #[fuchsia::test]
1047 async fn test_parallel_io() {
1048 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1049 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1050 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1051 let mut reads = Vec::new();
1052 for _ in 0..1024 {
1053 reads.push(
1054 block_client
1055 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1056 .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1057 );
1058 }
1059 futures::future::join_all(reads).await;
1060 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1061 }
1062
1063 #[fuchsia::test]
1064 async fn test_closed_device() {
1065 let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1066 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1067 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1068 let mut reads = Vec::new();
1069 for _ in 0..1024 {
1070 reads.push(
1071 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1072 );
1073 }
1074 assert!(block_client.is_connected());
1075 let _ = futures::join!(futures::future::join_all(reads), async {
1076 std::mem::drop(ramdisk);
1077 });
1078 while block_client
1080 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1081 .await
1082 .is_ok()
1083 {}
1084
1085 while block_client.is_connected() {
1088 fasync::Timer::new(fasync::MonotonicInstant::after(
1090 zx::MonotonicDuration::from_millis(500),
1091 ))
1092 .await;
1093 }
1094
1095 assert_eq!(block_client.is_connected(), false);
1097 let _ = block_client.detach_vmo(vmo_id).await;
1098 }
1099
1100 #[fuchsia::test]
1101 async fn test_cancelled_reads() {
1102 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1103 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1104 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1105 {
1106 let mut reads = FuturesUnordered::new();
1107 for _ in 0..1024 {
1108 reads.push(
1109 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1110 );
1111 }
1112 for _ in 0..500 {
1114 reads.next().await;
1115 }
1116 }
1117 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1118 }
1119
1120 #[fuchsia::test]
1121 async fn test_parallel_large_read_and_write_with_memory_succeds() {
1122 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1123 let block_client_ref = &block_client;
1124 let test_one = |offset, len, fill| async move {
1125 let buf = vec![fill; len];
1126 block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1127 let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1129 block_client_ref
1130 .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1131 .await
1132 .expect("read_at failed");
1133 assert_eq!(
1134 &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1135 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1136 );
1137 assert_eq!(
1138 &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1139 &buf[..]
1140 );
1141 assert_eq!(
1142 &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1143 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1144 );
1145 };
1146 const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1147 join!(
1148 test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1149 test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1150 );
1151 }
1152
/// Test double for a block server whose behaviour is supplied by two callbacks.
struct FakeBlockServer<'a> {
    /// Server end to serve; taken (leaving `None`) when `run` starts.
    server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
    /// Called with every session request; returning `true` skips the default handling.
    channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
    /// Called with every FIFO request; the returned response is written back to the FIFO.
    fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
}
1161
    impl<'a> FakeBlockServer<'a> {
        /// Creates a fake server that will serve `server_channel` once `run` is called.
        ///
        /// `channel_handler` is consulted for every session request; returning `true` tells the
        /// fake that the request needs no further handling. `fifo_handler` produces the response
        /// for every request that arrives on the FIFO.
        fn new(
            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
        ) -> FakeBlockServer<'a> {
            FakeBlockServer {
                server_channel: Some(server_channel),
                channel_handler: Box::new(channel_handler),
                fifo_handler: Box::new(fifo_handler),
            }
        }

        /// Serves the `Block` channel and the FIFO until both futures have finished.
        ///
        /// # Panics
        ///
        /// Panics if called a second time (the server channel has already been taken), on any
        /// FIDL error, on a FIFO read error other than `PEER_CLOSED`, and on any `Block` request
        /// other than `GetInfo` or `OpenSession`.
        async fn run(&mut self) {
            // The channel can only be served once; a second call finds `None` here and panics.
            let server = self.server_channel.take().unwrap();

            let (server_fifo, client_fifo) =
                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
                    .expect("Fifo::create failed");
            // NOTE: despite its name, this holds the *client* end of the FIFO, which is handed
            // out by the first `GetFifo` request below.
            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));

            // The FIFO loop is wrapped in `Abortable` so that the `Close` handler below can stop
            // it through `fifo_future_abort`.
            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
            let fifo_future = Abortable::new(
                async {
                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
                    let (mut reader, mut writer) = fifo.async_io();
                    let mut request = BlockFifoRequest::default();
                    loop {
                        // Exactly one request is read per iteration; the loop ends when the
                        // peer closes its end of the FIFO.
                        match reader.read_entries(&mut request).await {
                            Ok(1) => {}
                            Err(zx::Status::PEER_CLOSED) => break,
                            Err(e) => panic!("read_entry failed {:?}", e),
                            _ => unreachable!(),
                        };

                        // The test-supplied handler decides the response for every request.
                        let response = self.fifo_handler.as_ref()(request);
                        writer
                            .write_entries(std::slice::from_ref(&response))
                            .await
                            .expect("write_entries failed");
                    }
                },
                fifo_future_abort_registration,
            );

            let channel_future = async {
                server
                    .into_stream()
                    .for_each_concurrent(None, |request| async {
                        let request = request.expect("unexpected fidl error");

                        match request {
                            // Always reports the same fixed device geometry.
                            block::BlockRequest::GetInfo { responder } => {
                                responder
                                    .send(Ok(&block::BlockInfo {
                                        block_count: 1024,
                                        block_size: 512,
                                        max_transfer_size: 1024 * 1024,
                                        flags: block::DeviceFlag::empty(),
                                    }))
                                    .expect("send failed");
                            }
                            // Session requests are processed one at a time, in arrival order.
                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
                                let stream = session.into_stream();
                                stream
                                    .for_each(|request| async {
                                        let request = request.expect("unexpected fidl error");
                                        // Let the test's handler see the request first; `true`
                                        // means nothing further is done with it here.
                                        if self.channel_handler.as_ref()(&request) {
                                            return;
                                        }
                                        match request {
                                            // The client end of the FIFO can be handed out only
                                            // once; later requests get `NO_RESOURCES`.
                                            block::SessionRequest::GetFifo { responder } => {
                                                match maybe_server_fifo.lock().take() {
                                                    Some(fifo) => {
                                                        responder.send(Ok(fifo.downcast()))
                                                    }
                                                    None => responder.send(Err(
                                                        zx::Status::NO_RESOURCES.into_raw(),
                                                    )),
                                                }
                                                .expect("send failed")
                                            }
                                            // The VMO itself is ignored; every attach is
                                            // answered with the same id.
                                            block::SessionRequest::AttachVmo {
                                                vmo: _,
                                                responder,
                                            } => responder
                                                .send(Ok(&block::VmoId { id: 1 }))
                                                .expect("send failed"),
                                            // Closing the session also stops the FIFO loop.
                                            block::SessionRequest::Close { responder } => {
                                                fifo_future_abort.abort();
                                                responder.send(Ok(())).expect("send failed")
                                            }
                                        }
                                    })
                                    .await
                            }
                            _ => panic!("Unexpected message"),
                        }
                    })
                    .await;
            };

            // Wait for both halves to finish. The result is discarded, including the
            // `Abortable` outcome of the FIFO future.
            let _result = join!(fifo_future, channel_future);
        }
    }
1283
1284 #[fuchsia::test]
1285 async fn test_block_close_is_called() {
1286 let close_called = fuchsia_sync::Mutex::new(false);
1287 let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1288
1289 std::thread::spawn(move || {
1290 let _block_client =
1291 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1292 });
1294
1295 let channel_handler = |request: &block::SessionRequest| -> bool {
1296 if let block::SessionRequest::Close { .. } = request {
1297 *close_called.lock() = true;
1298 }
1299 false
1300 };
1301 FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1302
1303 assert!(*close_called.lock());
1305 }
1306
1307 #[fuchsia::test]
1308 async fn test_block_flush_is_called() {
1309 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1310
1311 struct Interface {
1312 flush_called: Arc<AtomicBool>,
1313 }
1314 impl block_server::async_interface::Interface for Interface {
1315 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1316 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1317 device_flags: fidl_fuchsia_storage_block::DeviceFlag::empty(),
1318 max_transfer_blocks: None,
1319 block_range: Some(0..1000),
1320 type_guid: [0; 16],
1321 instance_guid: [0; 16],
1322 name: "foo".to_string(),
1323 flags: 0,
1324 }))
1325 }
1326
1327 async fn read(
1328 &self,
1329 _device_block_offset: u64,
1330 _block_count: u32,
1331 _vmo: &Arc<zx::Vmo>,
1332 _vmo_offset: u64,
1333 _opts: ReadOptions,
1334 _trace_flow_id: Option<NonZero<u64>>,
1335 ) -> Result<(), zx::Status> {
1336 unreachable!();
1337 }
1338
1339 async fn write(
1340 &self,
1341 _device_block_offset: u64,
1342 _block_count: u32,
1343 _vmo: &Arc<zx::Vmo>,
1344 _vmo_offset: u64,
1345 _opts: WriteOptions,
1346 _trace_flow_id: Option<NonZero<u64>>,
1347 ) -> Result<(), zx::Status> {
1348 unreachable!();
1349 }
1350
1351 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1352 self.flush_called.store(true, Ordering::Relaxed);
1353 Ok(())
1354 }
1355
1356 async fn trim(
1357 &self,
1358 _device_block_offset: u64,
1359 _block_count: u32,
1360 _trace_flow_id: Option<NonZero<u64>>,
1361 ) -> Result<(), zx::Status> {
1362 unreachable!();
1363 }
1364 }
1365
1366 let flush_called = Arc::new(AtomicBool::new(false));
1367
1368 futures::join!(
1369 async {
1370 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1371
1372 block_client.flush().await.expect("flush failed");
1373 },
1374 async {
1375 let block_server = BlockServer::new(
1376 512,
1377 Arc::new(Interface { flush_called: flush_called.clone() }),
1378 );
1379 block_server.handle_requests(stream.cast_stream()).await.unwrap();
1380 }
1381 );
1382
1383 assert!(flush_called.load(Ordering::Relaxed));
1384 }
1385
1386 #[fuchsia::test]
1387 async fn test_trace_flow_ids_set() {
1388 let (proxy, server) = fidl::endpoints::create_proxy();
1389
1390 futures::join!(
1391 async {
1392 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1393 block_client.flush().await.expect("flush failed");
1394 },
1395 async {
1396 let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1397 let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1398 if request.trace_flow_id > 0 {
1399 *flow_id.lock() = Some(request.trace_flow_id);
1400 }
1401 BlockFifoResponse {
1402 status: zx::Status::OK.into_raw(),
1403 reqid: request.reqid,
1404 ..Default::default()
1405 }
1406 };
1407 FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1408 assert!(flow_id.lock().is_some());
1410 }
1411 );
1412 }
1413}