1use anyhow::{anyhow, Error};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
7use futures::{Future, FutureExt as _, TryStreamExt as _};
8use std::borrow::Cow;
9use std::collections::btree_map::Entry;
10use std::collections::BTreeMap;
11use std::num::NonZero;
12use std::ops::Range;
13use std::sync::atomic::AtomicU64;
14use std::sync::{Arc, Mutex};
15use zx::HandleBased;
16use {
17 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
18 fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
19};
20
21pub mod async_interface;
22pub mod c_interface;
23
/// Information about the device being served, as returned by [`SessionManager::get_info`].
#[derive(Clone)]
pub enum DeviceInfo {
    /// The device is described by a [`BlockInfo`].  The type GUID, instance GUID and name
    /// requests are answered with `ZX_ERR_NOT_SUPPORTED` for this variant.
    Block(BlockInfo),
    /// The device is described by a [`PartitionInfo`].
    Partition(PartitionInfo),
}
29
/// Properties carried by [`DeviceInfo::Block`].
#[derive(Clone)]
pub struct BlockInfo {
    /// Reported as `flags` in the `GetInfo` response.
    pub device_flags: fblock::Flag,
    /// Reported as `block_count` in the `GetInfo` response.
    pub block_count: u64,
}
36
/// Properties carried by [`DeviceInfo::Partition`].
#[derive(Clone)]
pub struct PartitionInfo {
    /// Reported as `flags` in the `GetInfo` response.
    pub device_flags: fblock::Flag,
    /// Block range of the partition.  When present, `GetInfo` reports `end - start` as the block
    /// count and `GetMetadata` reports `start` and `end - start`.  When absent, `GetInfo` derives
    /// the block count from [`SessionManager::get_volume_info`] instead.
    pub block_range: Option<Range<u64>>,
    /// Returned by `GetTypeGuid` and `GetMetadata`.
    pub type_guid: [u8; 16],
    /// Returned by `GetInstanceGuid` and `GetMetadata`.
    pub instance_guid: [u8; 16],
    /// Returned by `GetName` and `GetMetadata`.
    pub name: String,
    /// Returned as `flags` by `GetMetadata`.
    pub flags: u64,
}
51
52struct FifoMessageGroup {
65 status: zx::Status,
66 count: u32,
67 req_id: Option<u32>,
68}
69
70impl FifoMessageGroup {
71 fn new() -> Self {
72 FifoMessageGroup { status: zx::Status::OK, count: 0, req_id: None }
73 }
74}
75
76#[derive(Default)]
77struct FifoMessageGroups(Mutex<BTreeMap<u16, FifoMessageGroup>>);
78
79impl FifoMessageGroups {
81 fn complete(&self, group_id: u16, status: zx::Status) -> Option<BlockFifoResponse> {
84 let mut map = self.0.lock().unwrap();
85 let Entry::Occupied(mut o) = map.entry(group_id) else { unreachable!() };
86 let group = o.get_mut();
87 if group.count == 1 {
88 if let Some(reqid) = group.req_id {
89 let status =
90 if group.status != zx::Status::OK { group.status } else { status }.into_raw();
91
92 o.remove();
93
94 return Some(BlockFifoResponse {
95 status,
96 reqid,
97 group: group_id,
98 ..Default::default()
99 });
100 }
101 }
102
103 group.count = group.count.checked_sub(1).unwrap();
104 if status != zx::Status::OK && group.status == zx::Status::OK {
105 group.status = status
106 }
107 None
108 }
109}
110
/// Serves `fvolume::VolumeRequestStream`s, delegating sessions and device queries to a
/// [`SessionManager`].
pub struct BlockServer<SM> {
    /// Block size reported by `GetInfo` and passed to every session that is opened.
    block_size: u32,
    /// Opens sessions and answers device queries.
    session_manager: Arc<SM>,
}
117
/// A mapping of a contiguous run of blocks from a source offset to a target offset.
///
/// Values converted from `fblock::BlockOffsetMapping` are guaranteed not to overflow when
/// `length` is added to either offset.
pub struct BlockOffsetMapping {
    /// First block offset, as seen in requests, covered by the mapping.
    source_block_offset: u64,
    /// Block offset that `source_block_offset` is translated to.
    target_block_offset: u64,
    /// Number of blocks covered by the mapping.
    length: u64,
}
124
125impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
126 type Error = zx::Status;
127
128 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
129 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
130 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
131 Ok(Self {
132 source_block_offset: wire.source_block_offset,
133 target_block_offset: wire.target_block_offset,
134 length: wire.length,
135 })
136 }
137}
138
139pub struct OffsetMap {
145 mapping: BlockOffsetMapping,
146}
147
148impl OffsetMap {
149 pub fn new(mapping: BlockOffsetMapping) -> Self {
150 Self { mapping }
151 }
152
153 pub fn adjust_request(&self, dev_offset: u64, length: u64) -> Option<u64> {
156 if length == 0 {
157 return None;
158 }
159 let end = dev_offset.checked_add(length)?;
160 if self.mapping.source_block_offset > dev_offset
161 || end > self.mapping.source_block_offset + self.mapping.length
162 {
163 return None;
164 }
165 let delta = dev_offset - self.mapping.source_block_offset;
166 Some(self.mapping.target_block_offset + delta)
167 }
168}
169
/// Implemented by the component that services sessions and device queries on behalf of a
/// [`BlockServer`].
pub trait SessionManager: 'static {
    /// Called when a client attaches `vmo` to a session, after the session has recorded the VMO
    /// and before the new VMO id is sent back to the client.
    fn on_attach_vmo(
        self: Arc<Self>,
        vmo: &Arc<zx::Vmo>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Returns a future that serves the session requests arriving on `stream`.
    ///
    /// `offset_map` is the optional mapping supplied when the session was opened and
    /// `block_size` is the server's block size.  [`BlockServer`] spawns the returned future and
    /// discards its result.
    fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: Option<OffsetMap>,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Returns information about the device being served.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Returns the volume manager and volume information.  The default implementation fails with
    /// `NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Returns slice ranges for the given start slices.  [`BlockServer`] asserts that at most 16
    /// ranges are returned.  The default implementation fails with `NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Handles a volume `Extend` request.  The default implementation fails with
    /// `NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Handles a volume `Shrink` request.  The default implementation fails with
    /// `NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }
}
226
/// Conversion into a shared [`SessionManager`]; this is what [`BlockServer::new`] accepts.
pub trait IntoSessionManager {
    /// The session manager type produced by the conversion.
    type SM: SessionManager;

