1use fidl_fuchsia_storage_block::{MAX_TRANSFER_UNBOUNDED, VMOID_INVALID};
13use fuchsia_sync::Mutex;
14use futures::channel::oneshot;
15use futures::executor::block_on;
16use std::borrow::Borrow;
17use std::collections::HashMap;
18use std::future::Future;
19use std::hash::{Hash, Hasher};
20use std::mem::MaybeUninit;
21use std::num::NonZero;
22use std::ops::{DerefMut, Range};
23use std::pin::Pin;
24use std::sync::atomic::{AtomicU16, Ordering};
25use std::sync::{Arc, LazyLock};
26use std::task::{Context, Poll, Waker};
27use zx::sys::zx_handle_t;
28use zx::{self as zx, HandleBased as _};
29use {fidl_fuchsia_storage_block as block, fuchsia_async as fasync, storage_trace as trace};
30
31pub use cache::Cache;
32
33pub use block::DeviceFlag as BlockDeviceFlag;
34
35pub use block_protocol::*;
36
37pub mod cache;
38
/// Size in bytes of the temporary VMO that memory-slice reads and writes are staged through;
/// larger slices are transferred in chunks of at most this size.
const TEMP_VMO_SIZE: usize = 65536;
40
/// Trace flow id meaning "none supplied"; `Common::send` generates an id for requests carrying
/// this value.
pub const NO_TRACE_ID: u64 = 0;
43
44pub use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
45
46fn fidl_to_status(error: fidl::Error) -> zx::Status {
47 match error {
48 fidl::Error::ClientChannelClosed { status, .. } => status,
49 _ => zx::Status::INTERNAL,
50 }
51}
52
53fn opcode_str(opcode: u8) -> &'static str {
54 match BlockOpcode::from_primitive(opcode) {
55 Some(BlockOpcode::Read) => "read",
56 Some(BlockOpcode::Write) => "write",
57 Some(BlockOpcode::Flush) => "flush",
58 Some(BlockOpcode::Trim) => "trim",
59 Some(BlockOpcode::CloseVmo) => "close_vmo",
60 None => "unknown",
61 }
62}
63
64fn generate_trace_flow_id(request_id: u32) -> u64 {
67 static SELF_HANDLE: LazyLock<zx_handle_t> =
68 LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
69 *SELF_HANDLE as u64 + (request_id as u64) << 32
70}
71
/// Read-only data source for a write: either a range of an attached VMO or a slice of memory.
pub enum BufferSlice<'a> {
    /// `length` bytes starting at byte `offset` of the VMO identified by `vmo_id`.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// A borrowed byte slice.
    Memory(&'a [u8]),
}
76
77impl<'a> BufferSlice<'a> {
78 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
79 BufferSlice::VmoId { vmo_id, offset, length }
80 }
81}
82
83impl<'a> From<&'a [u8]> for BufferSlice<'a> {
84 fn from(buf: &'a [u8]) -> Self {
85 BufferSlice::Memory(buf)
86 }
87}
88
/// Writable destination for a read: either a range of an attached VMO or a slice of memory.
pub enum MutableBufferSlice<'a> {
    /// `length` bytes starting at byte `offset` of the VMO identified by `vmo_id`.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// A mutably borrowed byte slice.
    Memory(&'a mut [u8]),
}
93
94impl<'a> MutableBufferSlice<'a> {
95 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
96 MutableBufferSlice::VmoId { vmo_id, offset, length }
97 }
98}
99
100impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
101 fn from(buf: &'a mut [u8]) -> Self {
102 MutableBufferSlice::Memory(buf)
103 }
104}
105
/// Per-request bookkeeping kept in `FifoState::map` while a request is outstanding.
#[derive(Default)]
struct RequestState {
    /// Completion status; `None` until a response arrives or the fifo is terminated.
    result: Option<zx::Status>,
    /// Waker of the `ResponseFuture` waiting on this request, if it has been polled.
    waker: Option<Waker>,
}
111
/// State shared (behind a mutex) between request senders and the `FifoPoller`.
#[derive(Default)]
struct FifoState {
    /// The fifo; `None` once `terminate` has run.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    /// Id assigned to the next request passed to `Common::send`.
    next_request_id: u32,

    /// Requests that have been accepted but not yet written to the fifo.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    /// Outstanding requests keyed by request id.
    map: HashMap<u32, RequestState>,

    /// Waker recorded by `FifoPoller` when it returns `Pending`.
    poller_waker: Option<Waker>,

