1use fidl_fuchsia_storage_block as block;
13use fidl_fuchsia_storage_block::{MAX_TRANSFER_UNBOUNDED, VMOID_INVALID};
14use fuchsia_async as fasync;
15use fuchsia_sync::Mutex;
16use futures::channel::oneshot;
17use futures::executor::block_on;
18use std::borrow::Borrow;
19use std::collections::HashMap;
20use std::future::Future;
21use std::hash::{Hash, Hasher};
22use std::mem::MaybeUninit;
23use std::num::NonZero;
24use std::ops::{DerefMut, Range};
25use std::pin::Pin;
26use std::sync::atomic::{AtomicU16, Ordering};
27use std::sync::{Arc, LazyLock};
28use std::task::{Context, Poll, Waker};
29use storage_trace as trace;
30use zx::sys::zx_handle_t;
31use zx::{self as zx, HandleBased as _};
32
33pub use cache::Cache;
34
35pub use block::DeviceFlag as BlockDeviceFlag;
36
37pub use block_protocol::*;
38
39pub mod cache;
40
41const TEMP_VMO_SIZE: usize = 65536;
42
43pub const NO_TRACE_ID: u64 = 0;
45
46pub use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
47
48fn fidl_to_status(error: fidl::Error) -> zx::Status {
49 match error {
50 fidl::Error::ClientChannelClosed { status, .. } => status,
51 _ => zx::Status::INTERNAL,
52 }
53}
54
55fn opcode_str(opcode: u8) -> &'static str {
56 match BlockOpcode::from_primitive(opcode) {
57 Some(BlockOpcode::Read) => "read",
58 Some(BlockOpcode::Write) => "write",
59 Some(BlockOpcode::Flush) => "flush",
60 Some(BlockOpcode::Trim) => "trim",
61 Some(BlockOpcode::CloseVmo) => "close_vmo",
62 None => "unknown",
63 }
64}
65
66fn generate_trace_flow_id(request_id: u32) -> u64 {
69 static SELF_HANDLE: LazyLock<zx_handle_t> =
70 LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
71 *SELF_HANDLE as u64 + (request_id as u64) << 32
72}
73
/// A buffer used for a write request: either a region of a VMO already attached
/// to the device, or a plain in-memory slice (staged through an internal
/// scratch VMO by the implementation).
pub enum BufferSlice<'a> {
    /// Byte `offset`/`length` into a VMO registered via `attach_vmo`; both must
    /// be multiples of the device block size.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// A plain memory buffer.
    Memory(&'a [u8]),
}
78
79impl<'a> BufferSlice<'a> {
80 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
81 BufferSlice::VmoId { vmo_id, offset, length }
82 }
83}
84
85impl<'a> From<&'a [u8]> for BufferSlice<'a> {
86 fn from(buf: &'a [u8]) -> Self {
87 BufferSlice::Memory(buf)
88 }
89}
90
/// A buffer used for a read request: either a region of a VMO already attached
/// to the device, or a plain in-memory slice (staged through an internal
/// scratch VMO by the implementation).
pub enum MutableBufferSlice<'a> {
    /// Byte `offset`/`length` into a VMO registered via `attach_vmo`; both must
    /// be multiples of the device block size.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// A plain mutable memory buffer.
    Memory(&'a mut [u8]),
}
95
96impl<'a> MutableBufferSlice<'a> {
97 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
98 MutableBufferSlice::VmoId { vmo_id, offset, length }
99 }
100}
101
102impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
103 fn from(buf: &'a mut [u8]) -> Self {
104 MutableBufferSlice::Memory(buf)
105 }
106}
107
/// Per-request bookkeeping shared between the issuing `ResponseFuture` and the
/// fifo poller task.
#[derive(Default)]
struct RequestState {
    // Filled in by the poller when the response arrives, or with CANCELED on
    // termination.
    result: Option<zx::Status>,
    // Waker of the `ResponseFuture` awaiting this request, if it has polled.
    waker: Option<Waker>,
}
113
/// Mutable state shared between request senders and the fifo poller task.
#[derive(Default)]
struct FifoState {
    // The fifo; `None` once the connection has been terminated.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    // The next request id to allocate.
    next_request_id: u32,

    // Requests waiting to be written to the fifo.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    // In-flight requests, keyed by request id.
    map: HashMap<u32, RequestState>,

    // Waker for the `FifoPoller` task, so senders can kick it after queueing.
    poller_waker: Option<Waker>,

    // When set, the next write request sent gets the PRE_BARRIER flag attached
    // (see `Common::barrier` / `Common::send`).
    attach_barrier: bool,
}
134
impl FifoState {
    /// Shuts the connection down: drops the fifo, marks every in-flight request
    /// CANCELED (unless a result already arrived) and wakes its future, then
    /// wakes the poller so it can observe termination.
    fn terminate(&mut self) {
        self.fifo.take();
        for (_, request_state) in self.map.iter_mut() {
            // Don't overwrite a result that has already been delivered.
            request_state.result.get_or_insert(zx::Status::CANCELED);
            if let Some(waker) = request_state.waker.take() {
                waker.wake();
            }
        }
        if let Some(waker) = self.poller_waker.take() {
            waker.wake();
        }
    }

