1use anyhow::{anyhow, Error};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
7use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
8use fuchsia_sync::Mutex;
9use futures::{Future, FutureExt as _, TryStreamExt as _};
10use std::borrow::Cow;
11use std::collections::btree_map::Entry;
12use std::collections::BTreeMap;
13use std::num::NonZero;
14use std::ops::Range;
15use std::sync::atomic::AtomicU64;
16use std::sync::Arc;
17use zx::HandleBased;
18use {
19 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
20 fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
21};
22
23pub mod async_interface;
24pub mod c_interface;
25
26#[derive(Clone)]
27pub enum DeviceInfo {
28 Block(BlockInfo),
29 Partition(PartitionInfo),
30}
31
32impl DeviceInfo {
33 fn max_transfer_size(&self, block_size: u32) -> u32 {
34 let max_blocks = match self {
35 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks,
36 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => max_transfer_blocks,
37 };
38 if let Some(max_blocks) = max_blocks {
39 max_blocks.get() * block_size
40 } else {
41 MAX_TRANSFER_UNBOUNDED
42 }
43 }
44}
45
46#[derive(Clone)]
48pub struct BlockInfo {
49 pub device_flags: fblock::Flag,
50 pub block_count: u64,
51 pub max_transfer_blocks: Option<NonZero<u32>>,
52}
53
54#[derive(Clone)]
56pub struct PartitionInfo {
57 pub device_flags: fblock::Flag,
59 pub max_transfer_blocks: Option<NonZero<u32>>,
60 pub block_range: Option<Range<u64>>,
64 pub type_guid: [u8; 16],
65 pub instance_guid: [u8; 16],
66 pub name: String,
67 pub flags: u64,
68}
69
70struct FifoMessageGroup {
83 status: zx::Status,
84 count: u32,
85 req_id: Option<u32>,
86}
87
88impl FifoMessageGroup {
89 fn new() -> Self {
90 FifoMessageGroup { status: zx::Status::OK, count: 0, req_id: None }
91 }
92}
93
94#[derive(Default)]
95struct FifoMessageGroups(Mutex<BTreeMap<u16, FifoMessageGroup>>);
96
97impl FifoMessageGroups {
99 fn complete(&self, group_id: u16, status: zx::Status) -> Option<BlockFifoResponse> {
102 let mut map = self.0.lock();
103 let Entry::Occupied(mut o) = map.entry(group_id) else { unreachable!() };
104 let group = o.get_mut();
105 if group.count == 1 {
106 if let Some(reqid) = group.req_id {
107 let status =
108 if group.status != zx::Status::OK { group.status } else { status }.into_raw();
109
110 o.remove();
111
112 return Some(BlockFifoResponse {
113 status,
114 reqid,
115 group: group_id,
116 ..Default::default()
117 });
118 }
119 }
120
121 group.count = group.count.checked_sub(1).unwrap();
122 if status != zx::Status::OK && group.status == zx::Status::OK {
123 group.status = status
124 }
125 None
126 }
127}
128
129pub struct BlockServer<SM> {
132 block_size: u32,
133 session_manager: Arc<SM>,
134}
135
136pub struct BlockOffsetMapping {
138 source_block_offset: u64,
139 target_block_offset: u64,
140 length: u64,
141}
142
143impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
144 type Error = zx::Status;
145
146 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
147 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
148 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
149 Ok(Self {
150 source_block_offset: wire.source_block_offset,
151 target_block_offset: wire.target_block_offset,
152 length: wire.length,
153 })
154 }
155}
156
157pub struct OffsetMap {
163 mapping: BlockOffsetMapping,
164}
165
166impl OffsetMap {
167 pub fn new(mapping: BlockOffsetMapping) -> Self {
168 Self { mapping }
169 }
170
171 pub fn adjust_request(&self, dev_offset: u64, length: u64) -> Option<u64> {
174 if length == 0 {
175 return None;
176 }
177 let end = dev_offset.checked_add(length)?;
178 if self.mapping.source_block_offset > dev_offset
179 || end > self.mapping.source_block_offset + self.mapping.length
180 {
181 return None;
182 }
183 let delta = dev_offset - self.mapping.source_block_offset;
184 Some(self.mapping.target_block_offset + delta)
185 }
186}
187
188pub trait SessionManager: 'static {
191 fn on_attach_vmo(
192 self: Arc<Self>,
193 vmo: &Arc<zx::Vmo>,
194 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
195
196 fn open_session(
201 self: Arc<Self>,
202 stream: fblock::SessionRequestStream,
203 offset_map: Option<OffsetMap>,
204 block_size: u32,
205 ) -> impl Future<Output = Result<(), Error>> + Send;
206
207 fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
209
210 fn get_volume_info(
212 &self,
213 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
214 {
215 async { Err(zx::Status::NOT_SUPPORTED) }
216 }
217
218 fn query_slices(
220 &self,
221 _start_slices: &[u64],
222 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
223 async { Err(zx::Status::NOT_SUPPORTED) }
224 }
225
226 fn extend(
228 &self,
229 _start_slice: u64,
230 _slice_count: u64,
231 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
232 async { Err(zx::Status::NOT_SUPPORTED) }
233 }
234
235 fn shrink(
237 &self,
238 _start_slice: u64,
239 _slice_count: u64,
240 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
241 async { Err(zx::Status::NOT_SUPPORTED) }
242 }
243}
244
245pub trait IntoSessionManager {
246 type SM: SessionManager;
247
248 fn into_session_manager(self) -> Arc<Self::SM>;
249}
250
251impl<SM: SessionManager> BlockServer<SM> {
252 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
253 Self { block_size, session_manager: session_manager.into_session_manager() }
254 }
255
256 pub async fn handle_requests(
258 &self,
259 mut requests: fvolume::VolumeRequestStream,
260 ) -> Result<(), Error> {
261 let scope = fasync::Scope::new();
262 while let Some(request) = requests.try_next().await.unwrap() {
263 if let Some(session) = self.handle_request(request).await? {
264 scope.spawn(session.map(|_| ()));
265 }
266 }
267 scope.await;
268 Ok(())
269 }
270
271 async fn handle_request(
274 &self,
275 request: fvolume::VolumeRequest,
276 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
277 match request {
278 fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
279 Ok(info) => {
280 let max_transfer_size = info.max_transfer_size(self.block_size);
281 let (block_count, flags) = match info.as_ref() {
282 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
283 (*block_count, *device_flags)
284 }
285 DeviceInfo::Partition(partition_info) => {
286 let block_count = if let Some(range) =
287 partition_info.block_range.as_ref()
288 {
289 range.end - range.start
290 } else {
291 let volume_info = self.session_manager.get_volume_info().await?;
292 volume_info.0.slice_size * volume_info.1.partition_slice_count
293 / self.block_size as u64
294 };
295 (block_count, partition_info.device_flags)
296 }
297 };
298 responder.send(Ok(&fblock::BlockInfo {
299 block_count,
300 block_size: self.block_size,
301 max_transfer_size,
302 flags,
303 }))?;
304 }
305 Err(status) => responder.send(Err(status.into_raw()))?,
306 },
307 fvolume::VolumeRequest::GetStats { clear: _, responder } => {
308 responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
310 }
311 fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
312 return Ok(Some(self.session_manager.clone().open_session(
313 session.into_stream(),
314 None,
315 self.block_size,
316 )));
317 }
318 fvolume::VolumeRequest::OpenSessionWithOffsetMap {
319 session,
320 offset_map,
321 initial_mappings,
322 control_handle: _,
323 } => {
324 if offset_map.is_some() || initial_mappings.as_ref().is_none_or(|m| m.len() != 1) {
325 session.close_with_epitaph(zx::Status::NOT_SUPPORTED)?;
329 return Ok(None);
330 }
331 let initial_mapping = match initial_mappings.unwrap().pop().unwrap().try_into() {
332 Ok(m) => m,
333 Err(status) => {
334 session.close_with_epitaph(status)?;
335 return Ok(None);
336 }
337 };
338 let offset_map = OffsetMap::new(initial_mapping);
339 return Ok(Some(self.session_manager.clone().open_session(
340 session.into_stream(),
341 Some(offset_map),
342 self.block_size,
343 )));
344 }
345 fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
346 Ok(info) => {
347 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
348 let mut guid =
349 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
350 guid.value.copy_from_slice(&partition_info.type_guid);
351 responder.send(zx::sys::ZX_OK, Some(&guid))?;
352 } else {
353 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
354 }
355 }
356 Err(status) => {
357 responder.send(status.into_raw(), None)?;
358 }
359 },
360 fvolume::VolumeRequest::GetInstanceGuid { responder } => {
361 match self.device_info().await {
362 Ok(info) => {
363 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
364 let mut guid =
365 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
366 guid.value.copy_from_slice(&partition_info.instance_guid);
367 responder.send(zx::sys::ZX_OK, Some(&guid))?;
368 } else {
369 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
370 }
371 }
372 Err(status) => {
373 responder.send(status.into_raw(), None)?;
374 }
375 }
376 }
377 fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
378 Ok(info) => {
379 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
380 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
381 } else {
382 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
383 }
384 }
385 Err(status) => {
386 responder.send(status.into_raw(), None)?;
387 }
388 },
389 fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
390 Ok(info) => {
391 if let DeviceInfo::Partition(info) = info.as_ref() {
392 let mut type_guid =
393 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
394 type_guid.value.copy_from_slice(&info.type_guid);
395 let mut instance_guid =
396 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
397 instance_guid.value.copy_from_slice(&info.instance_guid);
398 responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
399 name: Some(info.name.clone()),
400 type_guid: Some(type_guid),
401 instance_guid: Some(instance_guid),
402 start_block_offset: info.block_range.as_ref().map(|range| range.start),
403 num_blocks: info
404 .block_range
405 .as_ref()
406 .map(|range| range.end - range.start),
407 flags: Some(info.flags),
408 ..Default::default()
409 }))?;
410 }
411 }
412 Err(status) => responder.send(Err(status.into_raw()))?,
413 },
414 fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
415 match self.session_manager.query_slices(&start_slices).await {
416 Ok(mut results) => {
417 let results_len = results.len();
418 assert!(results_len <= 16);
419 results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
420 responder.send(
421 zx::sys::ZX_OK,
422 &results.try_into().unwrap(),
423 results_len as u64,
424 )?;
425 }
426 Err(s) => {
427 responder.send(
428 s.into_raw(),
429 &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
430 0,
431 )?;
432 }
433 }
434 }
435 fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
436 match self.session_manager.get_volume_info().await {
437 Ok((manager_info, volume_info)) => {
438 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
439 }
440 Err(s) => responder.send(s.into_raw(), None, None)?,
441 }
442 }
443 fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
444 responder.send(
445 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
446 .into_raw(),
447 )?;
448 }
449 fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
450 responder.send(
451 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
452 .into_raw(),
453 )?;
454 }
455 fvolume::VolumeRequest::Destroy { responder, .. } => {
456 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
457 }
458 }
459 Ok(None)
460 }
461
462 async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
463 self.session_manager.get_info().await
464 }
465}
466
467struct SessionHelper<SM: SessionManager> {
468 session_manager: Arc<SM>,
469 offset_map: Option<OffsetMap>,
470 block_size: u32,
471 peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
472 vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
473 message_groups: FifoMessageGroups,
474}
475
476impl<SM: SessionManager> SessionHelper<SM> {
477 fn new(
478 session_manager: Arc<SM>,
479 offset_map: Option<OffsetMap>,
480 block_size: u32,
481 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
482 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
483 Ok((
484 Self {
485 session_manager,
486 offset_map,
487 block_size,
488 peer_fifo,
489 vmos: Mutex::default(),
490 message_groups: FifoMessageGroups::default(),
491 },
492 fifo,
493 ))
494 }
495
496 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
497 match request {
498 fblock::SessionRequest::GetFifo { responder } => {
499 let rights = zx::Rights::TRANSFER
500 | zx::Rights::READ
501 | zx::Rights::WRITE
502 | zx::Rights::SIGNAL
503 | zx::Rights::WAIT;
504 match self.peer_fifo.duplicate_handle(rights) {
505 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
506 Err(s) => responder.send(Err(s.into_raw()))?,
507 }
508 Ok(())
509 }
510 fblock::SessionRequest::AttachVmo { vmo, responder } => {
511 let vmo = Arc::new(vmo);
512 let vmo_id = {
513 let mut vmos = self.vmos.lock();
514 if vmos.len() == u16::MAX as usize {
515 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
516 return Ok(());
517 } else {
518 let vmo_id = match vmos.last_entry() {
519 None => 1,
520 Some(o) => {
521 o.key().checked_add(1).unwrap_or_else(|| {
522 let mut vmo_id = 1;
523 for (&id, _) in &*vmos {
525 if id > vmo_id {
526 break;
527 }
528 vmo_id = id + 1;
529 }
530 vmo_id
531 })
532 }
533 };
534 vmos.insert(vmo_id, vmo.clone());
535 vmo_id
536 }
537 };
538 self.session_manager.clone().on_attach_vmo(&vmo).await?;
539 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
540 Ok(())
541 }
542 fblock::SessionRequest::Close { responder } => {
543 responder.send(Ok(()))?;
544 Err(anyhow!("Closed"))
545 }
546 }
547 }
548
549 fn finish_fifo_request(
550 &self,
551 request: RequestTracking,
552 status: zx::Status,
553 ) -> Option<BlockFifoResponse> {
554 match request.group_or_request {
555 GroupOrRequest::Group(group_id) => {
556 let response = self.message_groups.complete(group_id, status);
557 fuchsia_trace::duration!(
558 c"storage",
559 c"block_server::finish_transaction_in_group",
560 "group" => u32::from(group_id),
561 "group_completed" => response.is_some(),
562 "status" => status.into_raw());
563 if let Some(trace_flow_id) = request.trace_flow_id {
564 fuchsia_trace::flow_step!(
565 c"storage",
566 c"block_server::finish_transaction",
567 trace_flow_id.get().into()
568 );
569 }
570 response
571 }
572 GroupOrRequest::Request(reqid) => {
573 fuchsia_trace::duration!(
574 c"storage", c"block_server::finish_transaction", "status" => status.into_raw());
575 if let Some(trace_flow_id) = request.trace_flow_id {
576 fuchsia_trace::flow_step!(
577 c"storage",
578 c"block_server::finish_transaction",
579 trace_flow_id.get().into()
580 );
581 }
582 Some(BlockFifoResponse { status: status.into_raw(), reqid, ..Default::default() })
583 }
584 }
585 }
586
587 fn decode_fifo_request(&self, request: &BlockFifoRequest) -> Option<DecodedRequest> {
588 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
589 let is_group = flags.contains(BlockIoFlag::GROUP_ITEM);
590 let last_in_group = flags.contains(BlockIoFlag::GROUP_LAST);
591 let mut op_code =
592 BlockOpcode::from_primitive(request.command.opcode).ok_or(zx::Status::INVALID_ARGS);
593
594 let group_or_request = if is_group {
595 let mut groups = self.message_groups.0.lock();
596 let group = groups.entry(request.group).or_insert_with(|| FifoMessageGroup::new());
597 if group.req_id.is_some() {
598 if group.status == zx::Status::OK {
600 group.status = zx::Status::INVALID_ARGS;
601 }
602 return None;
603 }
604 if last_in_group {
605 group.req_id = Some(request.reqid);
606 if group.status != zx::Status::OK {
608 op_code = Err(group.status);
609 }
610 } else if group.status != zx::Status::OK {
611 return None;
614 }
615 group.count += 1;
616 GroupOrRequest::Group(request.group)
617 } else {
618 GroupOrRequest::Request(request.reqid)
619 };
620
621 let mut vmo_offset = 0;
622 let vmo = match op_code {
623 Ok(BlockOpcode::Read) | Ok(BlockOpcode::Write) => (|| {
624 if request.length == 0 {
625 return Err(zx::Status::INVALID_ARGS);
626 }
627 vmo_offset = request
628 .vmo_offset
629 .checked_mul(self.block_size as u64)
630 .ok_or(zx::Status::OUT_OF_RANGE)?;
631 self.vmos
632 .lock()
633 .get(&request.vmoid)
634 .cloned()
635 .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo)))
636 })(),
637 Ok(BlockOpcode::CloseVmo) => self
638 .vmos
639 .lock()
640 .remove(&request.vmoid)
641 .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo))),
642 _ => Ok(None),
643 }
644 .unwrap_or_else(|e| {
645 op_code = Err(e);
646 None
647 });
648
649 let operation = op_code.map(|code| match code {
650 BlockOpcode::Read => Operation::Read {
651 device_block_offset: request.dev_offset,
652 block_count: request.length,
653 _unused: 0,
654 vmo_offset,
655 },
656 BlockOpcode::Write => Operation::Write {
657 device_block_offset: request.dev_offset,
658 block_count: request.length,
659 options: if flags.contains(BlockIoFlag::FORCE_ACCESS) {
660 WriteOptions::FORCE_ACCESS
661 } else {
662 WriteOptions::empty()
663 },
664 vmo_offset,
665 },
666 BlockOpcode::Flush => Operation::Flush,
667 BlockOpcode::Trim => Operation::Trim {
668 device_block_offset: request.dev_offset,
669 block_count: request.length,
670 },
671 BlockOpcode::CloseVmo => Operation::CloseVmo,
672 });
673 if let Ok(operation) = operation.as_ref() {
674 use fuchsia_trace::ArgValue;
675 static CACHE: AtomicU64 = AtomicU64::new(0);
676 if let Some(context) =
677 fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
678 {
679 let trace_args_with_group = [
680 ArgValue::of("group", u32::from(group_or_request.group_id())),
681 ArgValue::of("opcode", operation.trace_label()),
682 ];
683 let trace_args = [ArgValue::of("opcode", operation.trace_label())];
684 let _scope = if group_or_request.is_group() {
685 fuchsia_trace::duration(
686 c"storage",
687 c"block_server::start_transaction",
688 &trace_args_with_group,
689 )
690 } else {
691 fuchsia_trace::duration(
692 c"storage",
693 c"block_server::start_transaction",
694 &trace_args,
695 )
696 };
697 let trace_flow_id = NonZero::new(request.trace_flow_id);
698 if let Some(trace_flow_id) = trace_flow_id.clone() {
699 fuchsia_trace::flow_step(
700 &context,
701 c"block_server::start_trnsaction",
702 trace_flow_id.get().into(),
703 &[],
704 );
705 }
706 }
707 }
708 Some(DecodedRequest::new(
709 RequestTracking {
710 group_or_request,
711 trace_flow_id: NonZero::new(request.trace_flow_id),
712 },
713 operation,
714 vmo,
715 self.offset_map.as_ref(),
716 ))
717 }
718
719 fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
720 std::mem::take(&mut *self.vmos.lock())
721 }
722}
723
724#[derive(Debug)]
725struct DecodedRequest {
726 request_tracking: RequestTracking,
727 operation: Result<Operation, zx::Status>,
728 vmo: Option<Arc<zx::Vmo>>,
729}
730
731impl DecodedRequest {
732 fn new(
733 request_tracking: RequestTracking,
734 operation: Result<Operation, zx::Status>,
735 vmo: Option<Arc<zx::Vmo>>,
736 offset_map: Option<&OffsetMap>,
737 ) -> Self {
738 let operation =
739 operation.and_then(|operation| operation.validate_and_adjust_range(offset_map));
740 Self { request_tracking, operation, vmo }
741 }
742}
743
744pub type WriteOptions = block_protocol::WriteOptions;
746
747#[repr(C)]
748#[derive(Debug)]
749pub enum Operation {
750 Read {
755 device_block_offset: u64,
756 block_count: u32,
757 _unused: u32,
758 vmo_offset: u64,
759 },
760 Write {
761 device_block_offset: u64,
762 block_count: u32,
763 options: WriteOptions,
764 vmo_offset: u64,
765 },
766 Flush,
767 Trim {
768 device_block_offset: u64,
769 block_count: u32,
770 },
771 CloseVmo,
773}
774
775impl Operation {
776 fn validate_and_adjust_range(
779 mut self,
780 offset_map: Option<&OffsetMap>,
781 ) -> Result<Operation, zx::Status> {
782 let adjust_offset = |dev_offset, length| {
783 if length == 0 {
785 return Err(zx::Status::INVALID_ARGS);
786 }
787 if let Some(offset_map) = offset_map {
788 offset_map.adjust_request(dev_offset, length as u64).ok_or(zx::Status::OUT_OF_RANGE)
789 } else {
790 Ok(dev_offset)
791 }
792 };
793 match &mut self {
794 Operation::Read { device_block_offset, block_count, .. } => {
795 *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
796 }
797 Operation::Write { device_block_offset, block_count, .. } => {
798 *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
799 }
800 Operation::Trim { device_block_offset, block_count, .. } => {
801 *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
802 }
803 _ => {}
804 }
805 Ok(self)
806 }
807 fn trace_label(&self) -> &'static str {
808 match self {
809 Operation::Read { .. } => "read",
810 Operation::Write { .. } => "write",
811 Operation::Flush { .. } => "flush",
812 Operation::Trim { .. } => "trim",
813 Operation::CloseVmo { .. } => "close_vmo",
814 }
815 }
816}
817
818#[derive(Clone, Copy, Debug)]
819pub struct RequestTracking {
820 group_or_request: GroupOrRequest,
821 trace_flow_id: Option<NonZero<u64>>,
822}
823
824#[derive(Clone, Copy, Debug)]
825pub enum GroupOrRequest {
826 Group(u16),
827 Request(u32),
828}
829
830impl GroupOrRequest {
831 fn is_group(&self) -> bool {
832 if let Self::Group(_) = self {
833 true
834 } else {
835 false
836 }
837 }
838
839 fn group_id(&self) -> u16 {
840 match self {
841 Self::Group(id) => *id,
842 Self::Request(_) => 0,
843 }
844 }
845}
846
847const IS_GROUP: u64 = 0x8000_0000_0000_0000;
849const USED_VMO: u64 = 0x4000_0000_0000_0000;
851#[repr(transparent)]
852#[derive(Clone, Copy, Eq, PartialEq, Hash)]
853pub struct RequestId(u64);
854
855impl RequestId {
856 fn with_vmo(self) -> Self {
859 Self(self.0 | USED_VMO)
860 }
861
862 fn did_have_vmo(&self) -> bool {
864 self.0 & USED_VMO != 0
865 }
866}
867
868impl From<GroupOrRequest> for RequestId {
869 fn from(value: GroupOrRequest) -> Self {
870 match value {
871 GroupOrRequest::Group(group) => RequestId(group as u64 | IS_GROUP),
872 GroupOrRequest::Request(request) => RequestId(request as u64),
873 }
874 }
875}
876
877impl From<RequestId> for GroupOrRequest {
878 fn from(value: RequestId) -> Self {
879 if value.0 & IS_GROUP == 0 {
880 GroupOrRequest::Request(value.0 as u32)
881 } else {
882 GroupOrRequest::Group(value.0 as u16)
883 }
884 }
885}
886
887#[cfg(test)]
888mod tests {
889 use super::{BlockServer, DeviceInfo, PartitionInfo};
890 use crate::async_interface::FIFO_MAX_REQUESTS;
891 use assert_matches::assert_matches;
892 use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
893 use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
894 use fuchsia_sync::Mutex;
895 use futures::channel::oneshot;
896 use futures::future::BoxFuture;
897 use futures::FutureExt as _;
898 use std::borrow::Cow;
899 use std::future::poll_fn;
900 use std::num::NonZero;
901 use std::pin::pin;
902 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
903 use std::sync::Arc;
904 use std::task::{Context, Poll};
905 use zx::{AsHandleRef as _, HandleBased as _};
906 use {
907 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
908 fuchsia_async as fasync,
909 };
910
911 #[derive(Default)]
912 struct MockInterface {
913 read_hook: Option<
914 Box<
915 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
916 + Send
917 + Sync,
918 >,
919 >,
920 }
921
922 impl super::async_interface::Interface for MockInterface {
923 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
924 Ok(())
925 }
926
927 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
928 Ok(Cow::Owned(test_device_info()))
929 }
930
931 async fn read(
932 &self,
933 device_block_offset: u64,
934 block_count: u32,
935 vmo: &Arc<zx::Vmo>,
936 vmo_offset: u64,
937 _trace_flow_id: Option<NonZero<u64>>,
938 ) -> Result<(), zx::Status> {
939 if let Some(read_hook) = &self.read_hook {
940 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
941 } else {
942 unimplemented!();
943 }
944 }
945
946 async fn write(
947 &self,
948 _device_block_offset: u64,
949 _block_count: u32,
950 _vmo: &Arc<zx::Vmo>,
951 _vmo_offset: u64,
952 _opts: WriteOptions,
953 _trace_flow_id: Option<NonZero<u64>>,
954 ) -> Result<(), zx::Status> {
955 unreachable!();
956 }
957
958 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
959 unreachable!();
960 }
961
962 async fn trim(
963 &self,
964 _device_block_offset: u64,
965 _block_count: u32,
966 _trace_flow_id: Option<NonZero<u64>>,
967 ) -> Result<(), zx::Status> {
968 unreachable!();
969 }
970
971 async fn get_volume_info(
972 &self,
973 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
974 let () = std::future::pending().await;
976 unreachable!();
977 }
978 }
979
980 const BLOCK_SIZE: u32 = 512;
981
982 fn test_device_info() -> DeviceInfo {
983 DeviceInfo::Partition(PartitionInfo {
984 device_flags: fblock::Flag::READONLY,
985 max_transfer_blocks: None,
986 block_range: Some(12..34),
987 type_guid: [1; 16],
988 instance_guid: [2; 16],
989 name: "foo".to_string(),
990 flags: 0xabcd,
991 })
992 }
993
994 #[fuchsia::test]
995 async fn test_info() {
996 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
997
998 futures::join!(
999 async {
1000 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1001 block_server.handle_requests(stream).await.unwrap();
1002 },
1003 async {
1004 let expected_info = test_device_info();
1005 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1006 info
1007 } else {
1008 unreachable!()
1009 };
1010
1011 let block_info = proxy.get_info().await.unwrap().unwrap();
1012 assert_eq!(
1013 block_info.block_count,
1014 partition_info.block_range.as_ref().unwrap().end
1015 - partition_info.block_range.as_ref().unwrap().start
1016 );
1017 assert_eq!(block_info.flags, fblock::Flag::READONLY);
1018
1019 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1022 assert_eq!(status, zx::sys::ZX_OK);
1023 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1024
1025 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1026 assert_eq!(status, zx::sys::ZX_OK);
1027 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1028
1029 let (status, name) = proxy.get_name().await.unwrap();
1030 assert_eq!(status, zx::sys::ZX_OK);
1031 assert_eq!(name.as_ref(), Some(&partition_info.name));
1032
1033 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1034 assert_eq!(metadata.name, name);
1035 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1036 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1037 assert_eq!(
1038 metadata.start_block_offset,
1039 Some(partition_info.block_range.as_ref().unwrap().start)
1040 );
1041 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1042 assert_eq!(metadata.flags, Some(partition_info.flags));
1043
1044 std::mem::drop(proxy);
1045 }
1046 );
1047 }
1048
1049 #[fuchsia::test]
1050 async fn test_attach_vmo() {
1051 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1052
1053 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1054 let koid = vmo.get_koid().unwrap();
1055
1056 futures::join!(
1057 async {
1058 let block_server = BlockServer::new(
1059 BLOCK_SIZE,
1060 Arc::new(MockInterface {
1061 read_hook: Some(Box::new(move |_, _, vmo, _| {
1062 assert_eq!(vmo.get_koid().unwrap(), koid);
1063 Box::pin(async { Ok(()) })
1064 })),
1065 ..MockInterface::default()
1066 }),
1067 );
1068 block_server.handle_requests(stream).await.unwrap();
1069 },
1070 async move {
1071 let (session_proxy, server) = fidl::endpoints::create_proxy();
1072
1073 proxy.open_session(server).unwrap();
1074
1075 let vmo_id = session_proxy
1076 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1077 .await
1078 .unwrap()
1079 .unwrap();
1080 assert_ne!(vmo_id.id, 0);
1081
1082 let mut fifo =
1083 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1084 let (mut reader, mut writer) = fifo.async_io();
1085
1086 let mut count = 1;
1088 loop {
1089 match session_proxy
1090 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1091 .await
1092 .unwrap()
1093 {
1094 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1095 Err(e) => {
1096 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1097 break;
1098 }
1099 }
1100
1101 if count % 10 == 0 {
1103 writer
1104 .write_entries(&BlockFifoRequest {
1105 command: BlockFifoCommand {
1106 opcode: BlockOpcode::Read.into_primitive(),
1107 ..Default::default()
1108 },
1109 vmoid: vmo_id.id,
1110 length: 1,
1111 ..Default::default()
1112 })
1113 .await
1114 .unwrap();
1115
1116 let mut response = BlockFifoResponse::default();
1117 reader.read_entries(&mut response).await.unwrap();
1118 assert_eq!(response.status, zx::sys::ZX_OK);
1119 }
1120
1121 count += 1;
1122 }
1123
1124 assert_eq!(count, u16::MAX as u64);
1125
1126 writer
1128 .write_entries(&BlockFifoRequest {
1129 command: BlockFifoCommand {
1130 opcode: BlockOpcode::CloseVmo.into_primitive(),
1131 ..Default::default()
1132 },
1133 vmoid: vmo_id.id,
1134 ..Default::default()
1135 })
1136 .await
1137 .unwrap();
1138
1139 let mut response = BlockFifoResponse::default();
1140 reader.read_entries(&mut response).await.unwrap();
1141 assert_eq!(response.status, zx::sys::ZX_OK);
1142
1143 let new_vmo_id = session_proxy
1144 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1145 .await
1146 .unwrap()
1147 .unwrap();
1148 assert_eq!(new_vmo_id.id, vmo_id.id);
1150
1151 std::mem::drop(proxy);
1152 }
1153 );
1154 }
1155
1156 #[fuchsia::test]
1157 async fn test_close() {
1158 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1159
1160 let mut server = std::pin::pin!(async {
1161 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1162 block_server.handle_requests(stream).await.unwrap();
1163 }
1164 .fuse());
1165
1166 let mut client = std::pin::pin!(async {
1167 let (session_proxy, server) = fidl::endpoints::create_proxy();
1168
1169 proxy.open_session(server).unwrap();
1170
1171 std::mem::drop(proxy);
1174
1175 session_proxy.close().await.unwrap().unwrap();
1176
1177 let _: () = std::future::pending().await;
1179 }
1180 .fuse());
1181
1182 futures::select!(
1183 _ = server => {}
1184 _ = client => unreachable!(),
1185 );
1186 }
1187
1188 #[derive(Default)]
1189 struct IoMockInterface {
1190 do_checks: bool,
1191 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1192 return_errors: bool,
1193 }
1194
1195 #[derive(Debug)]
1196 enum ExpectedOp {
1197 Read(u64, u32, u64),
1198 Write(u64, u32, u64),
1199 Trim(u64, u32),
1200 Flush,
1201 }
1202
1203 impl super::async_interface::Interface for IoMockInterface {
1204 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1205 Ok(())
1206 }
1207
1208 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1209 Ok(Cow::Owned(test_device_info()))
1210 }
1211
1212 async fn read(
1213 &self,
1214 device_block_offset: u64,
1215 block_count: u32,
1216 _vmo: &Arc<zx::Vmo>,
1217 vmo_offset: u64,
1218 _trace_flow_id: Option<NonZero<u64>>,
1219 ) -> Result<(), zx::Status> {
1220 if self.return_errors {
1221 Err(zx::Status::INTERNAL)
1222 } else {
1223 if self.do_checks {
1224 assert_matches!(
1225 self.expected_op.lock().take(),
1226 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1227 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1228 "Read {device_block_offset} {block_count} {vmo_offset}"
1229 );
1230 }
1231 Ok(())
1232 }
1233 }
1234
1235 async fn write(
1236 &self,
1237 device_block_offset: u64,
1238 block_count: u32,
1239 _vmo: &Arc<zx::Vmo>,
1240 vmo_offset: u64,
1241 _opts: WriteOptions,
1242 _trace_flow_id: Option<NonZero<u64>>,
1243 ) -> Result<(), zx::Status> {
1244 if self.return_errors {
1245 Err(zx::Status::NOT_SUPPORTED)
1246 } else {
1247 if self.do_checks {
1248 assert_matches!(
1249 self.expected_op.lock().take(),
1250 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1251 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1252 "Write {device_block_offset} {block_count} {vmo_offset}"
1253 );
1254 }
1255 Ok(())
1256 }
1257 }
1258
1259 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1260 if self.return_errors {
1261 Err(zx::Status::NO_RESOURCES)
1262 } else {
1263 if self.do_checks {
1264 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1265 }
1266 Ok(())
1267 }
1268 }
1269
1270 async fn trim(
1271 &self,
1272 device_block_offset: u64,
1273 block_count: u32,
1274 _trace_flow_id: Option<NonZero<u64>>,
1275 ) -> Result<(), zx::Status> {
1276 if self.return_errors {
1277 Err(zx::Status::NO_MEMORY)
1278 } else {
1279 if self.do_checks {
1280 assert_matches!(
1281 self.expected_op.lock().take(),
1282 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1283 block_count == b,
1284 "Trim {device_block_offset} {block_count}"
1285 );
1286 }
1287 Ok(())
1288 }
1289 }
1290 }
1291
1292 #[fuchsia::test]
1293 async fn test_io() {
1294 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1295
1296 let expected_op = Arc::new(Mutex::new(None));
1297 let expected_op_clone = expected_op.clone();
1298
1299 let server = async {
1300 let block_server = BlockServer::new(
1301 BLOCK_SIZE,
1302 Arc::new(IoMockInterface {
1303 return_errors: false,
1304 do_checks: true,
1305 expected_op: expected_op_clone,
1306 }),
1307 );
1308 block_server.handle_requests(stream).await.unwrap();
1309 };
1310
1311 let client = async move {
1312 let (session_proxy, server) = fidl::endpoints::create_proxy();
1313
1314 proxy.open_session(server).unwrap();
1315
1316 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1317 let vmo_id = session_proxy
1318 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1319 .await
1320 .unwrap()
1321 .unwrap();
1322
1323 let mut fifo =
1324 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1325 let (mut reader, mut writer) = fifo.async_io();
1326
1327 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1329 writer
1330 .write_entries(&BlockFifoRequest {
1331 command: BlockFifoCommand {
1332 opcode: BlockOpcode::Read.into_primitive(),
1333 ..Default::default()
1334 },
1335 vmoid: vmo_id.id,
1336 dev_offset: 1,
1337 length: 2,
1338 vmo_offset: 3,
1339 ..Default::default()
1340 })
1341 .await
1342 .unwrap();
1343
1344 let mut response = BlockFifoResponse::default();
1345 reader.read_entries(&mut response).await.unwrap();
1346 assert_eq!(response.status, zx::sys::ZX_OK);
1347
1348 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1350 writer
1351 .write_entries(&BlockFifoRequest {
1352 command: BlockFifoCommand {
1353 opcode: BlockOpcode::Write.into_primitive(),
1354 ..Default::default()
1355 },
1356 vmoid: vmo_id.id,
1357 dev_offset: 4,
1358 length: 5,
1359 vmo_offset: 6,
1360 ..Default::default()
1361 })
1362 .await
1363 .unwrap();
1364
1365 let mut response = BlockFifoResponse::default();
1366 reader.read_entries(&mut response).await.unwrap();
1367 assert_eq!(response.status, zx::sys::ZX_OK);
1368
1369 *expected_op.lock() = Some(ExpectedOp::Flush);
1371 writer
1372 .write_entries(&BlockFifoRequest {
1373 command: BlockFifoCommand {
1374 opcode: BlockOpcode::Flush.into_primitive(),
1375 ..Default::default()
1376 },
1377 ..Default::default()
1378 })
1379 .await
1380 .unwrap();
1381
1382 reader.read_entries(&mut response).await.unwrap();
1383 assert_eq!(response.status, zx::sys::ZX_OK);
1384
1385 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1387 writer
1388 .write_entries(&BlockFifoRequest {
1389 command: BlockFifoCommand {
1390 opcode: BlockOpcode::Trim.into_primitive(),
1391 ..Default::default()
1392 },
1393 dev_offset: 7,
1394 length: 8,
1395 ..Default::default()
1396 })
1397 .await
1398 .unwrap();
1399
1400 reader.read_entries(&mut response).await.unwrap();
1401 assert_eq!(response.status, zx::sys::ZX_OK);
1402
1403 std::mem::drop(proxy);
1404 };
1405
1406 futures::join!(server, client);
1407 }
1408
1409 #[fuchsia::test]
1410 async fn test_io_errors() {
1411 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1412
1413 futures::join!(
1414 async {
1415 let block_server = BlockServer::new(
1416 BLOCK_SIZE,
1417 Arc::new(IoMockInterface {
1418 return_errors: true,
1419 do_checks: false,
1420 expected_op: Arc::new(Mutex::new(None)),
1421 }),
1422 );
1423 block_server.handle_requests(stream).await.unwrap();
1424 },
1425 async move {
1426 let (session_proxy, server) = fidl::endpoints::create_proxy();
1427
1428 proxy.open_session(server).unwrap();
1429
1430 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1431 let vmo_id = session_proxy
1432 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1433 .await
1434 .unwrap()
1435 .unwrap();
1436
1437 let mut fifo =
1438 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1439 let (mut reader, mut writer) = fifo.async_io();
1440
1441 writer
1443 .write_entries(&BlockFifoRequest {
1444 command: BlockFifoCommand {
1445 opcode: BlockOpcode::Read.into_primitive(),
1446 ..Default::default()
1447 },
1448 vmoid: vmo_id.id,
1449 length: 1,
1450 reqid: 1,
1451 ..Default::default()
1452 })
1453 .await
1454 .unwrap();
1455
1456 let mut response = BlockFifoResponse::default();
1457 reader.read_entries(&mut response).await.unwrap();
1458 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1459
1460 writer
1462 .write_entries(&BlockFifoRequest {
1463 command: BlockFifoCommand {
1464 opcode: BlockOpcode::Write.into_primitive(),
1465 ..Default::default()
1466 },
1467 vmoid: vmo_id.id,
1468 length: 1,
1469 reqid: 2,
1470 ..Default::default()
1471 })
1472 .await
1473 .unwrap();
1474
1475 reader.read_entries(&mut response).await.unwrap();
1476 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1477
1478 writer
1480 .write_entries(&BlockFifoRequest {
1481 command: BlockFifoCommand {
1482 opcode: BlockOpcode::Flush.into_primitive(),
1483 ..Default::default()
1484 },
1485 reqid: 3,
1486 ..Default::default()
1487 })
1488 .await
1489 .unwrap();
1490
1491 reader.read_entries(&mut response).await.unwrap();
1492 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1493
1494 writer
1496 .write_entries(&BlockFifoRequest {
1497 command: BlockFifoCommand {
1498 opcode: BlockOpcode::Trim.into_primitive(),
1499 ..Default::default()
1500 },
1501 reqid: 4,
1502 length: 1,
1503 ..Default::default()
1504 })
1505 .await
1506 .unwrap();
1507
1508 reader.read_entries(&mut response).await.unwrap();
1509 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1510
1511 std::mem::drop(proxy);
1512 }
1513 );
1514 }
1515
1516 #[fuchsia::test]
1517 async fn test_invalid_args() {
1518 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1519
1520 futures::join!(
1521 async {
1522 let block_server = BlockServer::new(
1523 BLOCK_SIZE,
1524 Arc::new(IoMockInterface {
1525 return_errors: false,
1526 do_checks: false,
1527 expected_op: Arc::new(Mutex::new(None)),
1528 }),
1529 );
1530 block_server.handle_requests(stream).await.unwrap();
1531 },
1532 async move {
1533 let (session_proxy, server) = fidl::endpoints::create_proxy();
1534
1535 proxy.open_session(server).unwrap();
1536
1537 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1538 let vmo_id = session_proxy
1539 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1540 .await
1541 .unwrap()
1542 .unwrap();
1543
1544 let mut fifo =
1545 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1546
1547 async fn test(
1548 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1549 request: BlockFifoRequest,
1550 ) -> Result<(), zx::Status> {
1551 let (mut reader, mut writer) = fifo.async_io();
1552 writer.write_entries(&request).await.unwrap();
1553 let mut response = BlockFifoResponse::default();
1554 reader.read_entries(&mut response).await.unwrap();
1555 zx::Status::ok(response.status)
1556 }
1557
1558 let good_read_request = || BlockFifoRequest {
1561 command: BlockFifoCommand {
1562 opcode: BlockOpcode::Read.into_primitive(),
1563 ..Default::default()
1564 },
1565 vmoid: vmo_id.id,
1566 ..Default::default()
1567 };
1568
1569 assert_eq!(
1570 test(
1571 &mut fifo,
1572 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1573 )
1574 .await,
1575 Err(zx::Status::INVALID_ARGS)
1576 );
1577
1578 assert_eq!(
1579 test(
1580 &mut fifo,
1581 BlockFifoRequest {
1582 vmo_offset: 0xffff_ffff_ffff_ffff,
1583 ..good_read_request()
1584 }
1585 )
1586 .await,
1587 Err(zx::Status::INVALID_ARGS)
1588 );
1589
1590 let good_write_request = || BlockFifoRequest {
1593 command: BlockFifoCommand {
1594 opcode: BlockOpcode::Write.into_primitive(),
1595 ..Default::default()
1596 },
1597 vmoid: vmo_id.id,
1598 ..Default::default()
1599 };
1600
1601 assert_eq!(
1602 test(
1603 &mut fifo,
1604 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
1605 )
1606 .await,
1607 Err(zx::Status::INVALID_ARGS)
1608 );
1609
1610 assert_eq!(
1611 test(
1612 &mut fifo,
1613 BlockFifoRequest {
1614 vmo_offset: 0xffff_ffff_ffff_ffff,
1615 ..good_write_request()
1616 }
1617 )
1618 .await,
1619 Err(zx::Status::INVALID_ARGS)
1620 );
1621
1622 assert_eq!(
1625 test(
1626 &mut fifo,
1627 BlockFifoRequest {
1628 command: BlockFifoCommand {
1629 opcode: BlockOpcode::CloseVmo.into_primitive(),
1630 ..Default::default()
1631 },
1632 vmoid: vmo_id.id + 1,
1633 ..Default::default()
1634 }
1635 )
1636 .await,
1637 Err(zx::Status::IO)
1638 );
1639
1640 std::mem::drop(proxy);
1641 }
1642 );
1643 }
1644
1645 #[fuchsia::test]
1646 async fn test_concurrent_requests() {
1647 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1648
1649 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
1650 let waiting_readers_clone = waiting_readers.clone();
1651
1652 futures::join!(
1653 async move {
1654 let block_server = BlockServer::new(
1655 BLOCK_SIZE,
1656 Arc::new(MockInterface {
1657 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
1658 let (tx, rx) = oneshot::channel();
1659 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
1660 Box::pin(async move {
1661 let _ = rx.await;
1662 Ok(())
1663 })
1664 })),
1665 }),
1666 );
1667 block_server.handle_requests(stream).await.unwrap();
1668 },
1669 async move {
1670 let (session_proxy, server) = fidl::endpoints::create_proxy();
1671
1672 proxy.open_session(server).unwrap();
1673
1674 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1675 let vmo_id = session_proxy
1676 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1677 .await
1678 .unwrap()
1679 .unwrap();
1680
1681 let mut fifo =
1682 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1683 let (mut reader, mut writer) = fifo.async_io();
1684
1685 writer
1686 .write_entries(&BlockFifoRequest {
1687 command: BlockFifoCommand {
1688 opcode: BlockOpcode::Read.into_primitive(),
1689 ..Default::default()
1690 },
1691 reqid: 1,
1692 dev_offset: 1, vmoid: vmo_id.id,
1694 length: 1,
1695 ..Default::default()
1696 })
1697 .await
1698 .unwrap();
1699
1700 writer
1701 .write_entries(&BlockFifoRequest {
1702 command: BlockFifoCommand {
1703 opcode: BlockOpcode::Read.into_primitive(),
1704 ..Default::default()
1705 },
1706 reqid: 2,
1707 dev_offset: 2,
1708 vmoid: vmo_id.id,
1709 length: 1,
1710 ..Default::default()
1711 })
1712 .await
1713 .unwrap();
1714
1715 poll_fn(|cx: &mut Context<'_>| {
1717 if waiting_readers.lock().len() == 2 {
1718 Poll::Ready(())
1719 } else {
1720 cx.waker().wake_by_ref();
1722 Poll::Pending
1723 }
1724 })
1725 .await;
1726
1727 let mut response = BlockFifoResponse::default();
1728 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1729
1730 let (id, tx) = waiting_readers.lock().pop().unwrap();
1731 tx.send(()).unwrap();
1732
1733 reader.read_entries(&mut response).await.unwrap();
1734 assert_eq!(response.status, zx::sys::ZX_OK);
1735 assert_eq!(response.reqid, id);
1736
1737 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1738
1739 let (id, tx) = waiting_readers.lock().pop().unwrap();
1740 tx.send(()).unwrap();
1741
1742 reader.read_entries(&mut response).await.unwrap();
1743 assert_eq!(response.status, zx::sys::ZX_OK);
1744 assert_eq!(response.reqid, id);
1745 }
1746 );
1747 }
1748
1749 #[fuchsia::test]
1750 async fn test_groups() {
1751 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1752
1753 futures::join!(
1754 async move {
1755 let block_server = BlockServer::new(
1756 BLOCK_SIZE,
1757 Arc::new(MockInterface {
1758 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
1759 }),
1760 );
1761 block_server.handle_requests(stream).await.unwrap();
1762 },
1763 async move {
1764 let (session_proxy, server) = fidl::endpoints::create_proxy();
1765
1766 proxy.open_session(server).unwrap();
1767
1768 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1769 let vmo_id = session_proxy
1770 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1771 .await
1772 .unwrap()
1773 .unwrap();
1774
1775 let mut fifo =
1776 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1777 let (mut reader, mut writer) = fifo.async_io();
1778
1779 writer
1780 .write_entries(&BlockFifoRequest {
1781 command: BlockFifoCommand {
1782 opcode: BlockOpcode::Read.into_primitive(),
1783 flags: BlockIoFlag::GROUP_ITEM.bits(),
1784 ..Default::default()
1785 },
1786 group: 1,
1787 vmoid: vmo_id.id,
1788 length: 1,
1789 ..Default::default()
1790 })
1791 .await
1792 .unwrap();
1793
1794 writer
1795 .write_entries(&BlockFifoRequest {
1796 command: BlockFifoCommand {
1797 opcode: BlockOpcode::Read.into_primitive(),
1798 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1799 ..Default::default()
1800 },
1801 reqid: 2,
1802 group: 1,
1803 vmoid: vmo_id.id,
1804 length: 1,
1805 ..Default::default()
1806 })
1807 .await
1808 .unwrap();
1809
1810 let mut response = BlockFifoResponse::default();
1811 reader.read_entries(&mut response).await.unwrap();
1812 assert_eq!(response.status, zx::sys::ZX_OK);
1813 assert_eq!(response.reqid, 2);
1814 assert_eq!(response.group, 1);
1815 }
1816 );
1817 }
1818
1819 #[fuchsia::test]
1820 async fn test_group_error() {
1821 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1822
1823 let counter = Arc::new(AtomicU64::new(0));
1824 let counter_clone = counter.clone();
1825
1826 futures::join!(
1827 async move {
1828 let block_server = BlockServer::new(
1829 BLOCK_SIZE,
1830 Arc::new(MockInterface {
1831 read_hook: Some(Box::new(move |_, _, _, _| {
1832 counter_clone.fetch_add(1, Ordering::Relaxed);
1833 Box::pin(async { Err(zx::Status::BAD_STATE) })
1834 })),
1835 }),
1836 );
1837 block_server.handle_requests(stream).await.unwrap();
1838 },
1839 async move {
1840 let (session_proxy, server) = fidl::endpoints::create_proxy();
1841
1842 proxy.open_session(server).unwrap();
1843
1844 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1845 let vmo_id = session_proxy
1846 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1847 .await
1848 .unwrap()
1849 .unwrap();
1850
1851 let mut fifo =
1852 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1853 let (mut reader, mut writer) = fifo.async_io();
1854
1855 writer
1856 .write_entries(&BlockFifoRequest {
1857 command: BlockFifoCommand {
1858 opcode: BlockOpcode::Read.into_primitive(),
1859 flags: BlockIoFlag::GROUP_ITEM.bits(),
1860 ..Default::default()
1861 },
1862 group: 1,
1863 vmoid: vmo_id.id,
1864 length: 1,
1865 ..Default::default()
1866 })
1867 .await
1868 .unwrap();
1869
1870 poll_fn(|cx: &mut Context<'_>| {
1872 if counter.load(Ordering::Relaxed) == 1 {
1873 Poll::Ready(())
1874 } else {
1875 cx.waker().wake_by_ref();
1877 Poll::Pending
1878 }
1879 })
1880 .await;
1881
1882 let mut response = BlockFifoResponse::default();
1883 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1884
1885 writer
1886 .write_entries(&BlockFifoRequest {
1887 command: BlockFifoCommand {
1888 opcode: BlockOpcode::Read.into_primitive(),
1889 flags: BlockIoFlag::GROUP_ITEM.bits(),
1890 ..Default::default()
1891 },
1892 group: 1,
1893 vmoid: vmo_id.id,
1894 length: 1,
1895 ..Default::default()
1896 })
1897 .await
1898 .unwrap();
1899
1900 writer
1901 .write_entries(&BlockFifoRequest {
1902 command: BlockFifoCommand {
1903 opcode: BlockOpcode::Read.into_primitive(),
1904 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1905 ..Default::default()
1906 },
1907 reqid: 2,
1908 group: 1,
1909 vmoid: vmo_id.id,
1910 length: 1,
1911 ..Default::default()
1912 })
1913 .await
1914 .unwrap();
1915
1916 reader.read_entries(&mut response).await.unwrap();
1917 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
1918 assert_eq!(response.reqid, 2);
1919 assert_eq!(response.group, 1);
1920
1921 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1922
1923 assert_eq!(counter.load(Ordering::Relaxed), 1);
1925 }
1926 );
1927 }
1928
1929 #[fuchsia::test]
1930 async fn test_group_with_two_lasts() {
1931 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1932
1933 let (tx, rx) = oneshot::channel();
1934
1935 futures::join!(
1936 async move {
1937 let rx = Mutex::new(Some(rx));
1938 let block_server = BlockServer::new(
1939 BLOCK_SIZE,
1940 Arc::new(MockInterface {
1941 read_hook: Some(Box::new(move |_, _, _, _| {
1942 let rx = rx.lock().take().unwrap();
1943 Box::pin(async {
1944 let _ = rx.await;
1945 Ok(())
1946 })
1947 })),
1948 }),
1949 );
1950 block_server.handle_requests(stream).await.unwrap();
1951 },
1952 async move {
1953 let (session_proxy, server) = fidl::endpoints::create_proxy();
1954
1955 proxy.open_session(server).unwrap();
1956
1957 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1958 let vmo_id = session_proxy
1959 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1960 .await
1961 .unwrap()
1962 .unwrap();
1963
1964 let mut fifo =
1965 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1966 let (mut reader, mut writer) = fifo.async_io();
1967
1968 writer
1969 .write_entries(&BlockFifoRequest {
1970 command: BlockFifoCommand {
1971 opcode: BlockOpcode::Read.into_primitive(),
1972 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1973 ..Default::default()
1974 },
1975 reqid: 1,
1976 group: 1,
1977 vmoid: vmo_id.id,
1978 length: 1,
1979 ..Default::default()
1980 })
1981 .await
1982 .unwrap();
1983
1984 writer
1985 .write_entries(&BlockFifoRequest {
1986 command: BlockFifoCommand {
1987 opcode: BlockOpcode::Read.into_primitive(),
1988 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1989 ..Default::default()
1990 },
1991 reqid: 2,
1992 group: 1,
1993 vmoid: vmo_id.id,
1994 length: 1,
1995 ..Default::default()
1996 })
1997 .await
1998 .unwrap();
1999
2000 writer
2002 .write_entries(&BlockFifoRequest {
2003 command: BlockFifoCommand {
2004 opcode: BlockOpcode::CloseVmo.into_primitive(),
2005 ..Default::default()
2006 },
2007 reqid: 3,
2008 vmoid: vmo_id.id,
2009 ..Default::default()
2010 })
2011 .await
2012 .unwrap();
2013
2014 let mut response = BlockFifoResponse::default();
2016 reader.read_entries(&mut response).await.unwrap();
2017 assert_eq!(response.status, zx::sys::ZX_OK);
2018 assert_eq!(response.reqid, 3);
2019
2020 tx.send(()).unwrap();
2022
2023 let mut response = BlockFifoResponse::default();
2026 reader.read_entries(&mut response).await.unwrap();
2027 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2028 assert_eq!(response.reqid, 1);
2029 assert_eq!(response.group, 1);
2030 }
2031 );
2032 }
2033
2034 #[fuchsia::test(allow_stalls = false)]
2035 async fn test_requests_dont_block_sessions() {
2036 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2037
2038 let (tx, rx) = oneshot::channel();
2039
2040 fasync::Task::local(async move {
2041 let rx = Mutex::new(Some(rx));
2042 let block_server = BlockServer::new(
2043 BLOCK_SIZE,
2044 Arc::new(MockInterface {
2045 read_hook: Some(Box::new(move |_, _, _, _| {
2046 let rx = rx.lock().take().unwrap();
2047 Box::pin(async {
2048 let _ = rx.await;
2049 Ok(())
2050 })
2051 })),
2052 }),
2053 );
2054 block_server.handle_requests(stream).await.unwrap();
2055 })
2056 .detach();
2057
2058 let mut fut = pin!(async {
2059 let (session_proxy, server) = fidl::endpoints::create_proxy();
2060
2061 proxy.open_session(server).unwrap();
2062
2063 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2064 let vmo_id = session_proxy
2065 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2066 .await
2067 .unwrap()
2068 .unwrap();
2069
2070 let mut fifo =
2071 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2072 let (mut reader, mut writer) = fifo.async_io();
2073
2074 writer
2075 .write_entries(&BlockFifoRequest {
2076 command: BlockFifoCommand {
2077 opcode: BlockOpcode::Read.into_primitive(),
2078 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2079 ..Default::default()
2080 },
2081 reqid: 1,
2082 group: 1,
2083 vmoid: vmo_id.id,
2084 length: 1,
2085 ..Default::default()
2086 })
2087 .await
2088 .unwrap();
2089
2090 let mut response = BlockFifoResponse::default();
2091 reader.read_entries(&mut response).await.unwrap();
2092 assert_eq!(response.status, zx::sys::ZX_OK);
2093 });
2094
2095 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2097
2098 let mut fut2 = pin!(proxy.get_volume_info());
2099
2100 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2102
2103 let _ = tx.send(());
2106
2107 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2108 }
2109
2110 #[fuchsia::test]
2111 async fn test_request_flow_control() {
2112 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2113
2114 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2117 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2118 let event_clone = event.clone();
2119 futures::join!(
2120 async move {
2121 let block_server = BlockServer::new(
2122 BLOCK_SIZE,
2123 Arc::new(MockInterface {
2124 read_hook: Some(Box::new(move |_, _, _, _| {
2125 let event_clone = event_clone.clone();
2126 Box::pin(async move {
2127 if !event_clone.1.load(Ordering::SeqCst) {
2128 event_clone.0.listen().await;
2129 }
2130 Ok(())
2131 })
2132 })),
2133 }),
2134 );
2135 block_server.handle_requests(stream).await.unwrap();
2136 },
2137 async move {
2138 let (session_proxy, server) = fidl::endpoints::create_proxy();
2139
2140 proxy.open_session(server).unwrap();
2141
2142 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2143 let vmo_id = session_proxy
2144 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2145 .await
2146 .unwrap()
2147 .unwrap();
2148
2149 let mut fifo =
2150 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2151 let (mut reader, mut writer) = fifo.async_io();
2152
2153 for i in 0..MAX_REQUESTS {
2154 writer
2155 .write_entries(&BlockFifoRequest {
2156 command: BlockFifoCommand {
2157 opcode: BlockOpcode::Read.into_primitive(),
2158 ..Default::default()
2159 },
2160 reqid: (i + 1) as u32,
2161 dev_offset: i,
2162 vmoid: vmo_id.id,
2163 length: 1,
2164 ..Default::default()
2165 })
2166 .await
2167 .unwrap();
2168 }
2169 assert!(futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2170 command: BlockFifoCommand {
2171 opcode: BlockOpcode::Read.into_primitive(),
2172 ..Default::default()
2173 },
2174 reqid: u32::MAX,
2175 dev_offset: MAX_REQUESTS,
2176 vmoid: vmo_id.id,
2177 length: 1,
2178 ..Default::default()
2179 })))
2180 .is_pending());
2181 event.1.store(true, Ordering::SeqCst);
2183 event.0.notify(usize::MAX);
2184 let mut finished_reqids = vec![];
2186 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2187 let mut response = BlockFifoResponse::default();
2188 reader.read_entries(&mut response).await.unwrap();
2189 assert_eq!(response.status, zx::sys::ZX_OK);
2190 finished_reqids.push(response.reqid);
2191 writer
2192 .write_entries(&BlockFifoRequest {
2193 command: BlockFifoCommand {
2194 opcode: BlockOpcode::Read.into_primitive(),
2195 ..Default::default()
2196 },
2197 reqid: (i + 1) as u32,
2198 dev_offset: i,
2199 vmoid: vmo_id.id,
2200 length: 1,
2201 ..Default::default()
2202 })
2203 .await
2204 .unwrap();
2205 }
2206 let mut response = BlockFifoResponse::default();
2207 for _ in 0..MAX_REQUESTS {
2208 reader.read_entries(&mut response).await.unwrap();
2209 assert_eq!(response.status, zx::sys::ZX_OK);
2210 finished_reqids.push(response.reqid);
2211 }
2212 finished_reqids.sort();
2215 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2216 let mut i = 1;
2217 for reqid in finished_reqids {
2218 assert_eq!(reqid, i);
2219 i += 1;
2220 }
2221 }
2222 );
2223 }
2224
2225 #[fuchsia::test]
2226 async fn test_passthrough_io_with_fixed_map() {
2227 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2228
2229 let expected_op = Arc::new(Mutex::new(None));
2230 let expected_op_clone = expected_op.clone();
2231 futures::join!(
2232 async {
2233 let block_server = BlockServer::new(
2234 BLOCK_SIZE,
2235 Arc::new(IoMockInterface {
2236 return_errors: false,
2237 do_checks: true,
2238 expected_op: expected_op_clone,
2239 }),
2240 );
2241 block_server.handle_requests(stream).await.unwrap();
2242 },
2243 async move {
2244 let (session_proxy, server) = fidl::endpoints::create_proxy();
2245
2246 let mappings = [fblock::BlockOffsetMapping {
2247 source_block_offset: 0,
2248 target_block_offset: 10,
2249 length: 20,
2250 }];
2251 proxy.open_session_with_offset_map(server, None, Some(&mappings[..])).unwrap();
2252
2253 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2254 let vmo_id = session_proxy
2255 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2256 .await
2257 .unwrap()
2258 .unwrap();
2259
2260 let mut fifo =
2261 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2262 let (mut reader, mut writer) = fifo.async_io();
2263
2264 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2266 writer
2267 .write_entries(&BlockFifoRequest {
2268 command: BlockFifoCommand {
2269 opcode: BlockOpcode::Read.into_primitive(),
2270 ..Default::default()
2271 },
2272 vmoid: vmo_id.id,
2273 dev_offset: 1,
2274 length: 2,
2275 vmo_offset: 3,
2276 ..Default::default()
2277 })
2278 .await
2279 .unwrap();
2280
2281 let mut response = BlockFifoResponse::default();
2282 reader.read_entries(&mut response).await.unwrap();
2283 assert_eq!(response.status, zx::sys::ZX_OK);
2284
2285 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2287 writer
2288 .write_entries(&BlockFifoRequest {
2289 command: BlockFifoCommand {
2290 opcode: BlockOpcode::Write.into_primitive(),
2291 ..Default::default()
2292 },
2293 vmoid: vmo_id.id,
2294 dev_offset: 4,
2295 length: 5,
2296 vmo_offset: 6,
2297 ..Default::default()
2298 })
2299 .await
2300 .unwrap();
2301
2302 reader.read_entries(&mut response).await.unwrap();
2303 assert_eq!(response.status, zx::sys::ZX_OK);
2304
2305 *expected_op.lock() = Some(ExpectedOp::Flush);
2307 writer
2308 .write_entries(&BlockFifoRequest {
2309 command: BlockFifoCommand {
2310 opcode: BlockOpcode::Flush.into_primitive(),
2311 ..Default::default()
2312 },
2313 ..Default::default()
2314 })
2315 .await
2316 .unwrap();
2317
2318 reader.read_entries(&mut response).await.unwrap();
2319 assert_eq!(response.status, zx::sys::ZX_OK);
2320
2321 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2323 writer
2324 .write_entries(&BlockFifoRequest {
2325 command: BlockFifoCommand {
2326 opcode: BlockOpcode::Trim.into_primitive(),
2327 ..Default::default()
2328 },
2329 dev_offset: 7,
2330 length: 3,
2331 ..Default::default()
2332 })
2333 .await
2334 .unwrap();
2335
2336 reader.read_entries(&mut response).await.unwrap();
2337 assert_eq!(response.status, zx::sys::ZX_OK);
2338
2339 *expected_op.lock() = None;
2341 writer
2342 .write_entries(&BlockFifoRequest {
2343 command: BlockFifoCommand {
2344 opcode: BlockOpcode::Read.into_primitive(),
2345 ..Default::default()
2346 },
2347 vmoid: vmo_id.id,
2348 dev_offset: 19,
2349 length: 2,
2350 vmo_offset: 3,
2351 ..Default::default()
2352 })
2353 .await
2354 .unwrap();
2355
2356 reader.read_entries(&mut response).await.unwrap();
2357 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2358
2359 std::mem::drop(proxy);
2360 }
2361 );
2362 }
2363}