    /// Set by `Common::barrier`; consumed by the next write request in `Common::send`.
    attach_barrier: bool,
}
132
133impl FifoState {
134 fn terminate(&mut self) {
135 self.fifo.take();
136 for (_, request_state) in self.map.iter_mut() {
137 request_state.result.get_or_insert(zx::Status::CANCELED);
138 if let Some(waker) = request_state.waker.take() {
139 waker.wake();
140 }
141 }
142 if let Some(waker) = self.poller_waker.take() {
143 waker.wake();
144 }
145 }
146
147 fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
149 let fifo = if let Some(fifo) = self.fifo.as_ref() {
150 fifo
151 } else {
152 return true;
153 };
154
155 loop {
156 let slice = self.queue.as_slices().0;
157 if slice.is_empty() {
158 return false;
159 }
160 match fifo.try_write(context, slice) {
161 Poll::Ready(Ok(sent)) => {
162 self.queue.drain(0..sent);
163 }
164 Poll::Ready(Err(_)) => {
165 self.terminate();
166 return true;
167 }
168 Poll::Pending => {
169 return false;
170 }
171 }
172 }
173 }
174}
175
/// Shared, mutex-protected handle to the `FifoState`.
type FifoStateRef = Arc<Mutex<FifoState>>;
177
/// Future that resolves once the request with `request_id` has a result recorded in the shared
/// state. Dropping it removes the request's entry from `FifoState::map`.
struct ResponseFuture {
    /// Id of the request this future waits on.
    request_id: u32,
    /// Shared state holding the request's `RequestState`.
    fifo_state: FifoStateRef,
}
183
184impl ResponseFuture {
185 fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
186 ResponseFuture { request_id, fifo_state }
187 }
188}
189
190impl Future for ResponseFuture {
191 type Output = Result<(), zx::Status>;
192
193 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
194 let mut state = self.fifo_state.lock();
195 let request_state = state.map.get_mut(&self.request_id).unwrap();
196 if let Some(result) = request_state.result {
197 Poll::Ready(result.into())
198 } else {
199 request_state.waker.replace(context.waker().clone());
200 Poll::Pending
201 }
202 }
203}
204
impl Drop for ResponseFuture {
    /// Removes this request's entry from the shared map so that a response arriving later is
    /// ignored by the poller. Panics if the entry is already gone.
    fn drop(&mut self) {
        self.fifo_state.lock().map.remove(&self.request_id).unwrap();
    }
}
210
/// Identifier of a VMO attached to a block session.
///
/// The id must be consumed (via `into_id` or `take`) before the value is dropped; `Drop` asserts
/// that the stored id is `VMOID_INVALID`.
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);
215
216impl VmoId {
217 pub fn new(id: u16) -> Self {
219 Self(AtomicU16::new(id))
220 }
221
222 pub fn take(&self) -> Self {
224 Self(AtomicU16::new(self.0.swap(VMOID_INVALID, Ordering::Relaxed)))
225 }
226
227 pub fn is_valid(&self) -> bool {
228 self.id() != VMOID_INVALID
229 }
230
231 #[must_use]
233 pub fn into_id(self) -> u16 {
234 self.0.swap(VMOID_INVALID, Ordering::Relaxed)
235 }
236
237 pub fn id(&self) -> u16 {
238 self.0.load(Ordering::Relaxed)
239 }
240}
241
/// Two `VmoId`s are equal when their current raw ids are equal.
impl PartialEq for VmoId {
    fn eq(&self, other: &Self) -> bool {
        self.id() == other.id()
    }
}
247
// Equality compares plain `u16` ids, so it is a full equivalence relation.
impl Eq for VmoId {}
249
impl Drop for VmoId {
    /// Panics if the id was not consumed (`into_id`/`take`) before the value was dropped.
    fn drop(&mut self) {
        assert_eq!(self.0.load(Ordering::Relaxed), VMOID_INVALID, "Did you forget to detach?");
    }
}
255
/// Hashes the current raw id, consistent with `PartialEq`.
impl Hash for VmoId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id().hash(state);
    }
}
261
262pub trait BlockClient: Send + Sync {
266 fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;
268
269 fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;
271
272 fn read_at(
274 &self,
275 buffer_slice: MutableBufferSlice<'_>,
276 device_offset: u64,
277 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
278 self.read_at_with_opts_traced(buffer_slice, device_offset, ReadOptions::default(), 0)
279 }
280
281 fn read_at_with_opts(
282 &self,
283 buffer_slice: MutableBufferSlice<'_>,
284 device_offset: u64,
285 opts: ReadOptions,
286 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
287 self.read_at_with_opts_traced(buffer_slice, device_offset, opts, 0)
288 }
289
290 fn read_at_with_opts_traced(
291 &self,
292 buffer_slice: MutableBufferSlice<'_>,
293 device_offset: u64,
294 opts: ReadOptions,
295 trace_flow_id: u64,
296 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
297
298 fn write_at(
300 &self,
301 buffer_slice: BufferSlice<'_>,
302 device_offset: u64,
303 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
304 self.write_at_with_opts_traced(
305 buffer_slice,
306 device_offset,
307 WriteOptions::default(),
308 NO_TRACE_ID,
309 )
310 }
311
312 fn write_at_with_opts(
313 &self,
314 buffer_slice: BufferSlice<'_>,
315 device_offset: u64,
316 opts: WriteOptions,
317 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
318 self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
319 }
320
321 fn write_at_with_opts_traced(
322 &self,
323 buffer_slice: BufferSlice<'_>,
324 device_offset: u64,
325 opts: WriteOptions,
326 trace_flow_id: u64,
327 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
328
329 fn trim(
331 &self,
332 device_range: Range<u64>,
333 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
334 self.trim_traced(device_range, NO_TRACE_ID)
335 }
336
337 fn trim_traced(
338 &self,
339 device_range: Range<u64>,
340 trace_flow_id: u64,
341 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
342
343 fn barrier(&self);
348
349 fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
350 self.flush_traced(NO_TRACE_ID)
351 }
352
353 fn flush_traced(
355 &self,
356 trace_flow_id: u64,
357 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
358
359 fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
361
362 fn block_size(&self) -> u32;
364
365 fn block_count(&self) -> u64;
367
368 fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
370
371 fn block_flags(&self) -> BlockDeviceFlag;
373
374 fn is_connected(&self) -> bool;
376}
377
/// Fifo-based request logic shared by `RemoteBlockClient` and `RemoteBlockClientSync`.
struct Common {
    /// Block size copied from the device's `BlockInfo`.
    block_size: u32,
    /// Block count copied from the device's `BlockInfo`.
    block_count: u64,
    /// Maximum transfer size in blocks; `None` when the device reports it as unbounded.
    max_transfer_blocks: Option<NonZero<u32>>,
    /// Device flags copied from the device's `BlockInfo`.
    block_flags: BlockDeviceFlag,
    /// State shared with the `FifoPoller`.
    fifo_state: FifoStateRef,
    /// Staging VMO for memory-slice transfers; the async mutex serializes its users.
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    /// Id under which `temp_vmo` is attached to the session.
    temp_vmo_id: VmoId,
}
387
388impl Common {
389 fn new(
390 fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
391 info: &block::BlockInfo,
392 temp_vmo: zx::Vmo,
393 temp_vmo_id: VmoId,
394 ) -> Self {
395 let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
396 fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
397 Self {
398 block_size: info.block_size,
399 block_count: info.block_count,
400 max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
401 NonZero::new(info.max_transfer_size / info.block_size)
402 } else {
403 None
404 },
405 block_flags: info.flags,
406 fifo_state,
407 temp_vmo: futures::lock::Mutex::new(temp_vmo),
408 temp_vmo_id,
409 }
410 }
411
412 fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
413 if bytes % self.block_size as u64 != 0 {
414 Err(zx::Status::INVALID_ARGS)
415 } else {
416 Ok(bytes / self.block_size as u64)
417 }
418 }
419
420 async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
422 let (request_id, trace_flow_id) = {
423 let mut state = self.fifo_state.lock();
424
425 let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
426 if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
427 && state.attach_barrier
428 {
429 flags |= BlockIoFlag::PRE_BARRIER;
430 request.command.flags = flags.bits();
431 state.attach_barrier = false;
432 }
433
434 if state.fifo.is_none() {
435 return Err(zx::Status::CANCELED);
437 }
438 trace::duration!(
439 "storage",
440 "block_client::send::start",
441 "op" => opcode_str(request.command.opcode),
442 "len" => request.length * self.block_size
443 );
444 let request_id = state.next_request_id;
445 state.next_request_id = state.next_request_id.overflowing_add(1).0;
446 assert!(
447 state.map.insert(request_id, RequestState::default()).is_none(),
448 "request id in use!"
449 );
450 request.reqid = request_id;
451 if request.trace_flow_id == NO_TRACE_ID {
452 request.trace_flow_id = generate_trace_flow_id(request_id);
453 }
454 let trace_flow_id = request.trace_flow_id;
455 trace::flow_begin!("storage", "block_client::send", trace_flow_id.into());
456 state.queue.push_back(request);
457 if let Some(waker) = state.poller_waker.clone() {
458 state.poll_send_requests(&mut Context::from_waker(&waker));
459 }
460 (request_id, trace_flow_id)
461 };
462 ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
463 trace::duration!("storage", "block_client::send::end");
464 trace::flow_end!("storage", "block_client::send", trace_flow_id.into());
465 Ok(())
466 }
467
468 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
469 self.send(BlockFifoRequest {
470 command: BlockFifoCommand {
471 opcode: BlockOpcode::CloseVmo.into_primitive(),
472 flags: 0,
473 ..Default::default()
474 },
475 vmoid: vmo_id.into_id(),
476 ..Default::default()
477 })
478 .await
479 }
480
481 async fn read_at(
482 &self,
483 buffer_slice: MutableBufferSlice<'_>,
484 device_offset: u64,
485 opts: ReadOptions,
486 trace_flow_id: u64,
487 ) -> Result<(), zx::Status> {
488 let mut flags = BlockIoFlag::empty();
489
490 if opts.inline_crypto.is_enabled {
491 flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
492 }
493
494 match buffer_slice {
495 MutableBufferSlice::VmoId { vmo_id, offset, length } => {
496 self.send(BlockFifoRequest {
497 command: BlockFifoCommand {
498 opcode: BlockOpcode::Read.into_primitive(),
499 flags: flags.bits(),
500 ..Default::default()
501 },
502 vmoid: vmo_id.id(),
503 length: self
504 .to_blocks(length)?
505 .try_into()
506 .map_err(|_| zx::Status::INVALID_ARGS)?,
507 vmo_offset: self.to_blocks(offset)?,
508 dev_offset: self.to_blocks(device_offset)?,
509 trace_flow_id,
510 dun: opts.inline_crypto.dun,
511 slot: opts.inline_crypto.slot,
512 ..Default::default()
513 })
514 .await?
515 }
516 MutableBufferSlice::Memory(mut slice) => {
517 let temp_vmo = self.temp_vmo.lock().await;
518 let mut device_block = self.to_blocks(device_offset)?;
519 loop {
520 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
521 let block_count = self.to_blocks(to_do as u64)? as u32;
522 self.send(BlockFifoRequest {
523 command: BlockFifoCommand {
524 opcode: BlockOpcode::Read.into_primitive(),
525 flags: flags.bits(),
526 ..Default::default()
527 },
528 vmoid: self.temp_vmo_id.id(),
529 length: block_count,
530 vmo_offset: 0,
531 dev_offset: device_block,
532 trace_flow_id,
533 dun: opts.inline_crypto.dun,
534 slot: opts.inline_crypto.slot,
535 ..Default::default()
536 })
537 .await?;
538 temp_vmo.read(&mut slice[..to_do], 0)?;
539 if to_do == slice.len() {
540 break;
541 }
542 device_block += block_count as u64;
543 slice = &mut slice[to_do..];
544 }
545 }
546 }
547 Ok(())
548 }
549
550 async fn write_at(
551 &self,
552 buffer_slice: BufferSlice<'_>,
553 device_offset: u64,
554 opts: WriteOptions,
555 trace_flow_id: u64,
556 ) -> Result<(), zx::Status> {
557 let mut flags = BlockIoFlag::empty();
558
559 if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
560 flags |= BlockIoFlag::FORCE_ACCESS;
561 }
562
563 if opts.flags.contains(WriteFlags::PRE_BARRIER) {
564 flags |= BlockIoFlag::PRE_BARRIER;
565 }
566
567 if opts.inline_crypto.is_enabled {
568 flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
569 }
570
571 match buffer_slice {
572 BufferSlice::VmoId { vmo_id, offset, length } => {
573 self.send(BlockFifoRequest {
574 command: BlockFifoCommand {
575 opcode: BlockOpcode::Write.into_primitive(),
576 flags: flags.bits(),
577 ..Default::default()
578 },
579 vmoid: vmo_id.id(),
580 length: self
581 .to_blocks(length)?
582 .try_into()
583 .map_err(|_| zx::Status::INVALID_ARGS)?,
584 vmo_offset: self.to_blocks(offset)?,
585 dev_offset: self.to_blocks(device_offset)?,
586 trace_flow_id,
587 dun: opts.inline_crypto.dun,
588 slot: opts.inline_crypto.slot,
589 ..Default::default()
590 })
591 .await?;
592 }
593 BufferSlice::Memory(mut slice) => {
594 let temp_vmo = self.temp_vmo.lock().await;
595 let mut device_block = self.to_blocks(device_offset)?;
596 loop {
597 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
598 let block_count = self.to_blocks(to_do as u64)? as u32;
599 temp_vmo.write(&slice[..to_do], 0)?;
600 self.send(BlockFifoRequest {
601 command: BlockFifoCommand {
602 opcode: BlockOpcode::Write.into_primitive(),
603 flags: flags.bits(),
604 ..Default::default()
605 },
606 vmoid: self.temp_vmo_id.id(),
607 length: block_count,
608 vmo_offset: 0,
609 dev_offset: device_block,
610 trace_flow_id,
611 dun: opts.inline_crypto.dun,
612 slot: opts.inline_crypto.slot,
613 ..Default::default()
614 })
615 .await?;
616 if to_do == slice.len() {
617 break;
618 }
619 device_block += block_count as u64;
620 slice = &slice[to_do..];
621 }
622 }
623 }
624 Ok(())
625 }
626
627 async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
628 let length = self.to_blocks(device_range.end - device_range.start)? as u32;
629 let dev_offset = self.to_blocks(device_range.start)?;
630 self.send(BlockFifoRequest {
631 command: BlockFifoCommand {
632 opcode: BlockOpcode::Trim.into_primitive(),
633 flags: 0,
634 ..Default::default()
635 },
636 vmoid: VMOID_INVALID,
637 length,
638 dev_offset,
639 trace_flow_id,
640 ..Default::default()
641 })
642 .await
643 }
644
645 async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
646 self.send(BlockFifoRequest {
647 command: BlockFifoCommand {
648 opcode: BlockOpcode::Flush.into_primitive(),
649 flags: 0,
650 ..Default::default()
651 },
652 vmoid: VMOID_INVALID,
653 trace_flow_id,
654 ..Default::default()
655 })
656 .await
657 }
658
    /// Marks the shared state so that the next write passed to `send` carries `PRE_BARRIER`.
    fn barrier(&self) {
        self.fifo_state.lock().attach_barrier = true;
    }