    /// Drains as much of `queue` into the fifo as currently possible.
    /// Returns true if polling should terminate (the fifo has gone away or
    /// writing failed).
    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
        let fifo = if let Some(fifo) = self.fifo.as_ref() {
            fifo
        } else {
            return true;
        };

        loop {
            // Only the first contiguous run of the VecDeque can be written in
            // one call; loop to pick up the second run after a drain.
            let slice = self.queue.as_slices().0;
            if slice.is_empty() {
                return false;
            }
            match fifo.try_write(context, slice) {
                Poll::Ready(Ok(sent)) => {
                    self.queue.drain(0..sent);
                }
                Poll::Ready(Err(_)) => {
                    self.terminate();
                    return true;
                }
                Poll::Pending => {
                    return false;
                }
            }
        }
    }
}
177
/// Shared, mutex-guarded fifo state used by clients and the poller task.
type FifoStateRef = Arc<Mutex<FifoState>>;
179
/// A future that resolves once the response for `request_id` arrives on the
/// fifo (or the connection is terminated).  Dropping it cancels interest and
/// removes the request's map entry.
struct ResponseFuture {
    request_id: u32,
    fifo_state: FifoStateRef,
}
185
186impl ResponseFuture {
187 fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
188 ResponseFuture { request_id, fifo_state }
189 }
190}
191
impl Future for ResponseFuture {
    type Output = Result<(), zx::Status>;

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.fifo_state.lock();
        // The map entry is created in `Common::send` before the future is
        // constructed and only removed by our Drop impl, so it must exist.
        let request_state = state.map.get_mut(&self.request_id).unwrap();
        if let Some(result) = request_state.result {
            Poll::Ready(result.into())
        } else {
            // Re-register the (possibly new) waker on every poll.
            request_state.waker.replace(context.waker().clone());
            Poll::Pending
        }
    }
}
206
impl Drop for ResponseFuture {
    fn drop(&mut self) {
        // Remove our map entry so a cancelled (dropped) request doesn't leave a
        // stale slot behind; the entry is known to exist (see `poll`).
        let mut state = self.fifo_state.lock();
        state.map.remove(&self.request_id).unwrap();
        update_outstanding_requests_counter(state.map.len());
    }
}
214
/// An identifier for a VMO that has been attached to a block device.  The id
/// must be consumed (via `into_id`, `take`, or detaching) before drop; the
/// `Drop` impl asserts this.
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);
219
impl VmoId {
    /// Wraps a raw id.  Ids are normally vended by `attach_vmo`; this is mostly
    /// useful for tests.
    pub fn new(id: u16) -> Self {
        Self(AtomicU16::new(id))
    }

    /// Invalidates `self` (leaving it holding `VMOID_INVALID`) and returns a
    /// new `VmoId` carrying the original id.
    pub fn take(&self) -> Self {
        Self(AtomicU16::new(self.0.swap(VMOID_INVALID, Ordering::Relaxed)))
    }

    /// Returns true if this id has not been invalidated.
    pub fn is_valid(&self) -> bool {
        self.id() != VMOID_INVALID
    }

    /// Consumes `self` and returns the raw id, defusing the `Drop` assertion;
    /// the caller takes over responsibility for the attachment.
    #[must_use]
    pub fn into_id(self) -> u16 {
        self.0.swap(VMOID_INVALID, Ordering::Relaxed)
    }