    /// Consumes `self` and returns the session manager.
    fn into_session_manager(self) -> Arc<Self::SM>;
}
232
233impl<SM: SessionManager> BlockServer<SM> {
234 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
235 Self { block_size, session_manager: session_manager.into_session_manager() }
236 }
237
238 pub async fn handle_requests(
240 &self,
241 mut requests: fvolume::VolumeRequestStream,
242 ) -> Result<(), Error> {
243 let scope = fasync::Scope::new();
244 while let Some(request) = requests.try_next().await.unwrap() {
245 if let Some(session) = self.handle_request(request).await? {
246 scope.spawn(session.map(|_| ()));
247 }
248 }
249 scope.await;
250 Ok(())
251 }
252
253 async fn handle_request(
256 &self,
257 request: fvolume::VolumeRequest,
258 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
259 match request {
260 fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
261 Ok(info) => {
262 let (block_count, flags) = match info.as_ref() {
263 DeviceInfo::Block(BlockInfo { block_count, device_flags }) => {
264 (*block_count, *device_flags)
265 }
266 DeviceInfo::Partition(partition_info) => {
267 let block_count = if let Some(range) =
268 partition_info.block_range.as_ref()
269 {
270 range.end - range.start
271 } else {
272 let volume_info = self.session_manager.get_volume_info().await?;
273 volume_info.0.slice_size * volume_info.1.partition_slice_count
274 / self.block_size as u64
275 };
276 (block_count, partition_info.device_flags)
277 }
278 };
279 responder.send(Ok(&fblock::BlockInfo {
280 block_count,
281 block_size: self.block_size,
282 max_transfer_size: fblock::MAX_TRANSFER_UNBOUNDED,
283 flags,
284 }))?;
285 }
286 Err(status) => responder.send(Err(status.into_raw()))?,
287 },
288 fvolume::VolumeRequest::GetStats { clear: _, responder } => {
289 responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
291 }
292 fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
293 return Ok(Some(self.session_manager.clone().open_session(
294 session.into_stream(),
295 None,
296 self.block_size,
297 )));
298 }
299 fvolume::VolumeRequest::OpenSessionWithOffsetMap {
300 session,
301 offset_map,
302 initial_mappings,
303 control_handle: _,
304 } => {
305 if offset_map.is_some() || initial_mappings.as_ref().is_none_or(|m| m.len() != 1) {
306 session.close_with_epitaph(zx::Status::NOT_SUPPORTED)?;
310 return Ok(None);
311 }
312 let initial_mapping = match initial_mappings.unwrap().pop().unwrap().try_into() {
313 Ok(m) => m,
314 Err(status) => {
315 session.close_with_epitaph(status)?;
316 return Ok(None);
317 }
318 };
319 let offset_map = OffsetMap::new(initial_mapping);
320 return Ok(Some(self.session_manager.clone().open_session(
321 session.into_stream(),
322 Some(offset_map),
323 self.block_size,
324 )));
325 }
326 fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
327 Ok(info) => {
328 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
329 let mut guid =
330 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
331 guid.value.copy_from_slice(&partition_info.type_guid);
332 responder.send(zx::sys::ZX_OK, Some(&guid))?;
333 } else {
334 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
335 }
336 }
337 Err(status) => {
338 responder.send(status.into_raw(), None)?;
339 }
340 },
341 fvolume::VolumeRequest::GetInstanceGuid { responder } => {
342 match self.device_info().await {
343 Ok(info) => {
344 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
345 let mut guid =
346 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
347 guid.value.copy_from_slice(&partition_info.instance_guid);
348 responder.send(zx::sys::ZX_OK, Some(&guid))?;
349 } else {
350 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
351 }
352 }
353 Err(status) => {
354 responder.send(status.into_raw(), None)?;
355 }
356 }
357 }
358 fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
359 Ok(info) => {
360 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
361 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
362 } else {
363 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
364 }
365 }
366 Err(status) => {
367 responder.send(status.into_raw(), None)?;
368 }
369 },
370 fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
371 Ok(info) => {
372 if let DeviceInfo::Partition(info) = info.as_ref() {
373 let mut type_guid =
374 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
375 type_guid.value.copy_from_slice(&info.type_guid);
376 let mut instance_guid =
377 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
378 instance_guid.value.copy_from_slice(&info.instance_guid);
379 responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
380 name: Some(info.name.clone()),
381 type_guid: Some(type_guid),
382 instance_guid: Some(instance_guid),
383 start_block_offset: info.block_range.as_ref().map(|range| range.start),
384 num_blocks: info
385 .block_range
386 .as_ref()
387 .map(|range| range.end - range.start),
388 flags: Some(info.flags),
389 ..Default::default()
390 }))?;
391 }
392 }
393 Err(status) => responder.send(Err(status.into_raw()))?,
394 },
395 fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
396 match self.session_manager.query_slices(&start_slices).await {
397 Ok(mut results) => {
398 let results_len = results.len();
399 assert!(results_len <= 16);
400 results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
401 responder.send(
402 zx::sys::ZX_OK,
403 &results.try_into().unwrap(),
404 results_len as u64,
405 )?;
406 }
407 Err(s) => {
408 responder.send(
409 s.into_raw(),
410 &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
411 0,
412 )?;
413 }
414 }
415 }
416 fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
417 match self.session_manager.get_volume_info().await {
418 Ok((manager_info, volume_info)) => {
419 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
420 }
421 Err(s) => responder.send(s.into_raw(), None, None)?,
422 }
423 }
424 fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
425 responder.send(
426 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
427 .into_raw(),
428 )?;
429 }
430 fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
431 responder.send(
432 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
433 .into_raw(),
434 )?;
435 }
436 fvolume::VolumeRequest::Destroy { responder, .. } => {
437 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
438 }
439 }
440 Ok(None)
441 }
442
443 async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
444 self.session_manager.get_info().await
445 }
446}
447
/// State shared by the request handling of a single session.
struct SessionHelper<SM: SessionManager> {
    /// Notified when a VMO is attached.
    session_manager: Arc<SM>,
    /// Optional offset translation applied to every decoded request.
    offset_map: Option<OffsetMap>,
    /// Block size used to convert the request's VMO offset from blocks to bytes.
    block_size: u32,
    /// FIFO end that is duplicated and handed to the client by `GetFifo`.
    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
    /// VMOs attached to the session, keyed by the VMO id given to the client.
    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
    /// In-flight request groups.
    message_groups: FifoMessageGroups,
}
456
457impl<SM: SessionManager> SessionHelper<SM> {
458 fn new(
459 session_manager: Arc<SM>,
460 offset_map: Option<OffsetMap>,
461 block_size: u32,
462 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
463 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
464 Ok((
465 Self {
466 session_manager,
467 offset_map,
468 block_size,
469 peer_fifo,
470 vmos: Mutex::default(),
471 message_groups: FifoMessageGroups::default(),
472 },
473 fifo,
474 ))
475 }
476
477 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
478 match request {
479 fblock::SessionRequest::GetFifo { responder } => {
480 let rights = zx::Rights::TRANSFER
481 | zx::Rights::READ
482 | zx::Rights::WRITE
483 | zx::Rights::SIGNAL
484 | zx::Rights::WAIT;
485 match self.peer_fifo.duplicate_handle(rights) {
486 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
487 Err(s) => responder.send(Err(s.into_raw()))?,
488 }
489 Ok(())
490 }
491 fblock::SessionRequest::AttachVmo { vmo, responder } => {
492 let vmo = Arc::new(vmo);
493 let vmo_id = {
494 let mut vmos = self.vmos.lock().unwrap();
495 if vmos.len() == u16::MAX as usize {
496 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
497 return Ok(());
498 } else {
499 let vmo_id = match vmos.last_entry() {
500 None => 1,
501 Some(o) => {
502 o.key().checked_add(1).unwrap_or_else(|| {
503 let mut vmo_id = 1;
504 for (&id, _) in &*vmos {
506 if id > vmo_id {
507 break;
508 }
509 vmo_id = id + 1;
510 }
511 vmo_id
512 })
513 }
514 };
515 vmos.insert(vmo_id, vmo.clone());
516 vmo_id
517 }
518 };
519 self.session_manager.clone().on_attach_vmo(&vmo).await?;
520 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
521 Ok(())
522 }
523 fblock::SessionRequest::Close { responder } => {
524 responder.send(Ok(()))?;
525 Err(anyhow!("Closed"))
526 }
527 }
528 }
529
530 fn finish_fifo_request(
531 &self,
532 request: RequestTracking,
533 status: zx::Status,
534 ) -> Option<BlockFifoResponse> {
535 match request.group_or_request {
536 GroupOrRequest::Group(group_id) => {
537 let response = self.message_groups.complete(group_id, status);
538 fuchsia_trace::duration!(
539 c"storage",
540 c"block_server::finish_transaction_in_group",
541 "group" => u32::from(group_id),
542 "group_completed" => response.is_some(),
543 "status" => status.into_raw());
544 if let Some(trace_flow_id) = request.trace_flow_id {
545 fuchsia_trace::flow_step!(
546 c"storage",
547 c"block_server::finish_transaction",
548 trace_flow_id.get().into()
549 );
550 }
551 response
552 }
553 GroupOrRequest::Request(reqid) => {
554 fuchsia_trace::duration!(
555 c"storage", c"block_server::finish_transaction", "status" => status.into_raw());
556 if let Some(trace_flow_id) = request.trace_flow_id {
557 fuchsia_trace::flow_step!(
558 c"storage",
559 c"block_server::finish_transaction",
560 trace_flow_id.get().into()
561 );
562 }
563 Some(BlockFifoResponse { status: status.into_raw(), reqid, ..Default::default() })
564 }
565 }
566 }
567
568 fn decode_fifo_request(&self, request: &BlockFifoRequest) -> Option<DecodedRequest> {
569 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
570 let is_group = flags.contains(BlockIoFlag::GROUP_ITEM);
571 let last_in_group = flags.contains(BlockIoFlag::GROUP_LAST);
572 let mut op_code =
573 BlockOpcode::from_primitive(request.command.opcode).ok_or(zx::Status::INVALID_ARGS);
574
575 let group_or_request = if is_group {
576 let mut groups = self.message_groups.0.lock().unwrap();
577 let group = groups.entry(request.group).or_insert_with(|| FifoMessageGroup::new());
578 if group.req_id.is_some() {
579 if group.status == zx::Status::OK {
581 group.status = zx::Status::INVALID_ARGS;
582 }
583 return None;
584 }
585 if last_in_group {
586 group.req_id = Some(request.reqid);
587 if group.status != zx::Status::OK {
589 op_code = Err(group.status);
590 }
591 } else if group.status != zx::Status::OK {
592 return None;
595 }
596 group.count += 1;
597 GroupOrRequest::Group(request.group)
598 } else {
599 GroupOrRequest::Request(request.reqid)
600 };
601
602 let mut vmo_offset = 0;
603 let vmo = match op_code {
604 Ok(BlockOpcode::Read) | Ok(BlockOpcode::Write) => (|| {
605 if request.length == 0 {
606 return Err(zx::Status::INVALID_ARGS);
607 }
608 vmo_offset = request
609 .vmo_offset
610 .checked_mul(self.block_size as u64)
611 .ok_or(zx::Status::OUT_OF_RANGE)?;
612 self.vmos
613 .lock()
614 .unwrap()
615 .get(&request.vmoid)
616 .cloned()
617 .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo)))
618 })(),
619 Ok(BlockOpcode::CloseVmo) => self
620 .vmos
621 .lock()
622 .unwrap()
623 .remove(&request.vmoid)
624 .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo))),
625 _ => Ok(None),
626 }
627 .unwrap_or_else(|e| {
628 op_code = Err(e);
629 None
630 });
631
632 let operation = op_code.map(|code| match code {
633 BlockOpcode::Read => Operation::Read {
634 device_block_offset: request.dev_offset,
635 block_count: request.length,
636 _unused: 0,
637 vmo_offset,
638 },
639 BlockOpcode::Write => Operation::Write {
640 device_block_offset: request.dev_offset,
641 block_count: request.length,
642 options: if flags.contains(BlockIoFlag::FORCE_ACCESS) {
643 WriteOptions::FORCE_ACCESS
644 } else {
645 WriteOptions::empty()
646 },
647 vmo_offset,
648 },
649 BlockOpcode::Flush => Operation::Flush,
650 BlockOpcode::Trim => Operation::Trim {
651 device_block_offset: request.dev_offset,
652 block_count: request.length,
653 },
654 BlockOpcode::CloseVmo => Operation::CloseVmo,
655 });
656 if let Ok(operation) = operation.as_ref() {
657 use fuchsia_trace::ArgValue;
658 static CACHE: AtomicU64 = AtomicU64::new(0);
659 if let Some(context) =
660 fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
661 {
662 let trace_args_with_group = [
663 ArgValue::of("group", u32::from(group_or_request.group_id())),
664 ArgValue::of("opcode", operation.trace_label()),
665 ];
666 let trace_args = [ArgValue::of("opcode", operation.trace_label())];
667 let _scope = if group_or_request.is_group() {
668 fuchsia_trace::duration(
669 c"storage",
670 c"block_server::start_transaction",
671 &trace_args_with_group,
672 )
673 } else {
674 fuchsia_trace::duration(
675 c"storage",
676 c"block_server::start_transaction",
677 &trace_args,
678 )
679 };
680 let trace_flow_id = NonZero::new(request.trace_flow_id);
681 if let Some(trace_flow_id) = trace_flow_id.clone() {
682 fuchsia_trace::flow_step(
683 &context,
684 c"block_server::start_trnsaction",
685 trace_flow_id.get().into(),
686 &[],
687 );
688 }
689 }
690 }
691 Some(DecodedRequest::new(
692 RequestTracking {
693 group_or_request,
694 trace_flow_id: NonZero::new(request.trace_flow_id),
695 },
696 operation,
697 vmo,
698 self.offset_map.as_ref(),
699 ))
700 }
701
702 fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
703 std::mem::take(&mut *self.vmos.lock().unwrap())
704 }
705}
706
707#[derive(Debug)]
708struct DecodedRequest {
709 request_tracking: RequestTracking,
710 operation: Result<Operation, zx::Status>,
711 vmo: Option<Arc<zx::Vmo>>,
712}
713
714impl DecodedRequest {
715 fn new(
716 request_tracking: RequestTracking,
717 operation: Result<Operation, zx::Status>,
718 vmo: Option<Arc<zx::Vmo>>,
719 offset_map: Option<&OffsetMap>,
720 ) -> Self {
721 let operation =
722 operation.and_then(|operation| operation.validate_and_adjust_range(offset_map));
723 Self { request_tracking, operation, vmo }
724 }
725}
726
/// Alias for `block_protocol::WriteOptions`, the options carried by [`Operation::Write`].
pub type WriteOptions = block_protocol::WriteOptions;
729
/// A decoded block operation.
// NOTE(review): `#[repr(C)]` presumably exists because the type is shared with the C interface
// (`c_interface`); confirm before changing the layout or field order.
#[repr(C)]
#[derive(Debug)]
pub enum Operation {
    /// Read `block_count` blocks starting at `device_block_offset`.
    Read {
        device_block_offset: u64,
        block_count: u32,
        /// Always set to zero when decoded from a FIFO request.
        _unused: u32,
        /// Offset into the VMO in bytes (the request's block offset times the block size).
        vmo_offset: u64,
    },
    /// Write `block_count` blocks starting at `device_block_offset`.
    Write {
        device_block_offset: u64,
        block_count: u32,
        /// `FORCE_ACCESS` if the request carried that flag, otherwise empty.
        options: WriteOptions,
        /// Offset into the VMO in bytes (the request's block offset times the block size).
        vmo_offset: u64,
    },
    /// Flush request; carries no arguments.
    Flush,
    /// Trim `block_count` blocks starting at `device_block_offset`.
    Trim {
        device_block_offset: u64,
        block_count: u32,
    },
    /// The VMO named by the request has been detached from the session.
    CloseVmo,
}
757
758impl Operation {
759 fn validate_and_adjust_range(
762 mut self,
763 offset_map: Option<&OffsetMap>,
764 ) -> Result<Operation, zx::Status> {
765 let adjust_offset = |dev_offset, length| {
766 if length == 0 {
768 return Err(zx::Status::INVALID_ARGS);
769 }
770 if let Some(offset_map) = offset_map {
771 offset_map.adjust_request(dev_offset, length as u64).ok_or(zx::Status::OUT_OF_RANGE)
772 } else {
773 Ok(dev_offset)
774 }
775 };
776 match &mut self {
777 Operation::Read { device_block_offset, block_count, .. } => {
778 *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
779 }
780 Operation::Write { device_block_offset, block_count, .. } => {
781 *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
782 }
783 Operation::Trim { device_block_offset, block_count, .. } => {
784 *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
785 }
786 _ => {}
787 }
788 Ok(self)
789 }
790 fn trace_label(&self) -> &'static str {
791 match self {
792 Operation::Read { .. } => "read",
793 Operation::Write { .. } => "write",
794 Operation::Flush { .. } => "flush",
795 Operation::Trim { .. } => "trim",
796 Operation::CloseVmo { .. } => "close_vmo",
797 }
798 }
799}
800
/// What is needed to complete a request once its operation has finished.
#[derive(Clone, Copy, Debug)]
pub struct RequestTracking {
    /// The group id (for grouped requests) or the request id the response is addressed to.
    group_or_request: GroupOrRequest,
    /// The request's trace flow id, or `None` if it was zero.
    trace_flow_id: Option<NonZero<u64>>,
}
806
/// Identifies either a request group or a single, ungrouped request.
#[derive(Clone, Copy, Debug)]
pub enum GroupOrRequest {
    /// A group, by group id.
    Group(u16),
    /// A single request, by request id.
    Request(u32),
}