662
    /// Returns the block size recorded at construction.
    fn block_size(&self) -> u32 {
        self.block_size
    }
666
    /// Returns the block count recorded at construction.
    fn block_count(&self) -> u64 {
        self.block_count
    }
670
671 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
672 self.max_transfer_blocks.clone()
673 }
674
    /// Returns the device flags recorded at construction.
    fn block_flags(&self) -> BlockDeviceFlag {
        self.block_flags
    }
678
    /// Returns `true` while the fifo is still present, i.e. `terminate` has not run.
    fn is_connected(&self) -> bool {
        self.fifo_state.lock().fifo.is_some()
    }
682}
683
impl Drop for Common {
    fn drop(&mut self) {
        // Invalidate the temp VMO id so that `VmoId`'s drop assertion does not fire; the raw id
        // is discarded rather than detached here.
        let _ = self.temp_vmo_id.take().into_id();
        // Drop the fifo and cancel anything still outstanding; this also wakes the poller.
        self.fifo_state.lock().terminate();
    }
}
692
/// Asynchronous block client backed by a FIDL session and its fifo.
pub struct RemoteBlockClient {
    /// Session proxy used for attaching VMOs and closing the session.
    session: block::SessionProxy,
    /// Fifo-based request logic.
    common: Common,
}
698
699impl RemoteBlockClient {
700 pub async fn new(remote: impl Borrow<block::BlockProxy>) -> Result<Self, zx::Status> {
702 let remote = remote.borrow();
703 let info =
704 remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
705 let (session, server) = fidl::endpoints::create_proxy();
706 let () = remote.open_session(server).map_err(fidl_to_status)?;
707 Self::from_session(info, session).await
708 }
709
710 pub async fn from_session(
711 info: block::BlockInfo,
712 session: block::SessionProxy,
713 ) -> Result<Self, zx::Status> {
714 let fifo =
715 session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
716 let fifo = fasync::Fifo::from_fifo(fifo);
717 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
718 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
719 let vmo_id =
720 session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
721 let vmo_id = VmoId::new(vmo_id.id);
722 Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
723 }
724}
725
726impl BlockClient for RemoteBlockClient {
727 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
728 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
729 let vmo_id = self
730 .session
731 .attach_vmo(dup)
732 .await
733 .map_err(fidl_to_status)?
734 .map_err(zx::Status::from_raw)?;
735 Ok(VmoId::new(vmo_id.id))
736 }
737
738 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
739 self.common.detach_vmo(vmo_id).await
740 }
741
742 async fn read_at_with_opts_traced(
743 &self,
744 buffer_slice: MutableBufferSlice<'_>,
745 device_offset: u64,
746 opts: ReadOptions,
747 trace_flow_id: u64,
748 ) -> Result<(), zx::Status> {
749 self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
750 }
751
752 async fn write_at_with_opts_traced(
753 &self,
754 buffer_slice: BufferSlice<'_>,
755 device_offset: u64,
756 opts: WriteOptions,
757 trace_flow_id: u64,
758 ) -> Result<(), zx::Status> {
759 self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
760 }
761
762 async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
763 self.common.trim(range, trace_flow_id).await
764 }
765
766 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
767 self.common.flush(trace_flow_id).await
768 }
769
770 fn barrier(&self) {
771 self.common.barrier()
772 }
773
774 async fn close(&self) -> Result<(), zx::Status> {
775 let () =
776 self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
777 Ok(())
778 }
779
780 fn block_size(&self) -> u32 {
781 self.common.block_size()
782 }
783
784 fn block_count(&self) -> u64 {
785 self.common.block_count()
786 }
787
788 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
789 self.common.max_transfer_blocks()
790 }
791
792 fn block_flags(&self) -> BlockDeviceFlag {
793 self.common.block_flags()
794 }
795
796 fn is_connected(&self) -> bool {
797 self.common.is_connected()
798 }
799}
800
/// Blocking block client backed by a synchronous FIDL session proxy and a fifo that is serviced
/// on a dedicated thread.
pub struct RemoteBlockClientSync {
    /// Synchronous session proxy used for attaching VMOs and closing the session.
    session: block::SessionSynchronousProxy,
    /// Fifo-based request logic.
    common: Common,
}
805
impl RemoteBlockClientSync {
    /// Connects to the block device behind `client_end` using blocking FIDL calls.
    ///
    /// Fetches the device info, opens a session, fetches the fifo and attaches a temporary VMO,
    /// then spawns a thread that owns a local executor, builds the `Common` state on it, hands
    /// the finished client back over a oneshot channel, and keeps running a `FifoPoller`.
    /// Returns `CANCELED` if the thread goes away before sending the client.
    pub fn new(
        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
    ) -> Result<Self, zx::Status> {
        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
        let info = remote
            .get_info(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let (client, server) = fidl::endpoints::create_endpoints();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        let session = block::SessionSynchronousProxy::new(client.into_channel());
        let fifo = session
            .get_fifo(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);

        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
        // The async fifo wrapper and the poller are created on this dedicated thread, which has
        // its own executor.
        std::thread::spawn(move || {
            let mut executor = fasync::LocalExecutor::default();
            let fifo = fasync::Fifo::from_fifo(fifo);
            // NOTE(review): `Common::new` already spawns a detached `FifoPoller`, and a second
            // one is run below on the same state — confirm that both are intended.
            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
            let fifo_state = common.fifo_state.clone();
            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
            // Runs until the poller completes, i.e. until the fifo state is terminated.
            executor.run_singlethreaded(FifoPoller { fifo_state });
        });
        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
    }

    /// Attaches a duplicate of `vmo` to the session and wraps the id the session returns.
    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    /// Detaches `vmo_id`, blocking until the request completes.
    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        block_on(self.common.detach_vmo(vmo_id))
    }