    /// Returns the raw id without invalidating.
    pub fn id(&self) -> u16 {
        self.0.load(Ordering::Relaxed)
    }
}
245
// Two `VmoId`s are equal iff they hold the same raw id.
impl PartialEq for VmoId {
    fn eq(&self, other: &Self) -> bool {
        self.id() == other.id()
    }
}
251
// Equality on the raw id is reflexive/total, so `Eq` holds.
impl Eq for VmoId {}
253
impl Drop for VmoId {
    fn drop(&mut self) {
        // Dropping a still-valid id means the VMO was never detached (or the id
        // never consumed via `into_id`/`take`) — treat that as a bug.
        assert_eq!(self.0.load(Ordering::Relaxed), VMOID_INVALID, "Did you forget to detach?");
    }
}
259
// Hash on the raw id, consistent with the `PartialEq` impl above.
impl Hash for VmoId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id().hash(state);
    }
}
265
266pub trait BlockClient: Send + Sync {
270 fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;
272
273 fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;
275
276 fn read_at(
278 &self,
279 buffer_slice: MutableBufferSlice<'_>,
280 device_offset: u64,
281 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
282 self.read_at_with_opts_traced(buffer_slice, device_offset, ReadOptions::default(), 0)
283 }
284
285 fn read_at_with_opts(
286 &self,
287 buffer_slice: MutableBufferSlice<'_>,
288 device_offset: u64,
289 opts: ReadOptions,
290 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
291 self.read_at_with_opts_traced(buffer_slice, device_offset, opts, 0)
292 }
293
294 fn read_at_with_opts_traced(
295 &self,
296 buffer_slice: MutableBufferSlice<'_>,
297 device_offset: u64,
298 opts: ReadOptions,
299 trace_flow_id: u64,
300 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
301
302 fn write_at(
304 &self,
305 buffer_slice: BufferSlice<'_>,
306 device_offset: u64,
307 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
308 self.write_at_with_opts_traced(
309 buffer_slice,
310 device_offset,
311 WriteOptions::default(),
312 NO_TRACE_ID,
313 )
314 }
315
316 fn write_at_with_opts(
317 &self,
318 buffer_slice: BufferSlice<'_>,
319 device_offset: u64,
320 opts: WriteOptions,
321 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
322 self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
323 }
324
325 fn write_at_with_opts_traced(
326 &self,
327 buffer_slice: BufferSlice<'_>,
328 device_offset: u64,
329 opts: WriteOptions,
330 trace_flow_id: u64,
331 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
332
333 fn trim(
335 &self,
336 device_range: Range<u64>,
337 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
338 self.trim_traced(device_range, NO_TRACE_ID)
339 }
340
341 fn trim_traced(
342 &self,
343 device_range: Range<u64>,
344 trace_flow_id: u64,
345 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
346
347 fn barrier(&self);
352
353 fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
354 self.flush_traced(NO_TRACE_ID)
355 }
356
357 fn flush_traced(
359 &self,
360 trace_flow_id: u64,
361 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
362
363 fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
365
366 fn block_size(&self) -> u32;
368
369 fn block_count(&self) -> u64;
371
372 fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
374
375 fn block_flags(&self) -> BlockDeviceFlag;
377
378 fn is_connected(&self) -> bool;
380}
381
/// State shared by the async (`RemoteBlockClient`) and sync
/// (`RemoteBlockClientSync`) clients.
struct Common {
    block_size: u32,
    block_count: u64,
    max_transfer_blocks: Option<NonZero<u32>>,
    block_flags: BlockDeviceFlag,
    fifo_state: FifoStateRef,
    // Scratch VMO used to stage `Memory` buffer slices; the async mutex
    // serializes transfers through it.
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    temp_vmo_id: VmoId,
}
391
392impl Common {
393 fn new(
394 fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
395 info: &block::BlockInfo,
396 temp_vmo: zx::Vmo,
397 temp_vmo_id: VmoId,
398 ) -> Self {
399 let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
400 fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
401 Self {
402 block_size: info.block_size,
403 block_count: info.block_count,
404 max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
405 NonZero::new(info.max_transfer_size / info.block_size)
406 } else {
407 None
408 },
409 block_flags: info.flags,
410 fifo_state,
411 temp_vmo: futures::lock::Mutex::new(temp_vmo),
412 temp_vmo_id,
413 }
414 }
415
416 fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
417 if bytes % self.block_size as u64 != 0 {
418 Err(zx::Status::INVALID_ARGS)
419 } else {
420 Ok(bytes / self.block_size as u64)
421 }
422 }
423
424 async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
426 let (request_id, trace_flow_id) = {
427 let mut state = self.fifo_state.lock();
428
429 let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
430 if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
431 && state.attach_barrier
432 {
433 flags |= BlockIoFlag::PRE_BARRIER;
434 request.command.flags = flags.bits();
435 state.attach_barrier = false;
436 }
437
438 if state.fifo.is_none() {
439 return Err(zx::Status::CANCELED);
441 }
442 trace::duration!(
443 "storage",
444 "block_client::send::start",
445 "op" => opcode_str(request.command.opcode),
446 "len" => request.length * self.block_size
447 );
448 let request_id = state.next_request_id;
449 state.next_request_id = state.next_request_id.overflowing_add(1).0;
450 assert!(
451 state.map.insert(request_id, RequestState::default()).is_none(),
452 "request id in use!"
453 );
454 update_outstanding_requests_counter(state.map.len());
455 request.reqid = request_id;
456 if request.trace_flow_id == NO_TRACE_ID {
457 request.trace_flow_id = generate_trace_flow_id(request_id);
458 }
459 let trace_flow_id = request.trace_flow_id;
460 trace::flow_begin!("storage", "block_client::send", trace_flow_id.into());
461 state.queue.push_back(request);
462 if let Some(waker) = state.poller_waker.clone() {
463 state.poll_send_requests(&mut Context::from_waker(&waker));
464 }
465 (request_id, trace_flow_id)
466 };
467 ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
468 trace::duration!("storage", "block_client::send::end");
469 trace::flow_end!("storage", "block_client::send", trace_flow_id.into());
470 Ok(())
471 }
472
473 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
474 self.send(BlockFifoRequest {
475 command: BlockFifoCommand {
476 opcode: BlockOpcode::CloseVmo.into_primitive(),
477 flags: 0,
478 ..Default::default()
479 },
480 vmoid: vmo_id.into_id(),
481 ..Default::default()
482 })
483 .await
484 }
485
486 async fn read_at(
487 &self,
488 buffer_slice: MutableBufferSlice<'_>,
489 device_offset: u64,
490 opts: ReadOptions,
491 trace_flow_id: u64,
492 ) -> Result<(), zx::Status> {
493 let mut flags = BlockIoFlag::empty();
494
495 if opts.inline_crypto.is_enabled {
496 flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
497 }
498
499 match buffer_slice {
500 MutableBufferSlice::VmoId { vmo_id, offset, length } => {
501 self.send(BlockFifoRequest {
502 command: BlockFifoCommand {
503 opcode: BlockOpcode::Read.into_primitive(),
504 flags: flags.bits(),
505 ..Default::default()
506 },
507 vmoid: vmo_id.id(),
508 length: self
509 .to_blocks(length)?
510 .try_into()
511 .map_err(|_| zx::Status::INVALID_ARGS)?,
512 vmo_offset: self.to_blocks(offset)?,
513 dev_offset: self.to_blocks(device_offset)?,
514 trace_flow_id,
515 dun: opts.inline_crypto.dun,
516 slot: opts.inline_crypto.slot,
517 ..Default::default()
518 })
519 .await?
520 }
521 MutableBufferSlice::Memory(mut slice) => {
522 let temp_vmo = self.temp_vmo.lock().await;
523 let mut device_block = self.to_blocks(device_offset)?;
524 loop {
525 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
526 let block_count = self.to_blocks(to_do as u64)? as u32;
527 self.send(BlockFifoRequest {
528 command: BlockFifoCommand {
529 opcode: BlockOpcode::Read.into_primitive(),
530 flags: flags.bits(),
531 ..Default::default()
532 },
533 vmoid: self.temp_vmo_id.id(),
534 length: block_count,
535 vmo_offset: 0,
536 dev_offset: device_block,
537 trace_flow_id,
538 dun: opts.inline_crypto.dun,
539 slot: opts.inline_crypto.slot,
540 ..Default::default()
541 })
542 .await?;
543 temp_vmo.read(&mut slice[..to_do], 0)?;
544 if to_do == slice.len() {
545 break;
546 }
547 device_block += block_count as u64;
548 slice = &mut slice[to_do..];
549 }
550 }
551 }
552 Ok(())
553 }
554
555 async fn write_at(
556 &self,
557 buffer_slice: BufferSlice<'_>,
558 device_offset: u64,
559 opts: WriteOptions,
560 trace_flow_id: u64,
561 ) -> Result<(), zx::Status> {
562 let mut flags = BlockIoFlag::empty();
563
564 if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
565 flags |= BlockIoFlag::FORCE_ACCESS;
566 }
567
568 if opts.flags.contains(WriteFlags::PRE_BARRIER) {
569 flags |= BlockIoFlag::PRE_BARRIER;
570 }
571
572 if opts.inline_crypto.is_enabled {
573 flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
574 }
575
576 match buffer_slice {
577 BufferSlice::VmoId { vmo_id, offset, length } => {
578 self.send(BlockFifoRequest {
579 command: BlockFifoCommand {
580 opcode: BlockOpcode::Write.into_primitive(),
581 flags: flags.bits(),
582 ..Default::default()
583 },
584 vmoid: vmo_id.id(),
585 length: self
586 .to_blocks(length)?
587 .try_into()
588 .map_err(|_| zx::Status::INVALID_ARGS)?,
589 vmo_offset: self.to_blocks(offset)?,
590 dev_offset: self.to_blocks(device_offset)?,
591 trace_flow_id,
592 dun: opts.inline_crypto.dun,
593 slot: opts.inline_crypto.slot,
594 ..Default::default()
595 })
596 .await?;
597 }
598 BufferSlice::Memory(mut slice) => {
599 let temp_vmo = self.temp_vmo.lock().await;
600 let mut device_block = self.to_blocks(device_offset)?;
601 loop {
602 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
603 let block_count = self.to_blocks(to_do as u64)? as u32;
604 temp_vmo.write(&slice[..to_do], 0)?;
605 self.send(BlockFifoRequest {
606 command: BlockFifoCommand {
607 opcode: BlockOpcode::Write.into_primitive(),
608 flags: flags.bits(),
609 ..Default::default()
610 },
611 vmoid: self.temp_vmo_id.id(),
612 length: block_count,
613 vmo_offset: 0,
614 dev_offset: device_block,
615 trace_flow_id,
616 dun: opts.inline_crypto.dun,
617 slot: opts.inline_crypto.slot,
618 ..Default::default()
619 })
620 .await?;
621 if to_do == slice.len() {
622 break;
623 }
624 device_block += block_count as u64;
625 slice = &slice[to_do..];
626 }
627 }
628 }
629 Ok(())
630 }
631
632 async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
633 let length = self.to_blocks(device_range.end - device_range.start)? as u32;
634 let dev_offset = self.to_blocks(device_range.start)?;
635 self.send(BlockFifoRequest {
636 command: BlockFifoCommand {
637 opcode: BlockOpcode::Trim.into_primitive(),
638 flags: 0,
639 ..Default::default()
640 },
641 vmoid: VMOID_INVALID,
642 length,
643 dev_offset,
644 trace_flow_id,
645 ..Default::default()
646 })
647 .await
648 }
649
650 async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
651 self.send(BlockFifoRequest {
652 command: BlockFifoCommand {
653 opcode: BlockOpcode::Flush.into_primitive(),
654 flags: 0,
655 ..Default::default()
656 },
657 vmoid: VMOID_INVALID,
658 trace_flow_id,
659 ..Default::default()
660 })
661 .await
662 }
663
664 fn barrier(&self) {
665 self.fifo_state.lock().attach_barrier = true;
666 }
667
668 fn block_size(&self) -> u32 {
669 self.block_size
670 }
671
672 fn block_count(&self) -> u64 {
673 self.block_count
674 }
675
676 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
677 self.max_transfer_blocks.clone()
678 }
679
680 fn block_flags(&self) -> BlockDeviceFlag {
681 self.block_flags
682 }
683
684 fn is_connected(&self) -> bool {
685 self.fifo_state.lock().fifo.is_some()
686 }
687}
688
impl Drop for Common {
    fn drop(&mut self) {
        // Consume the scratch VMO id so `VmoId`'s drop assertion doesn't fire;
        // tearing down the fifo below ends the connection anyway.
        let _ = self.temp_vmo_id.take().into_id();
        self.fifo_state.lock().terminate();
    }
}
697
/// An async client for a remote block device, speaking the block fifo protocol
/// over a `fuchsia.hardware.block` session.
pub struct RemoteBlockClient {
    session: block::SessionProxy,
    common: Common,
}
703
impl RemoteBlockClient {
    /// Returns a connection to a remote block device via the given proxy:
    /// queries device info, opens a session, and connects the fifo.
    pub async fn new(remote: impl Borrow<block::BlockProxy>) -> Result<Self, zx::Status> {
        let remote = remote.borrow();
        let info =
            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let (session, server) = fidl::endpoints::create_proxy();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        Self::from_session(info, session).await
    }