impl GroupOrRequest {
    /// Returns `true` for the `Group` variant.
    fn is_group(&self) -> bool {
        matches!(self, Self::Group(_))
    }

    /// Returns the group id, or 0 when this is a single request.
    fn group_id(&self) -> u16 {
        if let Self::Group(id) = *self {
            id
        } else {
            0
        }
    }
}
829
/// Bit 63 of a [`RequestId`]: set when the id was built from a group id.
const IS_GROUP: u64 = 1 << 63;
/// Bit 62 of a [`RequestId`]: set by [`RequestId::with_vmo`].
const USED_VMO: u64 = 1 << 62;
/// A request or group id packed into a `u64`, with the two top bits used as flags.
#[repr(transparent)]
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct RequestId(u64);

impl RequestId {
    /// Returns a copy of the id with the `USED_VMO` flag set.
    fn with_vmo(self) -> Self {
        let Self(bits) = self;
        Self(bits | USED_VMO)
    }

    /// Returns whether the `USED_VMO` flag is set.
    fn did_have_vmo(&self) -> bool {
        (self.0 & USED_VMO) == USED_VMO
    }
}
850
851impl From<GroupOrRequest> for RequestId {
852 fn from(value: GroupOrRequest) -> Self {
853 match value {
854 GroupOrRequest::Group(group) => RequestId(group as u64 | IS_GROUP),
855 GroupOrRequest::Request(request) => RequestId(request as u64),
856 }
857 }
858}
859
860impl From<RequestId> for GroupOrRequest {
861 fn from(value: RequestId) -> Self {
862 if value.0 & IS_GROUP == 0 {
863 GroupOrRequest::Request(value.0 as u32)
864 } else {
865 GroupOrRequest::Group(value.0 as u16)
866 }
867 }
868}
869
870#[cfg(test)]
871mod tests {
872 use super::{BlockServer, DeviceInfo, PartitionInfo};
873 use crate::async_interface::FIFO_MAX_REQUESTS;
874 use assert_matches::assert_matches;
875 use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
876 use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
877 use fuchsia_async::{FifoReadable as _, FifoWritable as _};
878 use futures::channel::oneshot;
879 use futures::future::BoxFuture;
880 use futures::FutureExt as _;
881 use std::borrow::Cow;
882 use std::future::poll_fn;
883 use std::num::NonZero;
884 use std::pin::pin;
885 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
886 use std::sync::{Arc, Mutex};
887 use std::task::{Context, Poll};
888 use zx::{AsHandleRef as _, HandleBased as _};
889 use {
890 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
891 fuchsia_async as fasync,
892 };
893
894 #[derive(Default)]
895 struct MockInterface {
896 read_hook: Option<
897 Box<
898 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
899 + Send
900 + Sync,
901 >,
902 >,
903 }
904
905 impl super::async_interface::Interface for MockInterface {
906 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
907 Ok(())
908 }
909
910 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
911 Ok(Cow::Owned(test_device_info()))
912 }
913
914 async fn read(
915 &self,
916 device_block_offset: u64,
917 block_count: u32,
918 vmo: &Arc<zx::Vmo>,
919 vmo_offset: u64,
920 _trace_flow_id: Option<NonZero<u64>>,
921 ) -> Result<(), zx::Status> {
922 if let Some(read_hook) = &self.read_hook {
923 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
924 } else {
925 unimplemented!();
926 }
927 }
928
929 async fn write(
930 &self,
931 _device_block_offset: u64,
932 _block_count: u32,
933 _vmo: &Arc<zx::Vmo>,
934 _vmo_offset: u64,
935 _opts: WriteOptions,
936 _trace_flow_id: Option<NonZero<u64>>,
937 ) -> Result<(), zx::Status> {
938 unreachable!();
939 }
940
941 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
942 unreachable!();
943 }
944
945 async fn trim(
946 &self,
947 _device_block_offset: u64,
948 _block_count: u32,
949 _trace_flow_id: Option<NonZero<u64>>,
950 ) -> Result<(), zx::Status> {
951 unreachable!();
952 }
953
954 async fn get_volume_info(
955 &self,
956 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
957 let () = std::future::pending().await;
959 unreachable!();
960 }
961 }
962
/// Block size, in bytes, given to every `BlockServer` created by these tests.
const BLOCK_SIZE: u32 = 512;
964
965 fn test_device_info() -> DeviceInfo {
966 DeviceInfo::Partition(PartitionInfo {
967 device_flags: fblock::Flag::READONLY,
968 block_range: Some(12..34),
969 type_guid: [1; 16],
970 instance_guid: [2; 16],
971 name: "foo".to_string(),
972 flags: 0xabcd,
973 })
974 }
975
976 #[fuchsia::test]
977 async fn test_info() {
978 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
979
980 futures::join!(
981 async {
982 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
983 block_server.handle_requests(stream).await.unwrap();
984 },
985 async {
986 let expected_info = test_device_info();
987 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
988 info
989 } else {
990 unreachable!()
991 };
992
993 let block_info = proxy.get_info().await.unwrap().unwrap();
994 assert_eq!(
995 block_info.block_count,
996 partition_info.block_range.as_ref().unwrap().end
997 - partition_info.block_range.as_ref().unwrap().start
998 );
999 assert_eq!(block_info.flags, fblock::Flag::READONLY);
1000
1001 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1004 assert_eq!(status, zx::sys::ZX_OK);
1005 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1006
1007 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1008 assert_eq!(status, zx::sys::ZX_OK);
1009 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1010
1011 let (status, name) = proxy.get_name().await.unwrap();
1012 assert_eq!(status, zx::sys::ZX_OK);
1013 assert_eq!(name.as_ref(), Some(&partition_info.name));
1014
1015 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1016 assert_eq!(metadata.name, name);
1017 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1018 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1019 assert_eq!(
1020 metadata.start_block_offset,
1021 Some(partition_info.block_range.as_ref().unwrap().start)
1022 );
1023 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1024 assert_eq!(metadata.flags, Some(partition_info.flags));
1025
1026 std::mem::drop(proxy);
1027 }
1028 );
1029 }
1030
/// Attaches VMOs until the session reports `ZX_ERR_NO_RESOURCES`, then checks that closing a VMO
/// frees its id for reuse.
#[fuchsia::test]
async fn test_attach_vmo() {
    let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

    let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
    let koid = vmo.get_koid().unwrap();

    futures::join!(
        async {
            let block_server = BlockServer::new(
                BLOCK_SIZE,
                Arc::new(MockInterface {
                    // Every read must be handed the VMO created above.
                    read_hook: Some(Box::new(move |_, _, vmo, _| {
                        assert_eq!(vmo.get_koid().unwrap(), koid);
                        Box::pin(async { Ok(()) })
                    })),
                    ..MockInterface::default()
                }),
            );
            block_server.handle_requests(stream).await.unwrap();
        },
        async move {
            let (session_proxy, server) = fidl::endpoints::create_proxy();

            proxy.open_session(server).unwrap();

            let vmo_id = session_proxy
                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                .await
                .unwrap()
                .unwrap();
            assert_ne!(vmo_id.id, 0);

            let fifo =
                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());

            // Counts the successful attachments, including the one above.
            let mut count = 1;
            loop {
                match session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                {
                    Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
                    Err(e) => {
                        assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
                        break;
                    }
                }

                // Every tenth iteration, issue a read.  Note that `vmo_id` here is the id of the
                // first attachment, not the one bound in the match arm above.
                if count % 10 == 0 {
                    fifo.write_entries(&BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            ..Default::default()
                        },
                        vmoid: vmo_id.id,
                        length: 1,
                        ..Default::default()
                    })
                    .await
                    .unwrap();

                    let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                    assert_eq!(response.status, zx::sys::ZX_OK);
                }

                count += 1;
            }

            // Id 0 is never used, so exactly `u16::MAX` attachments succeed.
            assert_eq!(count, u16::MAX as u64);

            // Detach the first VMO...
            fifo.write_entries(&BlockFifoRequest {
                command: BlockFifoCommand {
                    opcode: BlockOpcode::CloseVmo.into_primitive(),
                    ..Default::default()
                },
                vmoid: vmo_id.id,
                ..Default::default()
            })
            .await
            .unwrap();

            let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
            assert_eq!(response.status, zx::sys::ZX_OK);

            // ...and check that the next attachment is given the id that was just freed.
            let new_vmo_id = session_proxy
                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                .await
                .unwrap()
                .unwrap();
            assert_eq!(new_vmo_id.id, vmo_id.id);

            std::mem::drop(proxy);
        }
    );
}
1132
/// Checks that the server finishes after the client closes its only session, even though the
/// client keeps the session proxy alive.
#[fuchsia::test]
async fn test_close() {
    let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

    let mut server = std::pin::pin!(async {
        let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
        block_server.handle_requests(stream).await.unwrap();
    }
    .fuse());

    let mut client = std::pin::pin!(async {
        let (session_proxy, server) = fidl::endpoints::create_proxy();

        proxy.open_session(server).unwrap();

        // Drop the Volume proxy so that only the session remains open.
        std::mem::drop(proxy);

        session_proxy.close().await.unwrap().unwrap();

        // Never complete, keeping `session_proxy` alive: the server has to finish on its own.
        let _: () = std::future::pending().await;
    }
    .fuse());

    // Only the server future is allowed to complete.
    futures::select!(
        _ = server => {}
        _ = client => unreachable!(),
    );
}
1164
/// Interface mock for I/O tests.
#[derive(Default)]
struct IoMockInterface {
    /// When set (and `return_errors` is not), every operation is checked against `expected_op`.
    do_checks: bool,
    /// The operation the next call is expected to be; taken (left as `None`) when checked.
    expected_op: Arc<Mutex<Option<ExpectedOp>>>,
    /// When set, every operation fails with an operation-specific error status.
    return_errors: bool,
}
1171
/// The operation, and its arguments, that `IoMockInterface` expects to receive next.
#[derive(Debug)]
enum ExpectedOp {
    /// (device block offset, block count, VMO offset in blocks)
    Read(u64, u32, u64),
    /// (device block offset, block count, VMO offset in blocks)
    Write(u64, u32, u64),
    /// (device block offset, block count)
    Trim(u64, u32),
    Flush,
}
1179
1180 impl super::async_interface::Interface for IoMockInterface {
1181 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1182 Ok(())
1183 }
1184
1185 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1186 Ok(Cow::Owned(test_device_info()))
1187 }
1188
1189 async fn read(
1190 &self,
1191 device_block_offset: u64,
1192 block_count: u32,
1193 _vmo: &Arc<zx::Vmo>,
1194 vmo_offset: u64,
1195 _trace_flow_id: Option<NonZero<u64>>,
1196 ) -> Result<(), zx::Status> {
1197 if self.return_errors {
1198 Err(zx::Status::INTERNAL)
1199 } else {
1200 if self.do_checks {
1201 assert_matches!(
1202 self.expected_op.lock().unwrap().take(),
1203 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1204 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1205 "Read {device_block_offset} {block_count} {vmo_offset}"
1206 );
1207 }
1208 Ok(())
1209 }
1210 }
1211
1212 async fn write(
1213 &self,
1214 device_block_offset: u64,
1215 block_count: u32,
1216 _vmo: &Arc<zx::Vmo>,
1217 vmo_offset: u64,
1218 _opts: WriteOptions,
1219 _trace_flow_id: Option<NonZero<u64>>,
1220 ) -> Result<(), zx::Status> {
1221 if self.return_errors {
1222 Err(zx::Status::NOT_SUPPORTED)
1223 } else {
1224 if self.do_checks {
1225 assert_matches!(
1226 self.expected_op.lock().unwrap().take(),
1227 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1228 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1229 "Write {device_block_offset} {block_count} {vmo_offset}"
1230 );
1231 }
1232 Ok(())
1233 }
1234 }
1235
1236 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1237 if self.return_errors {
1238 Err(zx::Status::NO_RESOURCES)
1239 } else {
1240 if self.do_checks {
1241 assert_matches!(
1242 self.expected_op.lock().unwrap().take(),
1243 Some(ExpectedOp::Flush)
1244 );
1245 }
1246 Ok(())
1247 }
1248 }
1249
1250 async fn trim(
1251 &self,
1252 device_block_offset: u64,
1253 block_count: u32,
1254 _trace_flow_id: Option<NonZero<u64>>,
1255 ) -> Result<(), zx::Status> {
1256 if self.return_errors {
1257 Err(zx::Status::NO_MEMORY)
1258 } else {
1259 if self.do_checks {
1260 assert_matches!(
1261 self.expected_op.lock().unwrap().take(),
1262 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1263 block_count == b,
1264 "Trim {device_block_offset} {block_count}"
1265 );
1266 }
1267 Ok(())
1268 }
1269 }
1270 }
1271
1272 #[fuchsia::test]
1273 async fn test_io() {
1274 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1275
1276 let expected_op = Arc::new(Mutex::new(None));
1277 let expected_op_clone = expected_op.clone();
1278 futures::join!(
1279 async {
1280 let block_server = BlockServer::new(
1281 BLOCK_SIZE,
1282 Arc::new(IoMockInterface {
1283 return_errors: false,
1284 do_checks: true,
1285 expected_op: expected_op_clone,
1286 }),
1287 );
1288 block_server.handle_requests(stream).await.unwrap();
1289 },
1290 async move {
1291 let (session_proxy, server) = fidl::endpoints::create_proxy();
1292
1293 proxy.open_session(server).unwrap();
1294
1295 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1296 let vmo_id = session_proxy
1297 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1298 .await
1299 .unwrap()
1300 .unwrap();
1301
1302 let fifo =
1303 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1304
1305 *expected_op.lock().unwrap() = Some(ExpectedOp::Read(1, 2, 3));
1307 fifo.write_entries(&BlockFifoRequest {
1308 command: BlockFifoCommand {
1309 opcode: BlockOpcode::Read.into_primitive(),
1310 ..Default::default()
1311 },
1312 vmoid: vmo_id.id,
1313 dev_offset: 1,
1314 length: 2,
1315 vmo_offset: 3,
1316 ..Default::default()
1317 })
1318 .await
1319 .unwrap();
1320
1321 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1322 assert_eq!(response.status, zx::sys::ZX_OK);
1323
1324 *expected_op.lock().unwrap() = Some(ExpectedOp::Write(4, 5, 6));
1326 fifo.write_entries(&BlockFifoRequest {
1327 command: BlockFifoCommand {
1328 opcode: BlockOpcode::Write.into_primitive(),
1329 ..Default::default()
1330 },
1331 vmoid: vmo_id.id,
1332 dev_offset: 4,
1333 length: 5,
1334 vmo_offset: 6,
1335 ..Default::default()
1336 })
1337 .await
1338 .unwrap();
1339
1340 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1341 assert_eq!(response.status, zx::sys::ZX_OK);
1342
1343 *expected_op.lock().unwrap() = Some(ExpectedOp::Flush);
1345 fifo.write_entries(&BlockFifoRequest {
1346 command: BlockFifoCommand {
1347 opcode: BlockOpcode::Flush.into_primitive(),
1348 ..Default::default()
1349 },
1350 ..Default::default()
1351 })
1352 .await
1353 .unwrap();
1354
1355 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1356 assert_eq!(response.status, zx::sys::ZX_OK);
1357
1358 *expected_op.lock().unwrap() = Some(ExpectedOp::Trim(7, 8));
1360 fifo.write_entries(&BlockFifoRequest {
1361 command: BlockFifoCommand {
1362 opcode: BlockOpcode::Trim.into_primitive(),
1363 ..Default::default()
1364 },
1365 dev_offset: 7,
1366 length: 8,
1367 ..Default::default()
1368 })
1369 .await
1370 .unwrap();
1371
1372 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1373 assert_eq!(response.status, zx::sys::ZX_OK);
1374
1375 std::mem::drop(proxy);
1376 }
1377 );
1378 }
1379
1380 #[fuchsia::test]
1381 async fn test_io_errors() {
1382 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1383
1384 futures::join!(
1385 async {
1386 let block_server = BlockServer::new(
1387 BLOCK_SIZE,
1388 Arc::new(IoMockInterface {
1389 return_errors: true,
1390 do_checks: false,
1391 expected_op: Arc::new(Mutex::new(None)),
1392 }),
1393 );
1394 block_server.handle_requests(stream).await.unwrap();
1395 },
1396 async move {
1397 let (session_proxy, server) = fidl::endpoints::create_proxy();
1398
1399 proxy.open_session(server).unwrap();
1400
1401 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1402 let vmo_id = session_proxy
1403 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1404 .await
1405 .unwrap()
1406 .unwrap();
1407
1408 let fifo =
1409 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1410
1411 fifo.write_entries(&BlockFifoRequest {
1413 command: BlockFifoCommand {
1414 opcode: BlockOpcode::Read.into_primitive(),
1415 ..Default::default()
1416 },
1417 vmoid: vmo_id.id,
1418 length: 1,
1419 reqid: 1,
1420 ..Default::default()
1421 })
1422 .await
1423 .unwrap();
1424
1425 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1426 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1427
1428 fifo.write_entries(&BlockFifoRequest {
1430 command: BlockFifoCommand {
1431 opcode: BlockOpcode::Write.into_primitive(),
1432 ..Default::default()
1433 },
1434 vmoid: vmo_id.id,
1435 length: 1,
1436 reqid: 2,
1437 ..Default::default()
1438 })
1439 .await
1440 .unwrap();
1441
1442 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1443 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1444
1445 fifo.write_entries(&BlockFifoRequest {
1447 command: BlockFifoCommand {
1448 opcode: BlockOpcode::Flush.into_primitive(),
1449 ..Default::default()
1450 },
1451 reqid: 3,
1452 ..Default::default()
1453 })
1454 .await
1455 .unwrap();
1456
1457 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1458 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1459
1460 fifo.write_entries(&BlockFifoRequest {
1462 command: BlockFifoCommand {
1463 opcode: BlockOpcode::Trim.into_primitive(),
1464 ..Default::default()
1465 },
1466 reqid: 4,
1467 length: 1,
1468 ..Default::default()
1469 })
1470 .await
1471 .unwrap();
1472
1473 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1474 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1475
1476 std::mem::drop(proxy);
1477 }
1478 );
1479 }
1480
1481 #[fuchsia::test]
1482 async fn test_invalid_args() {
1483 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1484
1485 futures::join!(
1486 async {
1487 let block_server = BlockServer::new(
1488 BLOCK_SIZE,
1489 Arc::new(IoMockInterface {
1490 return_errors: false,
1491 do_checks: false,
1492 expected_op: Arc::new(Mutex::new(None)),
1493 }),
1494 );
1495 block_server.handle_requests(stream).await.unwrap();
1496 },
1497 async move {
1498 let (session_proxy, server) = fidl::endpoints::create_proxy();
1499
1500 proxy.open_session(server).unwrap();
1501
1502 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1503 let vmo_id = session_proxy
1504 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1505 .await
1506 .unwrap()
1507 .unwrap();
1508
1509 let fifo =
1510 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1511
1512 async fn test(
1513 fifo: &fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1514 request: BlockFifoRequest,
1515 ) -> Result<(), zx::Status> {
1516 fifo.write_entries(&request).await.unwrap();
1517 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1518 zx::Status::ok(response.status)
1519 }
1520
1521 let good_read_request = || BlockFifoRequest {
1524 command: BlockFifoCommand {
1525 opcode: BlockOpcode::Read.into_primitive(),
1526 ..Default::default()
1527 },
1528 vmoid: vmo_id.id,
1529 ..Default::default()
1530 };
1531
1532 assert_eq!(
1533 test(&fifo, BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() })
1534 .await,
1535 Err(zx::Status::INVALID_ARGS)
1536 );
1537
1538 assert_eq!(
1539 test(
1540 &fifo,
1541 BlockFifoRequest {
1542 vmo_offset: 0xffff_ffff_ffff_ffff,
1543 ..good_read_request()
1544 }
1545 )
1546 .await,
1547 Err(zx::Status::INVALID_ARGS)
1548 );
1549
1550 let good_write_request = || BlockFifoRequest {
1553 command: BlockFifoCommand {
1554 opcode: BlockOpcode::Write.into_primitive(),
1555 ..Default::default()
1556 },
1557 vmoid: vmo_id.id,
1558 ..Default::default()
1559 };
1560
1561 assert_eq!(
1562 test(&fifo, BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() })
1563 .await,
1564 Err(zx::Status::INVALID_ARGS)
1565 );
1566
1567 assert_eq!(
1568 test(
1569 &fifo,
1570 BlockFifoRequest {
1571 vmo_offset: 0xffff_ffff_ffff_ffff,
1572 ..good_write_request()
1573 }
1574 )
1575 .await,
1576 Err(zx::Status::INVALID_ARGS)
1577 );
1578
1579 assert_eq!(
1582 test(
1583 &fifo,
1584 BlockFifoRequest {
1585 command: BlockFifoCommand {
1586 opcode: BlockOpcode::CloseVmo.into_primitive(),
1587 ..Default::default()
1588 },
1589 vmoid: vmo_id.id + 1,
1590 ..Default::default()
1591 }
1592 )
1593 .await,
1594 Err(zx::Status::IO)
1595 );
1596
1597 std::mem::drop(proxy);
1598 }
1599 );
1600 }
1601
    /// Issues two reads that the mock holds open, then releases them one at a time and checks
    /// that each release produces exactly the response for the released request while the other
    /// request stays outstanding.
    #[fuchsia::test]
    async fn test_concurrent_requests() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

        // Each in-flight read registers `(device block offset, sender)` here; sending on (or
        // dropping) the sender lets that read finish.
        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
        let waiting_readers_clone = waiting_readers.clone();

        futures::join!(
            async move {
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
                            let (tx, rx) = oneshot::channel();
                            waiting_readers_clone
                                .lock()
                                .unwrap()
                                .push((dev_block_offset as u32, tx));
                            Box::pin(async move {
                                // The result is ignored, so a dropped sender also unblocks.
                                let _ = rx.await;
                                Ok(())
                            })
                        })),
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();

                let fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());

                // `dev_offset` deliberately equals `reqid` in both requests: the hook only sees
                // the device offset, and the assertions below compare it against the response's
                // `reqid`.
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        ..Default::default()
                    },
                    reqid: 1,
                    dev_offset: 1,
                    vmoid: vmo_id.id,
                    length: 1,
                    ..Default::default()
                })
                .await
                .unwrap();

                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        ..Default::default()
                    },
                    reqid: 2,
                    dev_offset: 2,
                    vmoid: vmo_id.id,
                    length: 1,
                    ..Default::default()
                })
                .await
                .unwrap();

                // Wait until both reads have reached the hook.
                poll_fn(|cx: &mut Context<'_>| {
                    if waiting_readers.lock().unwrap().len() == 2 {
                        Poll::Ready(())
                    } else {
                        // Nothing will wake this task for us, so request an immediate re-poll;
                        // returning `Pending` still lets the server future run in between.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                })
                .await;

                // Both reads are blocked, so no response may be available yet.
                assert!(futures::poll!(fifo.read_entry()).is_pending());

                // Release the most recently registered read and expect its response.
                let (id, tx) = waiting_readers.lock().unwrap().pop().unwrap();
                tx.send(()).unwrap();

                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);
                assert_eq!(response.reqid, id);

                // The other read is still blocked.
                assert!(futures::poll!(fifo.read_entry()).is_pending());

                // Release the remaining read.
                let (id, tx) = waiting_readers.lock().unwrap().pop().unwrap();
                tx.send(()).unwrap();

                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);
                assert_eq!(response.reqid, id);
            }
        );
    }
