1use async_trait::async_trait;
6use fidl::endpoints::ServerEnd;
7use fidl_fuchsia_hardware_block::BlockProxy;
8use fidl_fuchsia_hardware_block_partition::PartitionProxy;
9use fidl_fuchsia_hardware_block_volume::VolumeProxy;
10use fuchsia_async::{self as fasync, FifoReadable as _, FifoWritable as _};
11use futures::channel::oneshot;
12use futures::executor::block_on;
13use lazy_static::lazy_static;
14use std::collections::HashMap;
15use std::future::Future;
16use std::hash::{Hash, Hasher};
17use std::ops::{DerefMut, Range};
18use std::pin::Pin;
19use std::sync::atomic::{AtomicU16, Ordering};
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll, Waker};
22use zx::sys::zx_handle_t;
23use zx::{self as zx, HandleBased as _};
24use {
25 fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
26 storage_trace as trace,
27};
28
29pub use cache::Cache;
30
31pub use block::Flag as BlockFlags;
32
33pub use block_protocol::*;
34
35pub mod cache;
36
37const TEMP_VMO_SIZE: usize = 65536;
38
39pub const NO_TRACE_ID: u64 = 0;
41
42pub use block_driver::{BlockIoFlag, BlockOpcode};
43
44fn fidl_to_status(error: fidl::Error) -> zx::Status {
45 match error {
46 fidl::Error::ClientChannelClosed { status, .. } => status,
47 _ => zx::Status::INTERNAL,
48 }
49}
50
51fn opcode_str(opcode: u8) -> &'static str {
52 match BlockOpcode::from_primitive(opcode) {
53 Some(BlockOpcode::Read) => "read",
54 Some(BlockOpcode::Write) => "write",
55 Some(BlockOpcode::Flush) => "flush",
56 Some(BlockOpcode::Trim) => "trim",
57 Some(BlockOpcode::CloseVmo) => "close_vmo",
58 None => "unknown",
59 }
60}
61
62fn generate_trace_flow_id(request_id: u32) -> u64 {
65 lazy_static! {
66 static ref SELF_HANDLE: zx_handle_t = fuchsia_runtime::process_self().raw_handle();
67 };
68 *SELF_HANDLE as u64 + (request_id as u64) << 32
69}
70
71pub enum BufferSlice<'a> {
72 VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
73 Memory(&'a [u8]),
74}
75
76impl<'a> BufferSlice<'a> {
77 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
78 BufferSlice::VmoId { vmo_id, offset, length }
79 }
80}
81
82impl<'a> From<&'a [u8]> for BufferSlice<'a> {
83 fn from(buf: &'a [u8]) -> Self {
84 BufferSlice::Memory(buf)
85 }
86}
87
88pub enum MutableBufferSlice<'a> {
89 VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
90 Memory(&'a mut [u8]),
91}
92
93impl<'a> MutableBufferSlice<'a> {
94 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
95 MutableBufferSlice::VmoId { vmo_id, offset, length }
96 }
97}
98
99impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
100 fn from(buf: &'a mut [u8]) -> Self {
101 MutableBufferSlice::Memory(buf)
102 }
103}
104
105#[derive(Default)]
106struct RequestState {
107 result: Option<zx::Status>,
108 waker: Option<Waker>,
109}
110
111#[derive(Default)]
112struct FifoState {
113 fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,
115
116 next_request_id: u32,
118
119 queue: std::collections::VecDeque<BlockFifoRequest>,
121
122 map: HashMap<u32, RequestState>,
124
125 poller_waker: Option<Waker>,
127}
128
129impl FifoState {
130 fn terminate(&mut self) {
131 self.fifo.take();
132 for (_, request_state) in self.map.iter_mut() {
133 request_state.result.get_or_insert(zx::Status::CANCELED);
134 if let Some(waker) = request_state.waker.take() {
135 waker.wake();
136 }
137 }
138 if let Some(waker) = self.poller_waker.take() {
139 waker.wake();
140 }
141 }
142
143 fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
145 let fifo = if let Some(fifo) = self.fifo.as_ref() {
146 fifo
147 } else {
148 return true;
149 };
150
151 loop {
152 let slice = self.queue.as_slices().0;
153 if slice.is_empty() {
154 return false;
155 }
156 match fifo.write(context, slice) {
157 Poll::Ready(Ok(sent)) => {
158 self.queue.drain(0..sent);
159 }
160 Poll::Ready(Err(_)) => {
161 self.terminate();
162 return true;
163 }
164 Poll::Pending => {
165 return false;
166 }
167 }
168 }
169 }
170}
171
172type FifoStateRef = Arc<Mutex<FifoState>>;
173
174struct ResponseFuture {
176 request_id: u32,
177 fifo_state: FifoStateRef,
178}
179
180impl ResponseFuture {
181 fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
182 ResponseFuture { request_id, fifo_state }
183 }
184}
185
186impl Future for ResponseFuture {
187 type Output = Result<(), zx::Status>;
188
189 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
190 let mut state = self.fifo_state.lock().unwrap();
191 let request_state = state.map.get_mut(&self.request_id).unwrap();
192 if let Some(result) = request_state.result {
193 Poll::Ready(result.into())
194 } else {
195 request_state.waker.replace(context.waker().clone());
196 Poll::Pending
197 }
198 }
199}
200
201impl Drop for ResponseFuture {
202 fn drop(&mut self) {
203 self.fifo_state.lock().unwrap().map.remove(&self.request_id).unwrap();
204 }
205}
206
207#[derive(Debug)]
209#[must_use]
210pub struct VmoId(AtomicU16);
211
212impl VmoId {
213 pub fn new(id: u16) -> Self {
215 Self(AtomicU16::new(id))
216 }
217
218 pub fn take(&self) -> Self {
220 Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
221 }
222
223 pub fn is_valid(&self) -> bool {
224 self.id() != block_driver::BLOCK_VMOID_INVALID
225 }
226
227 #[must_use]
229 pub fn into_id(self) -> u16 {
230 self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
231 }
232
233 pub fn id(&self) -> u16 {
234 self.0.load(Ordering::Relaxed)
235 }
236}
237
238impl PartialEq for VmoId {
239 fn eq(&self, other: &Self) -> bool {
240 self.id() == other.id()
241 }
242}
243
244impl Eq for VmoId {}
245
246impl Drop for VmoId {
247 fn drop(&mut self) {
248 assert_eq!(
249 self.0.load(Ordering::Relaxed),
250 block_driver::BLOCK_VMOID_INVALID,
251 "Did you forget to detach?"
252 );
253 }
254}
255
256impl Hash for VmoId {
257 fn hash<H: Hasher>(&self, state: &mut H) {
258 self.id().hash(state);
259 }
260}
261
262#[async_trait]
266pub trait BlockClient: Send + Sync {
267 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status>;
269
270 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status>;
272
273 async fn read_at(
275 &self,
276 buffer_slice: MutableBufferSlice<'_>,
277 device_offset: u64,
278 ) -> Result<(), zx::Status> {
279 self.read_at_traced(buffer_slice, device_offset, 0).await
280 }
281
282 async fn read_at_traced(
283 &self,
284 buffer_slice: MutableBufferSlice<'_>,
285 device_offset: u64,
286 trace_flow_id: u64,
287 ) -> Result<(), zx::Status>;
288
289 async fn write_at(
291 &self,
292 buffer_slice: BufferSlice<'_>,
293 device_offset: u64,
294 ) -> Result<(), zx::Status> {
295 self.write_at_with_opts_traced(
296 buffer_slice,
297 device_offset,
298 WriteOptions::empty(),
299 NO_TRACE_ID,
300 )
301 .await
302 }
303
304 async fn write_at_with_opts(
305 &self,
306 buffer_slice: BufferSlice<'_>,
307 device_offset: u64,
308 opts: WriteOptions,
309 ) -> Result<(), zx::Status> {
310 self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID).await
311 }
312
313 async fn write_at_with_opts_traced(
314 &self,
315 buffer_slice: BufferSlice<'_>,
316 device_offset: u64,
317 opts: WriteOptions,
318 trace_flow_id: u64,
319 ) -> Result<(), zx::Status>;
320
321 async fn trim(&self, device_range: Range<u64>) -> Result<(), zx::Status> {
323 self.trim_traced(device_range, NO_TRACE_ID).await
324 }
325
326 async fn trim_traced(
327 &self,
328 device_range: Range<u64>,
329 trace_flow_id: u64,
330 ) -> Result<(), zx::Status>;
331
332 async fn flush(&self) -> Result<(), zx::Status> {
333 self.flush_traced(NO_TRACE_ID).await
334 }
335
336 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status>;
338
339 async fn close(&self) -> Result<(), zx::Status>;
341
342 fn block_size(&self) -> u32;
344
345 fn block_count(&self) -> u64;
347
348 fn block_flags(&self) -> BlockFlags;
350
351 fn is_connected(&self) -> bool;
353}
354
355struct Common {
356 block_size: u32,
357 block_count: u64,
358 block_flags: BlockFlags,
359 fifo_state: FifoStateRef,
360 temp_vmo: futures::lock::Mutex<zx::Vmo>,
361 temp_vmo_id: VmoId,
362}
363
364impl Common {
365 fn new(
366 fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
367 info: &block::BlockInfo,
368 temp_vmo: zx::Vmo,
369 temp_vmo_id: VmoId,
370 ) -> Self {
371 let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
372 fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
373 Self {
374 block_size: info.block_size,
375 block_count: info.block_count,
376 block_flags: info.flags,
377 fifo_state,
378 temp_vmo: futures::lock::Mutex::new(temp_vmo),
379 temp_vmo_id,
380 }
381 }
382
383 fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
384 if bytes % self.block_size as u64 != 0 {
385 Err(zx::Status::INVALID_ARGS)
386 } else {
387 Ok(bytes / self.block_size as u64)
388 }
389 }
390
391 async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
393 async move {
394 let (request_id, trace_flow_id) = {
395 let mut state = self.fifo_state.lock().unwrap();
396 if state.fifo.is_none() {
397 return Err(zx::Status::CANCELED);
399 }
400 trace::duration!(
401 c"storage",
402 c"block_client::send::start",
403 "op" => opcode_str(request.command.opcode)
404 );
405 let request_id = state.next_request_id;
406 state.next_request_id = state.next_request_id.overflowing_add(1).0;
407 assert!(
408 state.map.insert(request_id, RequestState::default()).is_none(),
409 "request id in use!"
410 );
411 request.reqid = request_id;
412 if request.trace_flow_id == NO_TRACE_ID {
413 request.trace_flow_id = generate_trace_flow_id(request_id);
414 }
415 let trace_flow_id = request.trace_flow_id;
416 trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
417 state.queue.push_back(request);
418 if let Some(waker) = state.poller_waker.clone() {
419 state.poll_send_requests(&mut Context::from_waker(&waker));
420 }
421 (request_id, trace_flow_id)
422 };
423 ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
424 trace::duration!(c"storage", c"block_client::send::end");
425 trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
426 Ok(())
427 }
428 .await
429 }
430
431 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
432 self.send(BlockFifoRequest {
433 command: BlockFifoCommand {
434 opcode: BlockOpcode::CloseVmo.into_primitive(),
435 flags: 0,
436 ..Default::default()
437 },
438 vmoid: vmo_id.into_id(),
439 ..Default::default()
440 })
441 .await
442 }
443
444 async fn read_at(
445 &self,
446 buffer_slice: MutableBufferSlice<'_>,
447 device_offset: u64,
448 trace_flow_id: u64,
449 ) -> Result<(), zx::Status> {
450 match buffer_slice {
451 MutableBufferSlice::VmoId { vmo_id, offset, length } => {
452 self.send(BlockFifoRequest {
453 command: BlockFifoCommand {
454 opcode: BlockOpcode::Read.into_primitive(),
455 flags: 0,
456 ..Default::default()
457 },
458 vmoid: vmo_id.id(),
459 length: self
460 .to_blocks(length)?
461 .try_into()
462 .map_err(|_| zx::Status::INVALID_ARGS)?,
463 vmo_offset: self.to_blocks(offset)?,
464 dev_offset: self.to_blocks(device_offset)?,
465 trace_flow_id,
466 ..Default::default()
467 })
468 .await?
469 }
470 MutableBufferSlice::Memory(mut slice) => {
471 let temp_vmo = self.temp_vmo.lock().await;
472 let mut device_block = self.to_blocks(device_offset)?;
473 loop {
474 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
475 let block_count = self.to_blocks(to_do as u64)? as u32;
476 self.send(BlockFifoRequest {
477 command: BlockFifoCommand {
478 opcode: BlockOpcode::Read.into_primitive(),
479 flags: 0,
480 ..Default::default()
481 },
482 vmoid: self.temp_vmo_id.id(),
483 length: block_count,
484 vmo_offset: 0,
485 dev_offset: device_block,
486 trace_flow_id,
487 ..Default::default()
488 })
489 .await?;
490 temp_vmo.read(&mut slice[..to_do], 0)?;
491 if to_do == slice.len() {
492 break;
493 }
494 device_block += block_count as u64;
495 slice = &mut slice[to_do..];
496 }
497 }
498 }
499 Ok(())
500 }
501
502 async fn write_at(
503 &self,
504 buffer_slice: BufferSlice<'_>,
505 device_offset: u64,
506 opts: WriteOptions,
507 trace_flow_id: u64,
508 ) -> Result<(), zx::Status> {
509 let flags = if opts.contains(WriteOptions::FORCE_ACCESS) {
510 BlockIoFlag::FORCE_ACCESS.bits()
511 } else {
512 0
513 };
514 match buffer_slice {
515 BufferSlice::VmoId { vmo_id, offset, length } => {
516 self.send(BlockFifoRequest {
517 command: BlockFifoCommand {
518 opcode: BlockOpcode::Write.into_primitive(),
519 flags,
520 ..Default::default()
521 },
522 vmoid: vmo_id.id(),
523 length: self
524 .to_blocks(length)?
525 .try_into()
526 .map_err(|_| zx::Status::INVALID_ARGS)?,
527 vmo_offset: self.to_blocks(offset)?,
528 dev_offset: self.to_blocks(device_offset)?,
529 trace_flow_id,
530 ..Default::default()
531 })
532 .await?;
533 }
534 BufferSlice::Memory(mut slice) => {
535 let temp_vmo = self.temp_vmo.lock().await;
536 let mut device_block = self.to_blocks(device_offset)?;
537 loop {
538 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
539 let block_count = self.to_blocks(to_do as u64)? as u32;
540 temp_vmo.write(&slice[..to_do], 0)?;
541 self.send(BlockFifoRequest {
542 command: BlockFifoCommand {
543 opcode: BlockOpcode::Write.into_primitive(),
544 flags,
545 ..Default::default()
546 },
547 vmoid: self.temp_vmo_id.id(),
548 length: block_count,
549 vmo_offset: 0,
550 dev_offset: device_block,
551 trace_flow_id,
552 ..Default::default()
553 })
554 .await?;
555 if to_do == slice.len() {
556 break;
557 }
558 device_block += block_count as u64;
559 slice = &slice[to_do..];
560 }
561 }
562 }
563 Ok(())
564 }
565
566 async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
567 let length = self.to_blocks(device_range.end - device_range.start)? as u32;
568 let dev_offset = self.to_blocks(device_range.start)?;
569 self.send(BlockFifoRequest {
570 command: BlockFifoCommand {
571 opcode: BlockOpcode::Trim.into_primitive(),
572 flags: 0,
573 ..Default::default()
574 },
575 vmoid: block_driver::BLOCK_VMOID_INVALID,
576 length,
577 dev_offset,
578 trace_flow_id,
579 ..Default::default()
580 })
581 .await
582 }
583
584 async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
585 self.send(BlockFifoRequest {
586 command: BlockFifoCommand {
587 opcode: BlockOpcode::Flush.into_primitive(),
588 flags: 0,
589 ..Default::default()
590 },
591 vmoid: block_driver::BLOCK_VMOID_INVALID,
592 trace_flow_id,
593 ..Default::default()
594 })
595 .await
596 }
597
598 fn block_size(&self) -> u32 {
599 self.block_size
600 }
601
602 fn block_count(&self) -> u64 {
603 self.block_count
604 }
605
606 fn block_flags(&self) -> BlockFlags {
607 self.block_flags
608 }
609
610 fn is_connected(&self) -> bool {
611 self.fifo_state.lock().unwrap().fifo.is_some()
612 }
613}
614
615impl Drop for Common {
616 fn drop(&mut self) {
617 let _ = self.temp_vmo_id.take().into_id();
620 self.fifo_state.lock().unwrap().terminate();
621 }
622}
623
624pub struct RemoteBlockClient {
626 session: block::SessionProxy,
627 common: Common,
628}
629
630pub trait AsBlockProxy {
631 fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;
632
633 fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
634}
635
636impl<T: AsBlockProxy> AsBlockProxy for &T {
637 fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
638 AsBlockProxy::get_info(*self)
639 }
640 fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
641 AsBlockProxy::open_session(*self, session)
642 }
643}
644
645macro_rules! impl_as_block_proxy {
646 ($name:ident) => {
647 impl AsBlockProxy for $name {
648 async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
649 $name::get_info(self).await
650 }
651
652 fn open_session(
653 &self,
654 session: ServerEnd<block::SessionMarker>,
655 ) -> Result<(), fidl::Error> {
656 $name::open_session(self, session)
657 }
658 }
659 };
660}
661
662impl_as_block_proxy!(BlockProxy);
663impl_as_block_proxy!(PartitionProxy);
664impl_as_block_proxy!(VolumeProxy);
665
666impl RemoteBlockClient {
667 pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
669 let info =
670 remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
671 let (session, server) = fidl::endpoints::create_proxy();
672 let () = remote.open_session(server).map_err(fidl_to_status)?;
673 Self::from_session(info, session).await
674 }
675
676 pub async fn from_session(
677 info: block::BlockInfo,
678 session: block::SessionProxy,
679 ) -> Result<Self, zx::Status> {
680 let fifo =
681 session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
682 let fifo = fasync::Fifo::from_fifo(fifo);
683 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
684 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
685 let vmo_id =
686 session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
687 let vmo_id = VmoId::new(vmo_id.id);
688 Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
689 }
690}
691
692#[async_trait]
693impl BlockClient for RemoteBlockClient {
694 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
695 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
696 let vmo_id = self
697 .session
698 .attach_vmo(dup)
699 .await
700 .map_err(fidl_to_status)?
701 .map_err(zx::Status::from_raw)?;
702 Ok(VmoId::new(vmo_id.id))
703 }
704
705 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
706 self.common.detach_vmo(vmo_id).await
707 }
708
709 async fn read_at_traced(
710 &self,
711 buffer_slice: MutableBufferSlice<'_>,
712 device_offset: u64,
713 trace_flow_id: u64,
714 ) -> Result<(), zx::Status> {
715 self.common.read_at(buffer_slice, device_offset, trace_flow_id).await
716 }
717
718 async fn write_at_with_opts_traced(
719 &self,
720 buffer_slice: BufferSlice<'_>,
721 device_offset: u64,
722 opts: WriteOptions,
723 trace_flow_id: u64,
724 ) -> Result<(), zx::Status> {
725 self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
726 }
727
728 async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
729 self.common.trim(range, trace_flow_id).await
730 }
731
732 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
733 self.common.flush(trace_flow_id).await
734 }
735
736 async fn close(&self) -> Result<(), zx::Status> {
737 let () =
738 self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
739 Ok(())
740 }
741
742 fn block_size(&self) -> u32 {
743 self.common.block_size()
744 }
745
746 fn block_count(&self) -> u64 {
747 self.common.block_count()
748 }
749
750 fn block_flags(&self) -> BlockFlags {
751 self.common.block_flags()
752 }
753
754 fn is_connected(&self) -> bool {
755 self.common.is_connected()
756 }
757}
758
759pub struct RemoteBlockClientSync {
760 session: block::SessionSynchronousProxy,
761 common: Common,
762}
763
764impl RemoteBlockClientSync {
765 pub fn new(
769 client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
770 ) -> Result<Self, zx::Status> {
771 let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
772 let info = remote
773 .get_info(zx::MonotonicInstant::INFINITE)
774 .map_err(fidl_to_status)?
775 .map_err(zx::Status::from_raw)?;
776 let (client, server) = fidl::endpoints::create_endpoints();
777 let () = remote.open_session(server).map_err(fidl_to_status)?;
778 let session = block::SessionSynchronousProxy::new(client.into_channel());
779 let fifo = session
780 .get_fifo(zx::MonotonicInstant::INFINITE)
781 .map_err(fidl_to_status)?
782 .map_err(zx::Status::from_raw)?;
783 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
784 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
785 let vmo_id = session
786 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
787 .map_err(fidl_to_status)?
788 .map_err(zx::Status::from_raw)?;
789 let vmo_id = VmoId::new(vmo_id.id);
790
791 let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
794 std::thread::spawn(move || {
795 let mut executor = fasync::LocalExecutor::new();
796 let fifo = fasync::Fifo::from_fifo(fifo);
797 let common = Common::new(fifo, &info, temp_vmo, vmo_id);
798 let fifo_state = common.fifo_state.clone();
799 let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
800 executor.run_singlethreaded(FifoPoller { fifo_state });
801 });
802 block_on(receiver).map_err(|_| zx::Status::CANCELED)?
803 }
804
805 pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
806 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
807 let vmo_id = self
808 .session
809 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
810 .map_err(fidl_to_status)?
811 .map_err(zx::Status::from_raw)?;
812 Ok(VmoId::new(vmo_id.id))
813 }
814
815 pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
816 block_on(self.common.detach_vmo(vmo_id))
817 }
818
819 pub fn read_at(
820 &self,
821 buffer_slice: MutableBufferSlice<'_>,
822 device_offset: u64,
823 ) -> Result<(), zx::Status> {
824 block_on(self.common.read_at(buffer_slice, device_offset, NO_TRACE_ID))
825 }
826
827 pub fn write_at(
828 &self,
829 buffer_slice: BufferSlice<'_>,
830 device_offset: u64,
831 ) -> Result<(), zx::Status> {
832 block_on(self.common.write_at(
833 buffer_slice,
834 device_offset,
835 WriteOptions::empty(),
836 NO_TRACE_ID,
837 ))
838 }
839
840 pub fn flush(&self) -> Result<(), zx::Status> {
841 block_on(self.common.flush(NO_TRACE_ID))
842 }
843
844 pub fn close(&self) -> Result<(), zx::Status> {
845 let () = self
846 .session
847 .close(zx::MonotonicInstant::INFINITE)
848 .map_err(fidl_to_status)?
849 .map_err(zx::Status::from_raw)?;
850 Ok(())
851 }
852
853 pub fn block_size(&self) -> u32 {
854 self.common.block_size()
855 }
856
857 pub fn block_count(&self) -> u64 {
858 self.common.block_count()
859 }
860
861 pub fn is_connected(&self) -> bool {
862 self.common.is_connected()
863 }
864}
865
866impl Drop for RemoteBlockClientSync {
867 fn drop(&mut self) {
868 let _ = self.close();
870 }
871}
872
873struct FifoPoller {
875 fifo_state: FifoStateRef,
876}
877
878impl Future for FifoPoller {
879 type Output = ();
880
881 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
882 let mut state_lock = self.fifo_state.lock().unwrap();
883 let state = state_lock.deref_mut(); if state.poll_send_requests(context) {
887 return Poll::Ready(());
888 }
889
890 let fifo = state.fifo.as_ref().unwrap(); while let Poll::Ready(result) = fifo.read_one(context) {
893 match result {
894 Ok(response) => {
895 let request_id = response.reqid;
896 if let Some(request_state) = state.map.get_mut(&request_id) {
898 request_state.result.replace(zx::Status::from_raw(response.status));
899 if let Some(waker) = request_state.waker.take() {
900 waker.wake();
901 }
902 }
903 }
904 Err(_) => {
905 state.terminate();
906 return Poll::Ready(());
907 }
908 }
909 }
910
911 state.poller_waker = Some(context.waker().clone());
912 Poll::Pending
913 }
914}
915
916#[cfg(test)]
917mod tests {
918 use super::{
919 BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
920 RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
921 };
922 use block_server::{BlockServer, DeviceInfo, PartitionInfo};
923 use fidl::endpoints::RequestStream as _;
924 use fidl_fuchsia_hardware_block as block;
925 use fuchsia_async::{self as fasync, FifoReadable as _, FifoWritable as _};
926 use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
927 use futures::join;
928 use futures::stream::futures_unordered::FuturesUnordered;
929 use futures::stream::StreamExt as _;
930 use ramdevice_client::RamdiskClient;
931 use std::borrow::Cow;
932 use std::num::NonZero;
933 use std::sync::atomic::{AtomicBool, Ordering};
934 use std::sync::Arc;
935
936 const RAMDISK_BLOCK_SIZE: u64 = 1024;
937 const RAMDISK_BLOCK_COUNT: u64 = 1024;
938
939 pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
940 let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
941 .await
942 .expect("RamdiskClient::create failed");
943 let client_end = ramdisk.open().expect("ramdisk.open failed");
944 let proxy = client_end.into_proxy();
945 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
946 assert_eq!(block_client.block_size(), 1024);
947 let client_end = ramdisk.open().expect("ramdisk.open failed");
948 let proxy = client_end.into_proxy();
949 (ramdisk, proxy, block_client)
950 }
951
952 #[fuchsia::test]
953 async fn test_against_ram_disk() {
954 let (_ramdisk, block_proxy, block_client) = make_ramdisk().await;
955
956 let stats_before = block_proxy
957 .get_stats(false)
958 .await
959 .expect("get_stats failed")
960 .map_err(zx::Status::from_raw)
961 .expect("get_stats error");
962
963 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
964 vmo.write(b"hello", 5).expect("vmo.write failed");
965 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
966 block_client
967 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
968 .await
969 .expect("write_at failed");
970 block_client
971 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
972 .await
973 .expect("read_at failed");
974 let mut buf: [u8; 5] = Default::default();
975 vmo.read(&mut buf, 1029).expect("vmo.read failed");
976 assert_eq!(&buf, b"hello");
977 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
978
979 let stats_after = block_proxy
981 .get_stats(false)
982 .await
983 .expect("get_stats failed")
984 .map_err(zx::Status::from_raw)
985 .expect("get_stats error");
986 assert_eq!(
988 stats_before.write.success.total_calls + 1,
989 stats_after.write.success.total_calls
990 );
991 assert_eq!(
992 stats_before.write.success.bytes_transferred + 1024,
993 stats_after.write.success.bytes_transferred
994 );
995 assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
996 assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
998 assert_eq!(
999 stats_before.read.success.bytes_transferred + 2048,
1000 stats_after.read.success.bytes_transferred
1001 );
1002 assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
1003 }
1004
1005 #[fuchsia::test]
1006 async fn test_against_ram_disk_with_flush() {
1007 let (_ramdisk, block_proxy, block_client) = make_ramdisk().await;
1008
1009 let stats_before = block_proxy
1010 .get_stats(false)
1011 .await
1012 .expect("get_stats failed")
1013 .map_err(zx::Status::from_raw)
1014 .expect("get_stats error");
1015
1016 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1017 vmo.write(b"hello", 5).expect("vmo.write failed");
1018 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1019 block_client
1020 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1021 .await
1022 .expect("write_at failed");
1023 block_client.flush().await.expect("flush failed");
1024 block_client
1025 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1026 .await
1027 .expect("read_at failed");
1028 let mut buf: [u8; 5] = Default::default();
1029 vmo.read(&mut buf, 1029).expect("vmo.read failed");
1030 assert_eq!(&buf, b"hello");
1031 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1032
1033 let stats_after = block_proxy
1035 .get_stats(false)
1036 .await
1037 .expect("get_stats failed")
1038 .map_err(zx::Status::from_raw)
1039 .expect("get_stats error");
1040 assert_eq!(
1042 stats_before.write.success.total_calls + 1,
1043 stats_after.write.success.total_calls
1044 );
1045 assert_eq!(
1046 stats_before.write.success.bytes_transferred + 1024,
1047 stats_after.write.success.bytes_transferred
1048 );
1049 assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
1050 assert_eq!(
1052 stats_before.flush.success.total_calls + 1,
1053 stats_after.flush.success.total_calls
1054 );
1055 assert_eq!(stats_before.flush.failure.total_calls, stats_after.flush.failure.total_calls);
1056 assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
1058 assert_eq!(
1059 stats_before.read.success.bytes_transferred + 2048,
1060 stats_after.read.success.bytes_transferred
1061 );
1062 assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
1063 }
1064
1065 #[fuchsia::test]
1066 async fn test_alignment() {
1067 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1068 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1069 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1070 block_client
1071 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1072 .await
1073 .expect_err("expected failure due to bad alignment");
1074 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1075 }
1076
1077 #[fuchsia::test]
1078 async fn test_parallel_io() {
1079 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1080 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1081 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1082 let mut reads = Vec::new();
1083 for _ in 0..1024 {
1084 reads.push(
1085 block_client
1086 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1087 .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1088 );
1089 }
1090 futures::future::join_all(reads).await;
1091 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1092 }
1093
1094 #[fuchsia::test]
1095 async fn test_closed_device() {
1096 let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1097 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1098 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1099 let mut reads = Vec::new();
1100 for _ in 0..1024 {
1101 reads.push(
1102 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1103 );
1104 }
1105 assert!(block_client.is_connected());
1106 let _ = futures::join!(futures::future::join_all(reads), async {
1107 ramdisk.destroy().await.expect("ramdisk.destroy failed")
1108 });
1109 while block_client
1111 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1112 .await
1113 .is_ok()
1114 {}
1115
1116 while block_client.is_connected() {
1119 fasync::Timer::new(fasync::MonotonicInstant::after(
1121 zx::MonotonicDuration::from_millis(500),
1122 ))
1123 .await;
1124 }
1125
1126 assert_eq!(block_client.is_connected(), false);
1128 let _ = block_client.detach_vmo(vmo_id).await;
1129 }
1130
1131 #[fuchsia::test]
1132 async fn test_cancelled_reads() {
1133 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1134 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1135 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1136 {
1137 let mut reads = FuturesUnordered::new();
1138 for _ in 0..1024 {
1139 reads.push(
1140 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1141 );
1142 }
1143 for _ in 0..500 {
1145 reads.next().await;
1146 }
1147 }
1148 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1149 }
1150
1151 #[fuchsia::test]
1152 async fn test_parallel_large_read_and_write_with_memory_succeds() {
1153 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1154 let block_client_ref = &block_client;
1155 let test_one = |offset, len, fill| async move {
1156 let buf = vec![fill; len];
1157 block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1158 let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1160 block_client_ref
1161 .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1162 .await
1163 .expect("read_at failed");
1164 assert_eq!(
1165 &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1166 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1167 );
1168 assert_eq!(
1169 &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1170 &buf[..]
1171 );
1172 assert_eq!(
1173 &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1174 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1175 );
1176 };
1177 const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1178 join!(
1179 test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1180 test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1181 );
1182 }
1183
1184 struct FakeBlockServer<'a> {
1188 server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
1189 channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
1190 fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
1191 }
1192
1193 impl<'a> FakeBlockServer<'a> {
1194 fn new(
1206 server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1207 channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1208 fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1209 ) -> FakeBlockServer<'a> {
1210 FakeBlockServer {
1211 server_channel: Some(server_channel),
1212 channel_handler: Box::new(channel_handler),
1213 fifo_handler: Box::new(fifo_handler),
1214 }
1215 }
1216
1217 async fn run(&mut self) {
1219 let server = self.server_channel.take().unwrap();
1220
1221 let (server_fifo, client_fifo) =
1223 zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1224 .expect("Fifo::create failed");
1225 let maybe_server_fifo = std::sync::Mutex::new(Some(client_fifo));
1226
1227 let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1228 let fifo_future = Abortable::new(
1229 async {
1230 let fifo = fasync::Fifo::from_fifo(server_fifo);
1231 loop {
1232 let request = match fifo.read_entry().await {
1233 Ok(r) => r,
1234 Err(zx::Status::PEER_CLOSED) => break,
1235 Err(e) => panic!("read_entry failed {:?}", e),
1236 };
1237
1238 let response = self.fifo_handler.as_ref()(request);
1239 fifo.write_entries(std::slice::from_ref(&response))
1240 .await
1241 .expect("write_entries failed");
1242 }
1243 },
1244 fifo_future_abort_registration,
1245 );
1246
1247 let channel_future = async {
1248 server
1249 .into_stream()
1250 .for_each_concurrent(None, |request| async {
1251 let request = request.expect("unexpected fidl error");
1252
1253 match request {
1254 block::BlockRequest::GetInfo { responder } => {
1255 responder
1256 .send(Ok(&block::BlockInfo {
1257 block_count: 1024,
1258 block_size: 512,
1259 max_transfer_size: 1024 * 1024,
1260 flags: block::Flag::empty(),
1261 }))
1262 .expect("send failed");
1263 }
1264 block::BlockRequest::OpenSession { session, control_handle: _ } => {
1265 let stream = session.into_stream();
1266 stream
1267 .for_each(|request| async {
1268 let request = request.expect("unexpected fidl error");
1269 if self.channel_handler.as_ref()(&request) {
1272 return;
1273 }
1274 match request {
1275 block::SessionRequest::GetFifo { responder } => {
1276 match maybe_server_fifo.lock().unwrap().take() {
1277 Some(fifo) => {
1278 responder.send(Ok(fifo.downcast()))
1279 }
1280 None => responder.send(Err(
1281 zx::Status::NO_RESOURCES.into_raw(),
1282 )),
1283 }
1284 .expect("send failed")
1285 }
1286 block::SessionRequest::AttachVmo {
1287 vmo: _,
1288 responder,
1289 } => responder
1290 .send(Ok(&block::VmoId { id: 1 }))
1291 .expect("send failed"),
1292 block::SessionRequest::Close { responder } => {
1293 fifo_future_abort.abort();
1294 responder.send(Ok(())).expect("send failed")
1295 }
1296 }
1297 })
1298 .await
1299 }
1300 _ => panic!("Unexpected message"),
1301 }
1302 })
1303 .await;
1304 };
1305
1306 let _result = join!(fifo_future, channel_future);
1307 }
1309 }
1310
1311 #[fuchsia::test]
1312 async fn test_block_close_is_called() {
1313 let close_called = std::sync::Mutex::new(false);
1314 let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1315
1316 std::thread::spawn(move || {
1317 let _block_client =
1318 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1319 });
1321
1322 let channel_handler = |request: &block::SessionRequest| -> bool {
1323 if let block::SessionRequest::Close { .. } = request {
1324 *close_called.lock().unwrap() = true;
1325 }
1326 false
1327 };
1328 FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1329
1330 assert!(*close_called.lock().unwrap());
1332 }
1333
1334 #[fuchsia::test]
1335 async fn test_block_flush_is_called() {
1336 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1337
1338 struct Interface {
1339 flush_called: Arc<AtomicBool>,
1340 }
1341 impl block_server::async_interface::Interface for Interface {
1342 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1343 Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1344 device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
1345 block_range: Some(0..1000),
1346 type_guid: [0; 16],
1347 instance_guid: [0; 16],
1348 name: "foo".to_string(),
1349 flags: 0,
1350 })))
1351 }
1352
1353 async fn read(
1354 &self,
1355 _device_block_offset: u64,
1356 _block_count: u32,
1357 _vmo: &Arc<zx::Vmo>,
1358 _vmo_offset: u64,
1359 _trace_flow_id: Option<NonZero<u64>>,
1360 ) -> Result<(), zx::Status> {
1361 unreachable!();
1362 }
1363
1364 async fn write(
1365 &self,
1366 _device_block_offset: u64,
1367 _block_count: u32,
1368 _vmo: &Arc<zx::Vmo>,
1369 _vmo_offset: u64,
1370 _opts: WriteOptions,
1371 _trace_flow_id: Option<NonZero<u64>>,
1372 ) -> Result<(), zx::Status> {
1373 unreachable!();
1374 }
1375
1376 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1377 self.flush_called.store(true, Ordering::Relaxed);
1378 Ok(())
1379 }
1380
1381 async fn trim(
1382 &self,
1383 _device_block_offset: u64,
1384 _block_count: u32,
1385 _trace_flow_id: Option<NonZero<u64>>,
1386 ) -> Result<(), zx::Status> {
1387 unreachable!();
1388 }
1389 }
1390
1391 let flush_called = Arc::new(AtomicBool::new(false));
1392
1393 futures::join!(
1394 async {
1395 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1396
1397 block_client.flush().await.expect("flush failed");
1398 },
1399 async {
1400 let block_server = BlockServer::new(
1401 512,
1402 Arc::new(Interface { flush_called: flush_called.clone() }),
1403 );
1404 block_server.handle_requests(stream.cast_stream()).await.unwrap();
1405 }
1406 );
1407
1408 assert!(flush_called.load(Ordering::Relaxed));
1409 }
1410
1411 #[fuchsia::test]
1412 async fn test_trace_flow_ids_set() {
1413 let (proxy, server) = fidl::endpoints::create_proxy();
1414
1415 futures::join!(
1416 async {
1417 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1418 block_client.flush().await.expect("flush failed");
1419 },
1420 async {
1421 let flow_id: std::sync::Mutex<Option<u64>> = std::sync::Mutex::new(None);
1422 let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1423 if request.trace_flow_id > 0 {
1424 *flow_id.lock().unwrap() = Some(request.trace_flow_id);
1425 }
1426 BlockFifoResponse {
1427 status: zx::Status::OK.into_raw(),
1428 reqid: request.reqid,
1429 ..Default::default()
1430 }
1431 };
1432 FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1433 assert!(flow_id.lock().unwrap().is_some());
1435 }
1436 );
1437 }
1438}