    /// Builds a client from an already-opened session and its device info.
    pub async fn from_session(
        info: block::BlockInfo,
        session: block::SessionProxy,
    ) -> Result<Self, zx::Status> {
        const SCRATCH_VMO_NAME: zx::Name = zx::Name::new_lossy("block-client-scratch-vmo");
        let fifo =
            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let fifo = fasync::Fifo::from_fifo(fifo);
        // Create and attach the scratch VMO used to stage `Memory` buffer slices.
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        temp_vmo.set_name(&SCRATCH_VMO_NAME)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id =
            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);
        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
    }
}
732
// `BlockClient` implementation: VMO attachment goes through the session FIDL
// protocol; everything else is delegated to the shared `Common` fifo machinery.
impl BlockClient for RemoteBlockClient {
    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        // The server keeps its own handle; attach a duplicate.
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup)
            .await
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.common.detach_vmo(vmo_id).await
    }

    async fn read_at_with_opts_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
    }

    async fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
    }

    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.trim(range, trace_flow_id).await
    }

    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.flush(trace_flow_id).await
    }

    fn barrier(&self) {
        self.common.barrier()
    }

    async fn close(&self) -> Result<(), zx::Status> {
        let () =
            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        Ok(())
    }

    fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
        self.common.max_transfer_blocks()
    }

    fn block_flags(&self) -> BlockDeviceFlag {
        self.common.block_flags()
    }

    fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}