1704
1705 #[fuchsia::test]
1706 async fn test_groups() {
1707 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1708
1709 futures::join!(
1710 async move {
1711 let block_server = BlockServer::new(
1712 BLOCK_SIZE,
1713 Arc::new(MockInterface {
1714 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
1715 }),
1716 );
1717 block_server.handle_requests(stream).await.unwrap();
1718 },
1719 async move {
1720 let (session_proxy, server) = fidl::endpoints::create_proxy();
1721
1722 proxy.open_session(server).unwrap();
1723
1724 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1725 let vmo_id = session_proxy
1726 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1727 .await
1728 .unwrap()
1729 .unwrap();
1730
1731 let fifo =
1732 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1733
1734 fifo.write_entries(&BlockFifoRequest {
1735 command: BlockFifoCommand {
1736 opcode: BlockOpcode::Read.into_primitive(),
1737 flags: BlockIoFlag::GROUP_ITEM.bits(),
1738 ..Default::default()
1739 },
1740 group: 1,
1741 vmoid: vmo_id.id,
1742 length: 1,
1743 ..Default::default()
1744 })
1745 .await
1746 .unwrap();
1747
1748 fifo.write_entries(&BlockFifoRequest {
1749 command: BlockFifoCommand {
1750 opcode: BlockOpcode::Read.into_primitive(),
1751 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1752 ..Default::default()
1753 },
1754 reqid: 2,
1755 group: 1,
1756 vmoid: vmo_id.id,
1757 length: 1,
1758 ..Default::default()
1759 })
1760 .await
1761 .unwrap();
1762
1763 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1764 assert_eq!(response.status, zx::sys::ZX_OK);
1765 assert_eq!(response.reqid, 2);
1766 assert_eq!(response.group, 1);
1767 }
1768 );
1769 }
1770
    /// Checks how a failure inside a group is reported: the first request of the group fails, no
    /// response is produced until the `GROUP_LAST` request has been submitted, the single
    /// response then carries the failure status, and the read hook is never invoked for the
    /// remaining requests of the group.
    #[fuchsia::test]
    async fn test_group_error() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

        // Counts how many times the read hook has been invoked.
        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        futures::join!(
            async move {
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        // Every read that reaches the hook is counted and fails with BAD_STATE.
                        read_hook: Some(Box::new(move |_, _, _, _| {
                            counter_clone.fetch_add(1, Ordering::Relaxed);
                            Box::pin(async { Err(zx::Status::BAD_STATE) })
                        })),
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();

                let fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());

                // First request of group 1 (not the last one).
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        flags: BlockIoFlag::GROUP_ITEM.bits(),
                        ..Default::default()
                    },
                    group: 1,
                    vmoid: vmo_id.id,
                    length: 1,
                    ..Default::default()
                })
                .await
                .unwrap();

                // Wait until the hook has been invoked for that first request.
                poll_fn(|cx: &mut Context<'_>| {
                    if counter.load(Ordering::Relaxed) == 1 {
                        Poll::Ready(())
                    } else {
                        // Nothing will wake this task for us, so request an immediate re-poll;
                        // returning `Pending` still lets the server future run in between.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                })
                .await;

                // The request has already failed, but no response may appear before the group's
                // last request has been sent.
                assert!(futures::poll!(fifo.read_entry()).is_pending());

                // Second request of the group, still without GROUP_LAST.
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        flags: BlockIoFlag::GROUP_ITEM.bits(),
                        ..Default::default()
                    },
                    group: 1,
                    vmoid: vmo_id.id,
                    length: 1,
                    ..Default::default()
                })
                .await
                .unwrap();

                // Final request of the group; its `reqid` is the one echoed in the response.
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
                        ..Default::default()
                    },
                    reqid: 2,
                    group: 1,
                    vmoid: vmo_id.id,
                    length: 1,
                    ..Default::default()
                })
                .await
                .unwrap();

                // The group's response reports the error from the first request.
                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
                assert_eq!(response.reqid, 2);
                assert_eq!(response.group, 1);

                // No further response follows.
                assert!(futures::poll!(fifo.read_entry()).is_pending());

                // Only the first request ever reached the read hook.
                assert_eq!(counter.load(Ordering::Relaxed), 1);
            }
        );
    }