    /// Blocking read from the device at `device_offset` into `buffer_slice`, using default
    /// `ReadOptions`.
    pub fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.read_at(
            buffer_slice,
            device_offset,
            ReadOptions::default(),
            NO_TRACE_ID,
        ))
    }

    /// Blocking write of `buffer_slice` to the device at `device_offset`, using default
    /// `WriteOptions`.
    pub fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.write_at(
            buffer_slice,
            device_offset,
            WriteOptions::default(),
            NO_TRACE_ID,
        ))
    }

    /// Blocking flush.
    pub fn flush(&self) -> Result<(), zx::Status> {
        block_on(self.common.flush(NO_TRACE_ID))
    }

    /// Closes the session with a blocking FIDL call.
    pub fn close(&self) -> Result<(), zx::Status> {
        let () = self
            .session
            .close(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(())
    }

    /// Returns the device's block size.
    pub fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    /// Returns the device's block count.
    pub fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    /// Returns whether the fifo is still connected.
    pub fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}
912
impl Drop for RemoteBlockClientSync {
    fn drop(&mut self) {
        // Best-effort close of the session; any error is deliberately ignored.
        let _ = self.close();
    }
}
919
/// Future that services the fifo: it pushes queued requests out and dispatches responses to the
/// waiting requests. It completes once the fifo state has been terminated.
struct FifoPoller {
    /// State shared with the request senders.
    fifo_state: FifoStateRef,
}
924
impl Future for FifoPoller {
    type Output = ();