807
/// A synchronous client for a remote block device, usable from threads that
/// don't run an async executor: it spawns a dedicated thread to drive the fifo
/// (see `new`) and blocks on each operation.
pub struct RemoteBlockClientSync {
    session: block::SessionSynchronousProxy,
    common: Common,
}
812
impl RemoteBlockClientSync {
    /// Connects synchronously: queries device info, opens a session, attaches
    /// the scratch VMO, then spawns a thread with its own executor to run the
    /// fifo poller.
    pub fn new(
        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
    ) -> Result<Self, zx::Status> {
        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
        let info = remote
            .get_info(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let (client, server) = fidl::endpoints::create_endpoints();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        let session = block::SessionSynchronousProxy::new(client.into_channel());
        let fifo = session
            .get_fifo(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);

        // The async fifo is created on the poller thread, where the executor
        // lives; the constructed client is handed back over a oneshot channel.
        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
        std::thread::spawn(move || {
            let mut executor = fasync::LocalExecutor::default();
            let fifo = fasync::Fifo::from_fifo(fifo);
            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
            let fifo_state = common.fifo_state.clone();
            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
            // Runs until the fifo terminates.
            executor.run_singlethreaded(FifoPoller { fifo_state });
        });
        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
    }

    /// Attaches the given VMO to the device and returns its id.
    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    /// Detaches the given VMO id; blocks until the device acknowledges.
    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        block_on(self.common.detach_vmo(vmo_id))
    }