1875
    /// Submits two requests that both claim to be the last of group 1 while the first one is
    /// still blocked in the read hook, and checks that the group is eventually completed with
    /// `INVALID_ARGS` under the first request's id.  An unrelated `CloseVmo` request sent in
    /// between must complete first.
    #[fuchsia::test]
    async fn test_group_with_two_lasts() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

        // Sending on `tx` unblocks the single read that is allowed to reach the hook.
        let (tx, rx) = oneshot::channel();

        futures::join!(
            async move {
                let rx = Mutex::new(Some(rx));
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        read_hook: Some(Box::new(move |_, _, _, _| {
                            // The receiver can be taken only once: a second invocation of the
                            // hook would panic on this `unwrap`.
                            let rx = rx.lock().unwrap().take().unwrap();
                            Box::pin(async {
                                let _ = rx.await;
                                Ok(())
                            })
                        })),
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();

                let fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());

                // First request flagged as the last item of group 1.
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
                        ..Default::default()
                    },
                    reqid: 1,
                    group: 1,
                    vmoid: vmo_id.id,
                    length: 1,
                    ..Default::default()
                })
                .await
                .unwrap();

                // Second request that also claims to be the last item of the same group.
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
                        ..Default::default()
                    },
                    reqid: 2,
                    group: 1,
                    vmoid: vmo_id.id,
                    length: 1,
                    ..Default::default()
                })
                .await
                .unwrap();

                // An independent, ungrouped request queued behind the two group requests.
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::CloseVmo.into_primitive(),
                        ..Default::default()
                    },
                    reqid: 3,
                    vmoid: vmo_id.id,
                    ..Default::default()
                })
                .await
                .unwrap();

                // The independent request completes first, while the read is still blocked.
                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);
                assert_eq!(response.reqid, 3);

                // Unblock the read.
                tx.send(()).unwrap();

                // The group is reported as failed, identified by the first request's id.
                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
                assert_eq!(response.reqid, 1);
                assert_eq!(response.group, 1);
            }
        );
    }