    /// Sends whatever is queued, then reads responses until the fifo has none ready.
    ///
    /// Returns `Ready` when the fifo is gone or a read fails (after terminating the state);
    /// otherwise records the waker in the shared state and returns `Pending`.
    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock();
        let state = state_lock.deref_mut();

        // `true` means the fifo has been dropped, so there is nothing left to poll.
        if state.poll_send_requests(context) {
            return Poll::Ready(());
        }

        // `poll_send_requests` returned false, so the fifo is still present.
        let fifo = state.fifo.as_ref().unwrap();

        loop {
            let mut response = MaybeUninit::uninit();
            match fifo.try_read(context, &mut response) {
                Poll::Pending => {
                    // Remember the waker so that `send` can push requests on our behalf.
                    state.poller_waker = Some(context.waker().clone());
                    return Poll::Pending;
                }
                Poll::Ready(Ok(_)) => {
                    // SAFETY: presumably `try_read` initializes `response` whenever it returns
                    // `Ready(Ok(_))` — TODO confirm against the `fasync::Fifo` contract.
                    let response = unsafe { response.assume_init() };
                    let request_id = response.reqid;
                    // The entry is missing if the `ResponseFuture` was dropped in the meantime;
                    // the response is then ignored.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Poll::Ready(Err(_)) => {
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
    }
}
965
966#[cfg(test)]
967mod tests {
968 use super::{
969 BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
970 RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
971 };
972 use block_protocol::ReadOptions;
973 use block_server::{BlockServer, DeviceInfo, PartitionInfo};
974 use fidl::endpoints::RequestStream as _;
975 use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
976 use futures::join;
977 use futures::stream::StreamExt as _;
978 use futures::stream::futures_unordered::FuturesUnordered;
979 use ramdevice_client::RamdiskClient;
980 use std::borrow::Cow;
981 use std::num::NonZero;
982 use std::sync::Arc;
983 use std::sync::atomic::{AtomicBool, Ordering};
984 use {fidl_fuchsia_storage_block as block, fuchsia_async as fasync};
985
    // Geometry of the ramdisk created by `make_ramdisk`.
    const RAMDISK_BLOCK_SIZE: u64 = 1024;
    const RAMDISK_BLOCK_COUNT: u64 = 1024;
988
    /// Creates a ramdisk and returns it together with a fresh block proxy and a
    /// `RemoteBlockClient`; the client is built on a separate connection from the returned
    /// proxy.
    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
            .await
            .expect("RamdiskClient::create failed");
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
        assert_eq!(block_client.block_size(), 1024);
        // Open a second connection to hand back to the caller.
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        (ramdisk, proxy, block_client)
    }
1001
    /// Round-trips data through the ramdisk: writes the first block of the VMO to device offset
    /// 0, reads it back into the VMO at offset 1024, and checks the bytes.
    #[fuchsia::test]
    async fn test_against_ram_disk() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;

        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        vmo.write(b"hello", 5).expect("vmo.write failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .expect("write_at failed");
        block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
            .await
            .expect("read_at failed");
        let mut buf: [u8; 5] = Default::default();
        // "hello" was written at VMO offset 5, so after the read it appears at 1024 + 5.
        vmo.read(&mut buf, 1029).expect("vmo.read failed");
        assert_eq!(&buf, b"hello");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
1022
    /// A write at a device offset that is not block-aligned must fail.
    #[fuchsia::test]
    async fn test_alignment() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
            .await
            .expect_err("expected failure due to bad alignment");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
1034
    /// Issues 1024 reads concurrently and expects every one of them to succeed.
    #[fuchsia::test]
    async fn test_parallel_io() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client
                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
            );
        }
        futures::future::join_all(reads).await;
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
1051
    /// Destroys the ramdisk while reads are in flight and checks that the client eventually
    /// reports itself as disconnected.
    #[fuchsia::test]
    async fn test_closed_device() {
        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
            );
        }
        assert!(block_client.is_connected());
        // Run the reads concurrently with the destruction; their results are not checked.
        let _ = futures::join!(futures::future::join_all(reads), async {
            ramdisk.destroy().await.expect("ramdisk.destroy failed")
        });
        // Keep reading until a read fails.
        while block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .is_ok()
        {}

        // Wait (polling every 500ms) for the client to notice the disconnect.
        while block_client.is_connected() {
            fasync::Timer::new(fasync::MonotonicInstant::after(
                zx::MonotonicDuration::from_millis(500),
            ))
            .await;
        }

        assert_eq!(block_client.is_connected(), false);
        // The device is gone, so the detach result is ignored; the call still consumes the id.
        let _ = block_client.detach_vmo(vmo_id).await;
    }