    /// Blocking read; see `BlockClient::read_at`.
    pub fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.read_at(
            buffer_slice,
            device_offset,
            ReadOptions::default(),
            NO_TRACE_ID,
        ))
    }

    /// Blocking write; see `BlockClient::write_at`.
    pub fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.write_at(
            buffer_slice,
            device_offset,
            WriteOptions::default(),
            NO_TRACE_ID,
        ))
    }

    /// Blocking flush; see `BlockClient::flush`.
    pub fn flush(&self) -> Result<(), zx::Status> {
        block_on(self.common.flush(NO_TRACE_ID))
    }

    /// Closes the session synchronously.
    pub fn close(&self) -> Result<(), zx::Status> {
        let () = self
            .session
            .close(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(())
    }

    /// Returns the device block size in bytes.
    pub fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    /// Returns the number of blocks on the device.
    pub fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    /// Returns true if the fifo connection is still live.
    pub fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}
919
impl Drop for RemoteBlockClientSync {
    fn drop(&mut self) {
        // Best effort: there's nothing useful to do if close fails here.
        let _ = self.close();
    }
}
926
/// A future that drives the fifo: drains the outgoing request queue and
/// dispatches incoming responses to the waiting `ResponseFuture`s.  Completes
/// when the connection terminates.
struct FifoPoller {
    fifo_state: FifoStateRef,
}
931
impl Future for FifoPoller {
    type Output = ();

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock();
        // Split the borrow so `fifo` and the rest of the state can be used together.
        let state = state_lock.deref_mut();

        // First drain any queued outgoing requests.
        if state.poll_send_requests(context) {
            return Poll::Ready(());
        }

        // `poll_send_requests` returned false above, which implies the fifo is
        // still present.
        let fifo = state.fifo.as_ref().unwrap();
        loop {
            let mut response = MaybeUninit::uninit();
            match fifo.try_read(context, &mut response) {
                Poll::Pending => {
                    // Remember our waker so senders can kick us after queueing.
                    state.poller_waker = Some(context.waker().clone());
                    return Poll::Pending;
                }
                Poll::Ready(Ok(_)) => {
                    // SAFETY-adjacent: try_read returned Ok, so `response` was
                    // initialized by the read.
                    let response = unsafe { response.assume_init() };
                    let request_id = response.reqid;
                    // The entry may be absent if the ResponseFuture was dropped
                    // (request cancelled); ignore such responses.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Poll::Ready(Err(_)) => {
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
    }
}
972
/// Emits a trace counter tracking the number of in-flight block requests.
fn update_outstanding_requests_counter(outstanding: usize) {
    trace::counter!("storage", "block-requests", 0, "outstanding" => outstanding);
}
976
977#[cfg(test)]
978mod tests {
979 use super::{
980 BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
981 RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
982 };
983 use block_protocol::ReadOptions;
984 use block_server::{BlockServer, DeviceInfo, PartitionInfo};
985 use fidl::endpoints::RequestStream as _;
986 use fidl_fuchsia_storage_block as block;
987 use fuchsia_async as fasync;
988 use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
989 use futures::join;
990 use futures::stream::StreamExt as _;
991 use futures::stream::futures_unordered::FuturesUnordered;
992 use ramdevice_client::RamdiskClient;
993 use std::borrow::Cow;
994 use std::num::NonZero;
995 use std::sync::Arc;
996 use std::sync::atomic::{AtomicBool, Ordering};
997
998 const RAMDISK_BLOCK_SIZE: u64 = 1024;
999 const RAMDISK_BLOCK_COUNT: u64 = 1024;
1000
    /// Creates a 1024x1024 ramdisk and returns it together with a fresh proxy
    /// to the block device and a connected `RemoteBlockClient`.
    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
            .await
            .expect("RamdiskClient::create failed");
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
        assert_eq!(block_client.block_size(), 1024);
        // Open a second connection to hand back alongside the client.
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        (ramdisk, proxy, block_client)
    }
1013
    /// Round-trips data through a ramdisk: writes one block from a VMO, reads
    /// it back into a different VMO offset, and checks the payload survived.
    #[fuchsia::test]
    async fn test_against_ram_disk() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;

        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        vmo.write(b"hello", 5).expect("vmo.write failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .expect("write_at failed");
        block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
            .await
            .expect("read_at failed");
        let mut buf: [u8; 5] = Default::default();
        // "hello" was at byte 5 of the first block, so after reading back into
        // VMO offset 1024 it should appear at 1029.
        vmo.read(&mut buf, 1029).expect("vmo.read failed");
        assert_eq!(&buf, b"hello");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
1034
    /// A write at a non-block-aligned device offset must be rejected.
    #[fuchsia::test]
    async fn test_alignment() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
            .await
            .expect_err("expected failure due to bad alignment");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
1046
    /// Issues 1024 concurrent reads and expects all of them to succeed.
    #[fuchsia::test]
    async fn test_parallel_io() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client
                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
            );
        }
        futures::future::join_all(reads).await;
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
1063
    /// Destroys the ramdisk while reads are in flight, then verifies that the
    /// client eventually observes the disconnect and fails further requests.
    #[fuchsia::test]
    async fn test_closed_device() {
        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
            );
        }
        assert!(block_client.is_connected());
        // Race the in-flight reads against destroying the device.
        let _ = futures::join!(futures::future::join_all(reads), async {
            ramdisk.destroy().await.expect("ramdisk.destroy failed")
        });
        // Some reads may still succeed until the closure propagates; keep
        // issuing reads until one fails.
        while block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .is_ok()
        {}

        // Poll until the client notices the fifo is gone.
        while block_client.is_connected() {
            fasync::Timer::new(fasync::MonotonicInstant::after(
                zx::MonotonicDuration::from_millis(500),
            ))
            .await;
        }

        assert_eq!(block_client.is_connected(), false);
        // Detach will fail since the device is gone; ignore the error.
        let _ = block_client.detach_vmo(vmo_id).await;
    }