1974
    /// Checks that a FIFO request can still complete while a FIDL call on the `Volume` proxy is
    /// outstanding: a read is parked in the hook, `get_volume_info` is issued and observed to
    /// stall, and releasing the read must then let the FIFO future finish.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_requests_dont_block_sessions() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

        // Sending on `tx` unblocks the single read that is allowed to reach the hook.
        let (tx, rx) = oneshot::channel();

        // The server runs as a detached local task so the test body can drive the client
        // futures step by step with `poll_until_stalled`.
        fasync::Task::local(async move {
            let rx = Mutex::new(Some(rx));
            let block_server = BlockServer::new(
                BLOCK_SIZE,
                Arc::new(MockInterface {
                    read_hook: Some(Box::new(move |_, _, _, _| {
                        // The receiver can be taken only once: a second invocation of the hook
                        // would panic on this `unwrap`.
                        let rx = rx.lock().unwrap().take().unwrap();
                        Box::pin(async {
                            let _ = rx.await;
                            Ok(())
                        })
                    })),
                }),
            );
            block_server.handle_requests(stream).await.unwrap();
        })
        .detach();

        // Opens a session, submits one read and waits for its response.
        let mut fut = pin!(async {
            let (session_proxy, server) = fidl::endpoints::create_proxy();

            proxy.open_session(server).unwrap();

            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
            let vmo_id = session_proxy
                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                .await
                .unwrap()
                .unwrap();

            let fifo = fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());

            fifo.write_entries(&BlockFifoRequest {
                command: BlockFifoCommand {
                    opcode: BlockOpcode::Read.into_primitive(),
                    flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
                    ..Default::default()
                },
                reqid: 1,
                group: 1,
                vmoid: vmo_id.id,
                length: 1,
                ..Default::default()
            })
            .await
            .unwrap();

            let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
            assert_eq!(response.status, zx::sys::ZX_OK);
        });

        // Run until nothing can make progress; the read is parked in the hook, so the client
        // future cannot have finished.
        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());

        let mut fut2 = pin!(proxy.get_volume_info());

        // NOTE(review): presumably `MockInterface`'s `get_volume_info` never completes, which is
        // why this call is expected to stall -- confirm against the mock's definition.
        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());

        // Release the parked read.  The send result is deliberately ignored.
        let _ = tx.send(());

        // The FIFO response must arrive even though `get_volume_info` is still outstanding.
        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
    }