1088
    /// Starts 1024 reads, waits for only 500 of them, drops the rest, and checks that the client
    /// is still usable afterwards.
    #[fuchsia::test]
    async fn test_cancelled_reads() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        {
            let mut reads = FuturesUnordered::new();
            for _ in 0..1024 {
                reads.push(
                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
                );
            }
            for _ in 0..500 {
                reads.next().await;
            }
            // The remaining futures are dropped here, at the end of the scope.
        }
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
1108
    /// Runs two large memory-slice write/read round trips concurrently. Each transfer is larger
    /// than `TEMP_VMO_SIZE`, so it goes through the temporary VMO in several chunks.
    #[fuchsia::test]
    async fn test_parallel_large_read_and_write_with_memory_succeds() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let block_client_ref = &block_client;
        let test_one = |offset, len, fill| async move {
            let buf = vec![fill; len];
            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
            // Read back one extra block on either side of the written region.
            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
            block_client_ref
                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
                .await
                .expect("read_at failed");
            // The block before the written region must still be zero.
            assert_eq!(
                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
                &buf[..]
            );
            // The block after the written region must still be zero.
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
        };
        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
        join!(
            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
        );
    }
1141
    /// Test double for a block server whose channel and fifo behavior is supplied by closures.
    struct FakeBlockServer<'a> {
        /// Server end to serve; taken by `run`.
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        /// Called for each session request; returning `true` skips the default handling.
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        /// Maps each fifo request to the response that is written back.
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }
1150
1151 impl<'a> FakeBlockServer<'a> {
1152 fn new(
1164 server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1165 channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1166 fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1167 ) -> FakeBlockServer<'a> {
1168 FakeBlockServer {
1169 server_channel: Some(server_channel),
1170 channel_handler: Box::new(channel_handler),
1171 fifo_handler: Box::new(fifo_handler),
1172 }
1173 }
1174
1175 async fn run(&mut self) {
1177 let server = self.server_channel.take().unwrap();
1178
1179 let (server_fifo, client_fifo) =
1181 zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1182 .expect("Fifo::create failed");
1183 let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1184
1185 let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1186 let fifo_future = Abortable::new(
1187 async {
1188 let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1189 let (mut reader, mut writer) = fifo.async_io();
1190 let mut request = BlockFifoRequest::default();
1191 loop {
1192 match reader.read_entries(&mut request).await {
1193 Ok(1) => {}
1194 Err(zx::Status::PEER_CLOSED) => break,
1195 Err(e) => panic!("read_entry failed {:?}", e),
1196 _ => unreachable!(),
1197 };
1198
1199 let response = self.fifo_handler.as_ref()(request);
1200 writer
1201 .write_entries(std::slice::from_ref(&response))
1202 .await
1203 .expect("write_entries failed");
1204 }
1205 },
1206 fifo_future_abort_registration,
1207 );
1208
1209 let channel_future = async {
1210 server
1211 .into_stream()
1212 .for_each_concurrent(None, |request| async {
1213 let request = request.expect("unexpected fidl error");
1214
1215 match request {
1216 block::BlockRequest::GetInfo { responder } => {
1217 responder
1218 .send(Ok(&block::BlockInfo {
1219 block_count: 1024,
1220 block_size: 512,
1221 max_transfer_size: 1024 * 1024,
1222 flags: block::DeviceFlag::empty(),
1223 }))
1224 .expect("send failed");
1225 }
1226 block::BlockRequest::OpenSession { session, control_handle: _ } => {
1227 let stream = session.into_stream();
1228 stream
1229 .for_each(|request| async {
1230 let request = request.expect("unexpected fidl error");
1231 if self.channel_handler.as_ref()(&request) {
1234 return;
1235 }
1236 match request {
1237 block::SessionRequest::GetFifo { responder } => {
1238 match maybe_server_fifo.lock().take() {
1239 Some(fifo) => {
1240 responder.send(Ok(fifo.downcast()))
1241 }
1242 None => responder.send(Err(
1243 zx::Status::NO_RESOURCES.into_raw(),
1244 )),
1245 }
1246 .expect("send failed")
1247 }
1248 block::SessionRequest::AttachVmo {
1249 vmo: _,
1250 responder,
1251 } => responder
1252 .send(Ok(&block::VmoId { id: 1 }))
1253 .expect("send failed"),
1254 block::SessionRequest::Close { responder } => {
1255 fifo_future_abort.abort();
1256 responder.send(Ok(())).expect("send failed")
1257 }
1258 }
1259 })
1260 .await
1261 }
1262 _ => panic!("Unexpected message"),
1263 }
1264 })
1265 .await;
1266 };
1267
1268 let _result = join!(fifo_future, channel_future);
1269 }
1271 }
1272
1273 #[fuchsia::test]
1274 async fn test_block_close_is_called() {
1275 let close_called = fuchsia_sync::Mutex::new(false);
1276 let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1277
1278 std::thread::spawn(move || {
1279 let _block_client =
1280 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1281 });
1283
1284 let channel_handler = |request: &block::SessionRequest| -> bool {
1285 if let block::SessionRequest::Close { .. } = request {
1286 *close_called.lock() = true;
1287 }
1288 false
1289 };
1290 FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1291
1292 assert!(*close_called.lock());
1294 }
1295
1296 #[fuchsia::test]
1297 async fn test_block_flush_is_called() {
1298 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1299
1300 struct Interface {
1301 flush_called: Arc<AtomicBool>,
1302 }
1303 impl block_server::async_interface::Interface for Interface {
1304 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1305 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1306 device_flags: fidl_fuchsia_storage_block::DeviceFlag::empty(),
1307 max_transfer_blocks: None,
1308 block_range: Some(0..1000),
1309 type_guid: [0; 16],
1310 instance_guid: [0; 16],
1311 name: "foo".to_string(),
1312 flags: 0,
1313 }))
1314 }
1315
1316 async fn read(
1317 &self,
1318 _device_block_offset: u64,
1319 _block_count: u32,
1320 _vmo: &Arc<zx::Vmo>,
1321 _vmo_offset: u64,
1322 _opts: ReadOptions,
1323 _trace_flow_id: Option<NonZero<u64>>,
1324 ) -> Result<(), zx::Status> {
1325 unreachable!();
1326 }
1327
1328 async fn write(
1329 &self,
1330 _device_block_offset: u64,
1331 _block_count: u32,
1332 _vmo: &Arc<zx::Vmo>,
1333 _vmo_offset: u64,
1334 _opts: WriteOptions,
1335 _trace_flow_id: Option<NonZero<u64>>,
1336 ) -> Result<(), zx::Status> {
1337 unreachable!();
1338 }
1339
1340 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1341 self.flush_called.store(true, Ordering::Relaxed);
1342 Ok(())
1343 }
1344
1345 async fn trim(
1346 &self,
1347 _device_block_offset: u64,
1348 _block_count: u32,
1349 _trace_flow_id: Option<NonZero<u64>>,
1350 ) -> Result<(), zx::Status> {
1351 unreachable!();
1352 }
1353 }
1354
1355 let flush_called = Arc::new(AtomicBool::new(false));
1356
1357 futures::join!(
1358 async {
1359 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1360
1361 block_client.flush().await.expect("flush failed");
1362 },
1363 async {
1364 let block_server = BlockServer::new(
1365 512,
1366 Arc::new(Interface { flush_called: flush_called.clone() }),
1367 );
1368 block_server.handle_requests(stream.cast_stream()).await.unwrap();
1369 }
1370 );
1371
1372 assert!(flush_called.load(Ordering::Relaxed));
1373 }
1374
1375 #[fuchsia::test]
1376 async fn test_trace_flow_ids_set() {
1377 let (proxy, server) = fidl::endpoints::create_proxy();
1378
1379 futures::join!(
1380 async {
1381 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1382 block_client.flush().await.expect("flush failed");
1383 },
1384 async {
1385 let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1386 let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1387 if request.trace_flow_id > 0 {
1388 *flow_id.lock() = Some(request.trace_flow_id);
1389 }
1390 BlockFifoResponse {
1391 status: zx::Status::OK.into_raw(),
1392 reqid: request.reqid,
1393 ..Default::default()
1394 }
1395 };
1396 FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1397 assert!(flow_id.lock().is_some());
1399 }
1400 );
1401 }
1402}