1100
    /// Drops roughly half of 1024 in-flight reads (by letting the
    /// FuturesUnordered go out of scope) and checks nothing breaks afterwards.
    #[fuchsia::test]
    async fn test_cancelled_reads() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        {
            let mut reads = FuturesUnordered::new();
            for _ in 0..1024 {
                reads.push(
                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
                );
            }
            // Wait for a subset to complete; the rest are cancelled on drop.
            for _ in 0..500 {
                reads.next().await;
            }
        }
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
1120
    // NOTE(review): "succeds" in the name is a typo ("succeeds"); left as-is to
    // avoid churning test names.
    /// Runs two concurrent memory-slice transfers that each span several
    /// scratch-VMO chunks, verifying the staged chunking doesn't interleave
    /// data between concurrent callers.
    #[fuchsia::test]
    async fn test_parallel_large_read_and_write_with_memory_succeds() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let block_client_ref = &block_client;
        let test_one = |offset, len, fill| async move {
            let buf = vec![fill; len];
            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
            // Read back with one spare block either side to check for overruns.
            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
            block_client_ref
                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
                .await
                .expect("read_at failed");
            assert_eq!(
                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
                &buf[..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
        };
        // Sized to force multiple TEMP_VMO_SIZE chunks per transfer.
        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
        join!(
            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
        );
    }
1153
    /// A closure-driven fake block server for tests: `channel_handler` gets a
    /// look at each session FIDL request (returning true to claim it), and
    /// `fifo_handler` produces the response for each fifo request.
    struct FakeBlockServer<'a> {
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }
1162
1163 impl<'a> FakeBlockServer<'a> {
        /// Builds a fake server around the given server end and handler closures.
        fn new(
            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
        ) -> FakeBlockServer<'a> {
            FakeBlockServer {
                server_channel: Some(server_channel),
                channel_handler: Box::new(channel_handler),
                fifo_handler: Box::new(fifo_handler),
            }
        }