2046
2047 #[fuchsia::test]
2048 async fn test_request_flow_control() {
2049 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2050
2051 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2054 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2055 let event_clone = event.clone();
2056 futures::join!(
2057 async move {
2058 let block_server = BlockServer::new(
2059 BLOCK_SIZE,
2060 Arc::new(MockInterface {
2061 read_hook: Some(Box::new(move |_, _, _, _| {
2062 let event_clone = event_clone.clone();
2063 Box::pin(async move {
2064 if !event_clone.1.load(Ordering::SeqCst) {
2065 event_clone.0.listen().await;
2066 }
2067 Ok(())
2068 })
2069 })),
2070 }),
2071 );
2072 block_server.handle_requests(stream).await.unwrap();
2073 },
2074 async move {
2075 let (session_proxy, server) = fidl::endpoints::create_proxy();
2076
2077 proxy.open_session(server).unwrap();
2078
2079 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2080 let vmo_id = session_proxy
2081 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2082 .await
2083 .unwrap()
2084 .unwrap();
2085
2086 let fifo =
2087 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2088
2089 for i in 0..MAX_REQUESTS {
2090 fifo.write_entries(&BlockFifoRequest {
2091 command: BlockFifoCommand {
2092 opcode: BlockOpcode::Read.into_primitive(),
2093 ..Default::default()
2094 },
2095 reqid: (i + 1) as u32,
2096 dev_offset: i,
2097 vmoid: vmo_id.id,
2098 length: 1,
2099 ..Default::default()
2100 })
2101 .await
2102 .unwrap();
2103 }
2104 assert!(futures::poll!(fifo.write_entries(&BlockFifoRequest {
2105 command: BlockFifoCommand {
2106 opcode: BlockOpcode::Read.into_primitive(),
2107 ..Default::default()
2108 },
2109 reqid: u32::MAX,
2110 dev_offset: MAX_REQUESTS,
2111 vmoid: vmo_id.id,
2112 length: 1,
2113 ..Default::default()
2114 }))
2115 .is_pending());
2116 event.1.store(true, Ordering::SeqCst);
2118 event.0.notify(usize::MAX);
2119 let mut finished_reqids = vec![];
2121 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2122 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2123 assert_eq!(response.status, zx::sys::ZX_OK);
2124 finished_reqids.push(response.reqid);
2125 fifo.write_entries(&BlockFifoRequest {
2126 command: BlockFifoCommand {
2127 opcode: BlockOpcode::Read.into_primitive(),
2128 ..Default::default()
2129 },
2130 reqid: (i + 1) as u32,
2131 dev_offset: i,
2132 vmoid: vmo_id.id,
2133 length: 1,
2134 ..Default::default()
2135 })
2136 .await
2137 .unwrap();
2138 }
2139 for _ in 0..MAX_REQUESTS {
2140 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2141 assert_eq!(response.status, zx::sys::ZX_OK);
2142 finished_reqids.push(response.reqid);
2143 }
2144 finished_reqids.sort();
2147 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2148 let mut i = 1;
2149 for reqid in finished_reqids {
2150 assert_eq!(reqid, i);
2151 i += 1;
2152 }
2153 }
2154 );
2155 }
2156
2157 #[fuchsia::test]
2158 async fn test_passthrough_io_with_fixed_map() {
2159 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2160
2161 let expected_op = Arc::new(Mutex::new(None));
2162 let expected_op_clone = expected_op.clone();
2163 futures::join!(
2164 async {
2165 let block_server = BlockServer::new(
2166 BLOCK_SIZE,
2167 Arc::new(IoMockInterface {
2168 return_errors: false,
2169 do_checks: true,
2170 expected_op: expected_op_clone,
2171 }),
2172 );
2173 block_server.handle_requests(stream).await.unwrap();
2174 },
2175 async move {
2176 let (session_proxy, server) = fidl::endpoints::create_proxy();
2177
2178 let mappings = [fblock::BlockOffsetMapping {
2179 source_block_offset: 0,
2180 target_block_offset: 10,
2181 length: 20,
2182 }];
2183 proxy.open_session_with_offset_map(server, None, Some(&mappings[..])).unwrap();
2184
2185 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2186 let vmo_id = session_proxy
2187 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2188 .await
2189 .unwrap()
2190 .unwrap();
2191
2192 let fifo =
2193 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2194
2195 *expected_op.lock().unwrap() = Some(ExpectedOp::Read(11, 2, 3));
2197 fifo.write_entries(&BlockFifoRequest {
2198 command: BlockFifoCommand {
2199 opcode: BlockOpcode::Read.into_primitive(),
2200 ..Default::default()
2201 },
2202 vmoid: vmo_id.id,
2203 dev_offset: 1,
2204 length: 2,
2205 vmo_offset: 3,
2206 ..Default::default()
2207 })
2208 .await
2209 .unwrap();
2210
2211 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2212 assert_eq!(response.status, zx::sys::ZX_OK);
2213
2214 *expected_op.lock().unwrap() = Some(ExpectedOp::Write(14, 5, 6));
2216 fifo.write_entries(&BlockFifoRequest {
2217 command: BlockFifoCommand {
2218 opcode: BlockOpcode::Write.into_primitive(),
2219 ..Default::default()
2220 },
2221 vmoid: vmo_id.id,
2222 dev_offset: 4,
2223 length: 5,
2224 vmo_offset: 6,
2225 ..Default::default()
2226 })
2227 .await
2228 .unwrap();
2229
2230 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2231 assert_eq!(response.status, zx::sys::ZX_OK);
2232
2233 *expected_op.lock().unwrap() = Some(ExpectedOp::Flush);
2235 fifo.write_entries(&BlockFifoRequest {
2236 command: BlockFifoCommand {
2237 opcode: BlockOpcode::Flush.into_primitive(),
2238 ..Default::default()
2239 },
2240 ..Default::default()
2241 })
2242 .await
2243 .unwrap();
2244
2245 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2246 assert_eq!(response.status, zx::sys::ZX_OK);
2247
2248 *expected_op.lock().unwrap() = Some(ExpectedOp::Trim(17, 3));
2250 fifo.write_entries(&BlockFifoRequest {
2251 command: BlockFifoCommand {
2252 opcode: BlockOpcode::Trim.into_primitive(),
2253 ..Default::default()
2254 },
2255 dev_offset: 7,
2256 length: 3,
2257 ..Default::default()
2258 })
2259 .await
2260 .unwrap();
2261
2262 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2263 assert_eq!(response.status, zx::sys::ZX_OK);
2264
2265 *expected_op.lock().unwrap() = None;
2267 fifo.write_entries(&BlockFifoRequest {
2268 command: BlockFifoCommand {
2269 opcode: BlockOpcode::Read.into_primitive(),
2270 ..Default::default()
2271 },
2272 vmoid: vmo_id.id,
2273 dev_offset: 19,
2274 length: 2,
2275 vmo_offset: 3,
2276 ..Default::default()
2277 })
2278 .await
2279 .unwrap();
2280
2281 let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2282 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2283
2284 std::mem::drop(proxy);
2285 }
2286 );
2287 }
2288}