1186
1187 async fn run(&mut self) {
1189 let server = self.server_channel.take().unwrap();
1190
1191 let (server_fifo, client_fifo) =
1193 zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1194 .expect("Fifo::create failed");
1195 let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1196
1197 let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1198 let fifo_future = Abortable::new(
1199 async {
1200 let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1201 let (mut reader, mut writer) = fifo.async_io();
1202 let mut request = BlockFifoRequest::default();
1203 loop {
1204 match reader.read_entries(&mut request).await {
1205 Ok(1) => {}
1206 Err(zx::Status::PEER_CLOSED) => break,
1207 Err(e) => panic!("read_entry failed {:?}", e),
1208 _ => unreachable!(),
1209 };
1210
1211 let response = self.fifo_handler.as_ref()(request);
1212 writer
1213 .write_entries(std::slice::from_ref(&response))
1214 .await
1215 .expect("write_entries failed");
1216 }
1217 },
1218 fifo_future_abort_registration,
1219 );
1220
1221 let channel_future = async {
1222 server
1223 .into_stream()
1224 .for_each_concurrent(None, |request| async {
1225 let request = request.expect("unexpected fidl error");
1226
1227 match request {
1228 block::BlockRequest::GetInfo { responder } => {
1229 responder
1230 .send(Ok(&block::BlockInfo {
1231 block_count: 1024,
1232 block_size: 512,
1233 max_transfer_size: 1024 * 1024,
1234 flags: block::DeviceFlag::empty(),
1235 }))
1236 .expect("send failed");
1237 }
1238 block::BlockRequest::OpenSession { session, control_handle: _ } => {
1239 let stream = session.into_stream();
1240 stream
1241 .for_each(|request| async {
1242 let request = request.expect("unexpected fidl error");
1243 if self.channel_handler.as_ref()(&request) {
1246 return;
1247 }
1248 match request {
1249 block::SessionRequest::GetFifo { responder } => {
1250 match maybe_server_fifo.lock().take() {
1251 Some(fifo) => {
1252 responder.send(Ok(fifo.downcast()))
1253 }
1254 None => responder.send(Err(
1255 zx::Status::NO_RESOURCES.into_raw(),
1256 )),
1257 }
1258 .expect("send failed")
1259 }
1260 block::SessionRequest::AttachVmo {
1261 vmo: _,
1262 responder,
1263 } => responder
1264 .send(Ok(&block::VmoId { id: 1 }))
1265 .expect("send failed"),
1266 block::SessionRequest::Close { responder } => {
1267 fifo_future_abort.abort();
1268 responder.send(Ok(())).expect("send failed")
1269 }
1270 }
1271 })
1272 .await
1273 }
1274 _ => panic!("Unexpected message"),
1275 }
1276 })
1277 .await;
1278 };
1279
1280 let _result = join!(fifo_future, channel_future);
1281 }
1283 }
1284
1285 #[fuchsia::test]
1286 async fn test_block_close_is_called() {
1287 let close_called = fuchsia_sync::Mutex::new(false);
1288 let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1289
1290 std::thread::spawn(move || {
1291 let _block_client =
1292 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1293 });
1295
1296 let channel_handler = |request: &block::SessionRequest| -> bool {
1297 if let block::SessionRequest::Close { .. } = request {
1298 *close_called.lock() = true;
1299 }
1300 false
1301 };
1302 FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1303
1304 assert!(*close_called.lock());
1306 }
1307
    #[fuchsia::test]
    async fn test_block_flush_is_called() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();

        // Server-side interface whose only reachable operation is `flush`; it
        // records that it was invoked.
        struct Interface {
            flush_called: Arc<AtomicBool>,
        }
        impl block_server::async_interface::Interface for Interface {
            fn get_info(&self) -> Cow<'_, DeviceInfo> {
                // Minimal partition metadata; the values are arbitrary except
                // that a valid block range must be reported.
                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
                    device_flags: fidl_fuchsia_storage_block::DeviceFlag::empty(),
                    max_transfer_blocks: None,
                    block_range: Some(0..1000),
                    type_guid: [0; 16],
                    instance_guid: [0; 16],
                    name: "foo".to_string(),
                    flags: 0,
                }))
            }

            // The test only issues a flush, so read/write/trim must never run.
            async fn read(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _opts: ReadOptions,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            async fn write(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _opts: WriteOptions,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
                self.flush_called.store(true, Ordering::Relaxed);
                Ok(())
            }

            async fn trim(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }
        }

        let flush_called = Arc::new(AtomicBool::new(false));

        // Run the client's flush concurrently with a block server backed by
        // the interface above.
        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");

                block_client.flush().await.expect("flush failed");
            },
            async {
                let block_server = BlockServer::new(
                    512,
                    Arc::new(Interface { flush_called: flush_called.clone() }),
                );
                block_server.handle_requests(stream.cast_stream()).await.unwrap();
            }
        );

        assert!(flush_called.load(Ordering::Relaxed));
    }
1386
1387 #[fuchsia::test]
1388 async fn test_trace_flow_ids_set() {
1389 let (proxy, server) = fidl::endpoints::create_proxy();
1390
1391 futures::join!(
1392 async {
1393 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1394 block_client.flush().await.expect("flush failed");
1395 },
1396 async {
1397 let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1398 let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1399 if request.trace_flow_id > 0 {
1400 *flow_id.lock() = Some(request.trace_flow_id);
1401 }
1402 BlockFifoResponse {
1403 status: zx::Status::OK.into_raw(),
1404 reqid: request.reqid,
1405 ..Default::default()
1406 }
1407 };
1408 FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1409 assert!(flow_id.lock().is_some());
1411 }
1412 );
1413 }
1414}