1use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions};
6use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
7use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
8use fuchsia_async::epoch::{Epoch, EpochGuard};
9use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use storage_device::buffer::Buffer;
19use zx::HandleBased;
20use {
21 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
22 fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
23};
24
25pub mod async_interface;
26pub mod c_interface;
27
28#[cfg(test)]
29mod decompression_tests;
30
31pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
32
33type TraceFlowId = Option<NonZero<u64>>;
34
35#[derive(Clone)]
36pub enum DeviceInfo {
37 Block(BlockInfo),
38 Partition(PartitionInfo),
39}
40
41impl DeviceInfo {
42 pub fn device_flags(&self) -> fblock::Flag {
43 match self {
44 Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
45 Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
46 }
47 }
48
49 pub fn block_count(&self) -> Option<u64> {
50 match self {
51 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
52 Self::Partition(PartitionInfo { block_range, .. }) => {
53 block_range.as_ref().map(|range| range.end - range.start)
54 }
55 }
56 }
57
58 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
59 match self {
60 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
61 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
62 max_transfer_blocks.clone()
63 }
64 }
65 }
66
67 fn max_transfer_size(&self, block_size: u32) -> u32 {
68 if let Some(max_blocks) = self.max_transfer_blocks() {
69 max_blocks.get() * block_size
70 } else {
71 MAX_TRANSFER_UNBOUNDED
72 }
73 }
74}
75
76#[derive(Clone, Default)]
78pub struct BlockInfo {
79 pub device_flags: fblock::Flag,
80 pub block_count: u64,
81 pub max_transfer_blocks: Option<NonZero<u32>>,
82}
83
84#[derive(Clone, Default)]
86pub struct PartitionInfo {
87 pub device_flags: fblock::Flag,
89 pub max_transfer_blocks: Option<NonZero<u32>>,
90 pub block_range: Option<Range<u64>>,
94 pub type_guid: [u8; 16],
95 pub instance_guid: [u8; 16],
96 pub name: String,
97 pub flags: u64,
98}
99
100struct ActiveRequest<S> {
103 session: S,
104 group_or_request: GroupOrRequest,
105 trace_flow_id: TraceFlowId,
106 _epoch_guard: EpochGuard<'static>,
107 status: zx::Status,
108 count: u32,
109 req_id: Option<u32>,
110 decompression_info: Option<DecompressionInfo>,
111}
112
113struct DecompressionInfo {
114 compressed_range: Range<usize>,
116
117 uncompressed_range: Range<u64>,
119
120 bytes_so_far: u64,
121 mapping: Arc<VmoMapping>,
122 buffer: Option<Buffer<'static>>,
123}
124
125impl DecompressionInfo {
126 fn uncompressed_slice(&self) -> *mut [u8] {
128 std::ptr::slice_from_raw_parts_mut(
129 (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
130 (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
131 )
132 }
133}
134
135pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
136
137impl<S> Default for ActiveRequests<S> {
138 fn default() -> Self {
139 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
140 }
141}
142
143impl<S> ActiveRequests<S> {
144 fn complete_and_take_response(
145 &self,
146 request_id: RequestId,
147 status: zx::Status,
148 ) -> Option<(S, BlockFifoResponse)> {
149 self.0.lock().complete_and_take_response(request_id, status)
150 }
151
152 fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
153 MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
154 }
155}
156
157struct ActiveRequestsInner<S> {
158 requests: Slab<ActiveRequest<S>>,
159}
160
161impl<S> ActiveRequestsInner<S> {
163 fn complete(&mut self, request_id: RequestId, status: zx::Status) {
165 let group = &mut self.requests[request_id.0];
166
167 group.count = group.count.checked_sub(1).unwrap();
168 if status != zx::Status::OK && group.status == zx::Status::OK {
169 group.status = status
170 }
171
172 fuchsia_trace::duration!(
173 c"storage",
174 c"block_server::finish_transaction",
175 "request_id" => request_id.0,
176 "group_completed" => group.count == 0,
177 "status" => status.into_raw());
178 if let Some(trace_flow_id) = group.trace_flow_id {
179 fuchsia_trace::flow_step!(
180 c"storage",
181 c"block_server::finish_request",
182 trace_flow_id.get().into()
183 );
184 }
185
186 if group.count == 0
187 && group.status == zx::Status::OK
188 && let Some(info) = &mut group.decompression_info
189 {
190 thread_local! {
191 static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
192 std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
193 }
194 DECOMPRESSOR.with(|decompressor| {
195 let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
197 let mut decompressor = decompressor.borrow_mut();
198 if let Err(error) = decompressor.decompress_to_buffer(
199 &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
200 target_slice,
201 ) {
202 log::warn!(error:?; "Decompression error");
203 group.status = zx::Status::IO_DATA_INTEGRITY;
204 };
205 });
206 }
207 }
208
209 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
211 let group = &self.requests[request_id.0];
212 match group.req_id {
213 Some(reqid) if group.count == 0 => {
214 let group = self.requests.remove(request_id.0);
215 Some((
216 group.session,
217 BlockFifoResponse {
218 status: group.status.into_raw(),
219 reqid,
220 group: group.group_or_request.group_id().unwrap_or(0),
221 ..Default::default()
222 },
223 ))
224 }
225 _ => None,
226 }
227 }
228
229 fn complete_and_take_response(
231 &mut self,
232 request_id: RequestId,
233 status: zx::Status,
234 ) -> Option<(S, BlockFifoResponse)> {
235 self.complete(request_id, status);
236 self.take_response(request_id)
237 }
238}
239
240pub struct BlockServer<SM> {
243 block_size: u32,
244 session_manager: Arc<SM>,
245}
246
247#[derive(Debug)]
249pub struct BlockOffsetMapping {
250 source_block_offset: u64,
251 target_block_offset: u64,
252 length: u64,
253}
254
255impl BlockOffsetMapping {
256 fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
257 blocks.0 >= self.source_block_offset
258 && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
259 }
260}
261
262impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
263 type Error = zx::Status;
264
265 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
266 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
267 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
268 Ok(Self {
269 source_block_offset: wire.source_block_offset,
270 target_block_offset: wire.target_block_offset,
271 length: wire.length,
272 })
273 }
274}
275
276pub struct OffsetMap {
278 mapping: Option<BlockOffsetMapping>,
279 max_transfer_blocks: Option<NonZero<u32>>,
280}
281
282impl OffsetMap {
283 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
285 Self { mapping: Some(mapping), max_transfer_blocks }
286 }
287
288 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
290 Self { mapping: None, max_transfer_blocks }
291 }
292
293 pub fn is_empty(&self) -> bool {
294 self.mapping.is_none()
295 }
296
297 fn mapping(&self) -> Option<&BlockOffsetMapping> {
298 self.mapping.as_ref()
299 }
300
301 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
302 self.max_transfer_blocks
303 }
304}
305
306pub trait SessionManager: 'static {
309 const SUPPORTS_DECOMPRESSION: bool;
310
311 type Session;
312
313 fn on_attach_vmo(
314 self: Arc<Self>,
315 vmo: &Arc<zx::Vmo>,
316 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
317
318 fn open_session(
323 self: Arc<Self>,
324 stream: fblock::SessionRequestStream,
325 offset_map: OffsetMap,
326 block_size: u32,
327 ) -> impl Future<Output = Result<(), Error>> + Send;
328
329 fn get_info(&self) -> Cow<'_, DeviceInfo>;
331
332 fn get_volume_info(
334 &self,
335 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
336 {
337 async { Err(zx::Status::NOT_SUPPORTED) }
338 }
339
340 fn query_slices(
342 &self,
343 _start_slices: &[u64],
344 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
345 async { Err(zx::Status::NOT_SUPPORTED) }
346 }
347
348 fn extend(
350 &self,
351 _start_slice: u64,
352 _slice_count: u64,
353 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
354 async { Err(zx::Status::NOT_SUPPORTED) }
355 }
356
357 fn shrink(
359 &self,
360 _start_slice: u64,
361 _slice_count: u64,
362 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
363 async { Err(zx::Status::NOT_SUPPORTED) }
364 }
365
366 fn active_requests(&self) -> &ActiveRequests<Self::Session>;
368}
369
370pub trait IntoSessionManager {
371 type SM: SessionManager;
372
373 fn into_session_manager(self) -> Arc<Self::SM>;
374}
375
376impl<SM: SessionManager> BlockServer<SM> {
377 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
378 Self { block_size, session_manager: session_manager.into_session_manager() }
379 }
380
381 pub fn session_manager(&self) -> &SM {
382 self.session_manager.as_ref()
383 }
384
385 pub async fn handle_requests(
387 &self,
388 mut requests: fvolume::VolumeRequestStream,
389 ) -> Result<(), Error> {
390 let scope = fasync::Scope::new();
391 loop {
392 match requests.try_next().await {
393 Ok(Some(request)) => {
394 if let Some(session) = self.handle_request(request).await? {
395 scope.spawn(session.map(|_| ()));
396 }
397 }
398 Ok(None) => break,
399 Err(err) => log::warn!(err:?; "Invalid request"),
400 }
401 }
402 scope.await;
403 Ok(())
404 }
405
406 async fn handle_request(
409 &self,
410 request: fvolume::VolumeRequest,
411 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
412 match request {
413 fvolume::VolumeRequest::GetInfo { responder } => {
414 let info = self.device_info();
415 let max_transfer_size = info.max_transfer_size(self.block_size);
416 let (block_count, mut flags) = match info.as_ref() {
417 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
418 (*block_count, *device_flags)
419 }
420 DeviceInfo::Partition(partition_info) => {
421 let block_count = if let Some(range) = partition_info.block_range.as_ref() {
422 range.end - range.start
423 } else {
424 let volume_info = self.session_manager.get_volume_info().await?;
425 volume_info.0.slice_size * volume_info.1.partition_slice_count
426 / self.block_size as u64
427 };
428 (block_count, partition_info.device_flags)
429 }
430 };
431 if SM::SUPPORTS_DECOMPRESSION {
432 flags |= fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT;
433 }
434 responder.send(Ok(&fblock::BlockInfo {
435 block_count,
436 block_size: self.block_size,
437 max_transfer_size,
438 flags,
439 }))?;
440 }
441 fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
442 let info = self.device_info();
443 return Ok(Some(self.session_manager.clone().open_session(
444 session.into_stream(),
445 OffsetMap::empty(info.max_transfer_blocks()),
446 self.block_size,
447 )));
448 }
449 fvolume::VolumeRequest::OpenSessionWithOffsetMap {
450 session,
451 mapping,
452 control_handle: _,
453 } => {
454 let info = self.device_info();
455 let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
456 Ok(m) => m,
457 Err(status) => {
458 session.close_with_epitaph(status)?;
459 return Ok(None);
460 }
461 };
462 if let Some(max) = info.block_count() {
463 if initial_mapping.target_block_offset + initial_mapping.length > max {
464 log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
465 session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
466 return Ok(None);
467 }
468 }
469 return Ok(Some(self.session_manager.clone().open_session(
470 session.into_stream(),
471 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
472 self.block_size,
473 )));
474 }
475 fvolume::VolumeRequest::GetTypeGuid { responder } => {
476 let info = self.device_info();
477 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
478 let mut guid =
479 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
480 guid.value.copy_from_slice(&partition_info.type_guid);
481 responder.send(zx::sys::ZX_OK, Some(&guid))?;
482 } else {
483 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
484 }
485 }
486 fvolume::VolumeRequest::GetInstanceGuid { responder } => {
487 let info = self.device_info();
488 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
489 let mut guid =
490 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
491 guid.value.copy_from_slice(&partition_info.instance_guid);
492 responder.send(zx::sys::ZX_OK, Some(&guid))?;
493 } else {
494 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
495 }
496 }
497 fvolume::VolumeRequest::GetName { responder } => {
498 let info = self.device_info();
499 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
500 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
501 } else {
502 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
503 }
504 }
505 fvolume::VolumeRequest::GetMetadata { responder } => {
506 let info = self.device_info();
507 if let DeviceInfo::Partition(info) = info.as_ref() {
508 let mut type_guid =
509 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
510 type_guid.value.copy_from_slice(&info.type_guid);
511 let mut instance_guid =
512 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
513 instance_guid.value.copy_from_slice(&info.instance_guid);
514 responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
515 name: Some(info.name.clone()),
516 type_guid: Some(type_guid),
517 instance_guid: Some(instance_guid),
518 start_block_offset: info.block_range.as_ref().map(|range| range.start),
519 num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
520 flags: Some(info.flags),
521 ..Default::default()
522 }))?;
523 } else {
524 responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
525 }
526 }
527 fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
528 match self.session_manager.query_slices(&start_slices).await {
529 Ok(mut results) => {
530 let results_len = results.len();
531 assert!(results_len <= 16);
532 results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
533 responder.send(
534 zx::sys::ZX_OK,
535 &results.try_into().unwrap(),
536 results_len as u64,
537 )?;
538 }
539 Err(s) => {
540 responder.send(
541 s.into_raw(),
542 &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
543 0,
544 )?;
545 }
546 }
547 }
548 fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
549 match self.session_manager.get_volume_info().await {
550 Ok((manager_info, volume_info)) => {
551 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
552 }
553 Err(s) => responder.send(s.into_raw(), None, None)?,
554 }
555 }
556 fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
557 responder.send(
558 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
559 .into_raw(),
560 )?;
561 }
562 fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
563 responder.send(
564 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
565 .into_raw(),
566 )?;
567 }
568 fvolume::VolumeRequest::Destroy { responder, .. } => {
569 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
570 }
571 }
572 Ok(None)
573 }
574
575 fn device_info(&self) -> Cow<'_, DeviceInfo> {
576 self.session_manager.get_info()
577 }
578}
579
580struct SessionHelper<SM: SessionManager> {
581 session_manager: Arc<SM>,
582 offset_map: OffsetMap,
583 block_size: u32,
584 peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
585 vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
586}
587
588struct VmoMapping {
589 base: usize,
590 size: usize,
591}
592
593impl VmoMapping {
594 fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
595 let size = vmo.get_size().unwrap() as usize;
596 Ok(Arc::new(Self {
597 base: fuchsia_runtime::vmar_root_self()
598 .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
599 .inspect_err(|error| {
600 log::warn!(error:?, size; "VmoMapping: unable to map VMO");
601 })?,
602 size,
603 }))
604 }
605}
606
607impl Drop for VmoMapping {
608 fn drop(&mut self) {
609 unsafe {
611 let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
612 }
613 }
614}
615
616impl<SM: SessionManager> SessionHelper<SM> {
617 fn new(
618 session_manager: Arc<SM>,
619 offset_map: OffsetMap,
620 block_size: u32,
621 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
622 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
623 Ok((
624 Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
625 fifo,
626 ))
627 }
628
629 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
630 match request {
631 fblock::SessionRequest::GetFifo { responder } => {
632 let rights = zx::Rights::TRANSFER
633 | zx::Rights::READ
634 | zx::Rights::WRITE
635 | zx::Rights::SIGNAL
636 | zx::Rights::WAIT;
637 match self.peer_fifo.duplicate_handle(rights) {
638 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
639 Err(s) => responder.send(Err(s.into_raw()))?,
640 }
641 Ok(())
642 }
643 fblock::SessionRequest::AttachVmo { vmo, responder } => {
644 let vmo = Arc::new(vmo);
645 let vmo_id = {
646 let mut vmos = self.vmos.lock();
647 if vmos.len() == u16::MAX as usize {
648 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
649 return Ok(());
650 } else {
651 let vmo_id = match vmos.last_entry() {
652 None => 1,
653 Some(o) => {
654 o.key().checked_add(1).unwrap_or_else(|| {
655 let mut vmo_id = 1;
656 for (&id, _) in &*vmos {
658 if id > vmo_id {
659 break;
660 }
661 vmo_id = id + 1;
662 }
663 vmo_id
664 })
665 }
666 };
667 vmos.insert(vmo_id, (vmo.clone(), None));
668 vmo_id
669 }
670 };
671 self.session_manager.clone().on_attach_vmo(&vmo).await?;
672 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
673 Ok(())
674 }
675 fblock::SessionRequest::Close { responder } => {
676 responder.send(Ok(()))?;
677 Err(anyhow!("Closed"))
678 }
679 }
680 }
681
682 fn decode_fifo_request(
684 &self,
685 session: SM::Session,
686 request: &BlockFifoRequest,
687 ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
688 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
689
690 let request_bytes = request.length as u64 * self.block_size as u64;
691
692 let mut operation = BlockOpcode::from_primitive(request.command.opcode)
693 .ok_or(zx::Status::INVALID_ARGS)
694 .and_then(|code| {
695 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
696 if code != BlockOpcode::Read {
697 return Err(zx::Status::INVALID_ARGS);
698 }
699 if !SM::SUPPORTS_DECOMPRESSION {
700 return Err(zx::Status::NOT_SUPPORTED);
701 }
702 }
703 if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
704 if request.length == 0 {
705 return Err(zx::Status::INVALID_ARGS);
706 }
707 if request.dev_offset.checked_add(request.length as u64).is_none()
709 || (code != BlockOpcode::Trim
710 && request_bytes.checked_add(request.vmo_offset).is_none())
711 {
712 return Err(zx::Status::OUT_OF_RANGE);
713 }
714 }
715 Ok(match code {
716 BlockOpcode::Read => Operation::Read {
717 device_block_offset: request.dev_offset,
718 block_count: request.length,
719 _unused: 0,
720 vmo_offset: request
721 .vmo_offset
722 .checked_mul(self.block_size as u64)
723 .ok_or(zx::Status::OUT_OF_RANGE)?,
724 options: ReadOptions {
725 inline_crypto_options: InlineCryptoOptions {
726 dun: request.dun,
727 slot: request.slot,
728 },
729 },
730 },
731 BlockOpcode::Write => {
732 let mut options = WriteOptions {
733 inline_crypto_options: InlineCryptoOptions {
734 dun: request.dun,
735 slot: request.slot,
736 },
737 ..WriteOptions::default()
738 };
739 if flags.contains(BlockIoFlag::FORCE_ACCESS) {
740 options.flags |= WriteFlags::FORCE_ACCESS;
741 }
742 if flags.contains(BlockIoFlag::PRE_BARRIER) {
743 options.flags |= WriteFlags::PRE_BARRIER;
744 }
745 Operation::Write {
746 device_block_offset: request.dev_offset,
747 block_count: request.length,
748 _unused: 0,
749 options,
750 vmo_offset: request
751 .vmo_offset
752 .checked_mul(self.block_size as u64)
753 .ok_or(zx::Status::OUT_OF_RANGE)?,
754 }
755 }
756 BlockOpcode::Flush => Operation::Flush,
757 BlockOpcode::Trim => Operation::Trim {
758 device_block_offset: request.dev_offset,
759 block_count: request.length,
760 },
761 BlockOpcode::CloseVmo => Operation::CloseVmo,
762 })
763 });
764
765 let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
766 GroupOrRequest::Group(request.group)
767 } else {
768 GroupOrRequest::Request(request.reqid)
769 };
770
771 let mut active_requests = self.session_manager.active_requests().0.lock();
772 let mut request_id = None;
773
774 if group_or_request.is_group() {
785 for (key, group) in &mut active_requests.requests {
789 if group.group_or_request == group_or_request {
790 if group.req_id.is_some() {
791 if group.status == zx::Status::OK {
793 group.status = zx::Status::INVALID_ARGS;
794 }
795 return Err(None);
797 }
798 if group.status == zx::Status::OK
800 && let Some(info) = &mut group.decompression_info
801 {
802 if let Ok(Operation::Read {
803 device_block_offset,
804 mut block_count,
805 options,
806 vmo_offset: 0,
807 ..
808 }) = operation
809 {
810 let remaining_bytes = info
811 .compressed_range
812 .end
813 .next_multiple_of(self.block_size as usize)
814 as u64
815 - info.bytes_so_far;
816 if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
817 || request.total_compressed_bytes != 0
818 || request.uncompressed_bytes != 0
819 || request.compressed_prefix_bytes != 0
820 || (flags.contains(BlockIoFlag::GROUP_LAST)
821 && info.bytes_so_far + request_bytes
822 < info.compressed_range.end as u64)
823 || (!flags.contains(BlockIoFlag::GROUP_LAST)
824 && request_bytes >= remaining_bytes)
825 {
826 group.status = zx::Status::INVALID_ARGS;
827 } else {
828 if request_bytes > remaining_bytes {
837 block_count = (remaining_bytes / self.block_size as u64) as u32;
838 }
839
840 operation = Ok(Operation::ContinueDecompressedRead {
841 offset: info.bytes_so_far,
842 device_block_offset,
843 block_count,
844 options,
845 });
846
847 info.bytes_so_far += block_count as u64 * self.block_size as u64;
848 }
849 } else {
850 group.status = zx::Status::INVALID_ARGS;
851 }
852 }
853 if flags.contains(BlockIoFlag::GROUP_LAST) {
854 group.req_id = Some(request.reqid);
855 if group.status != zx::Status::OK {
858 operation = Err(group.status);
859 }
860 } else if group.status != zx::Status::OK {
861 return Err(None);
864 }
865 request_id = Some(RequestId(key));
866 group.count += 1;
867 break;
868 }
869 }
870 }
871
872 let is_single_request =
873 !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
874
875 let mut decompression_info = None;
876 let vmo = match operation {
877 Ok(Operation::Read {
878 device_block_offset,
879 mut block_count,
880 options,
881 vmo_offset,
882 ..
883 }) => match self.vmos.lock().get_mut(&request.vmoid) {
884 Some((vmo, mapping)) => {
885 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
886 let compressed_range = request.compressed_prefix_bytes as usize
887 ..request.compressed_prefix_bytes as usize
888 + request.total_compressed_bytes as usize;
889 let required_buffer_size =
890 compressed_range.end.next_multiple_of(self.block_size as usize);
891
892 if compressed_range.start >= compressed_range.end
894 || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
895 || (is_single_request && request_bytes < compressed_range.end as u64)
896 || (!is_single_request && request_bytes >= required_buffer_size as u64)
897 {
898 Err(zx::Status::INVALID_ARGS)
899 } else {
900 let bytes_so_far = if request_bytes > required_buffer_size as u64 {
907 block_count =
908 (required_buffer_size / self.block_size as usize) as u32;
909 required_buffer_size as u64
910 } else {
911 request_bytes
912 };
913
914 match mapping {
916 Some(mapping) => Ok(mapping.clone()),
917 None => {
918 VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
919 }
920 }
921 .and_then(|mapping| {
922 if vmo_offset
925 .checked_add(request.uncompressed_bytes as u64)
926 .is_some_and(|end| end <= mapping.size as u64)
927 {
928 Ok(mapping)
929 } else {
930 Err(zx::Status::OUT_OF_RANGE)
931 }
932 })
933 .map(|mapping| {
934 operation = Ok(Operation::StartDecompressedRead {
939 required_buffer_size,
940 device_block_offset,
941 block_count,
942 options,
943 });
944 decompression_info = Some(DecompressionInfo {
947 compressed_range,
948 bytes_so_far,
949 mapping,
950 uncompressed_range: vmo_offset
951 ..vmo_offset + request.uncompressed_bytes as u64,
952 buffer: None,
953 });
954 None
955 })
956 }
957 } else {
958 Ok(Some(vmo.clone()))
959 }
960 }
961 None => Err(zx::Status::IO),
962 },
963 Ok(Operation::Write { .. }) => self
964 .vmos
965 .lock()
966 .get(&request.vmoid)
967 .cloned()
968 .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
969 Ok(Operation::CloseVmo) => {
970 self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
971 let vmo_clone = vmo.clone();
972 Epoch::global().defer(move || drop(vmo_clone));
975 Ok(Some(vmo))
976 })
977 }
978 _ => Ok(None),
979 }
980 .unwrap_or_else(|e| {
981 operation = Err(e);
982 None
983 });
984
985 let trace_flow_id = NonZero::new(request.trace_flow_id);
986 let request_id = request_id.unwrap_or_else(|| {
987 RequestId(active_requests.requests.insert(ActiveRequest {
988 session,
989 group_or_request,
990 trace_flow_id,
991 _epoch_guard: Epoch::global().guard(),
992 status: zx::Status::OK,
993 count: 1,
994 req_id: is_single_request.then_some(request.reqid),
995 decompression_info,
996 }))
997 });
998
999 Ok(DecodedRequest {
1000 request_id,
1001 trace_flow_id,
1002 operation: operation.map_err(|status| {
1003 active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
1004 })?,
1005 vmo,
1006 })
1007 }
1008
1009 fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1010 std::mem::take(&mut *self.vmos.lock())
1011 }
1012
1013 fn map_request(
1015 &self,
1016 mut request: DecodedRequest,
1017 active_request: &mut ActiveRequest<SM::Session>,
1018 ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1019 if active_request.status != zx::Status::OK {
1020 return Err(zx::Status::BAD_STATE);
1021 }
1022 let mapping = self.offset_map.mapping();
1023 match (mapping, request.operation.blocks()) {
1024 (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1025 return Err(zx::Status::OUT_OF_RANGE);
1026 }
1027 _ => {}
1028 }
1029 let remainder = request.operation.map(
1030 self.offset_map.mapping(),
1031 self.offset_map.max_transfer_blocks(),
1032 self.block_size,
1033 );
1034 if remainder.is_some() {
1035 active_request.count += 1;
1036 }
1037 static CACHE: AtomicU64 = AtomicU64::new(0);
1038 if let Some(context) =
1039 fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
1040 {
1041 use fuchsia_trace::ArgValue;
1042 let trace_args = [
1043 ArgValue::of("request_id", request.request_id.0),
1044 ArgValue::of("opcode", request.operation.trace_label()),
1045 ];
1046 let _scope = fuchsia_trace::duration(
1047 c"storage",
1048 c"block_server::start_transaction",
1049 &trace_args,
1050 );
1051 if let Some(trace_flow_id) = active_request.trace_flow_id {
1052 fuchsia_trace::flow_step(
1053 &context,
1054 c"block_server::start_transaction",
1055 trace_flow_id.get().into(),
1056 &[],
1057 );
1058 }
1059 }
1060 let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1061 Ok((request, remainder))
1062 }
1063
1064 fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1065 self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1066 }
1067}
1068
1069#[repr(transparent)]
1070#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1071pub struct RequestId(usize);
1072
1073#[derive(Clone, Debug)]
1074struct DecodedRequest {
1075 request_id: RequestId,
1076 trace_flow_id: TraceFlowId,
1077 operation: Operation,
1078 vmo: Option<Arc<zx::Vmo>>,
1079}
1080
1081pub type WriteFlags = block_protocol::WriteFlags;
1083pub type WriteOptions = block_protocol::WriteOptions;
1084pub type ReadOptions = block_protocol::ReadOptions;
1085
1086#[repr(C)]
1087#[derive(Clone, Debug, PartialEq, Eq)]
1088pub enum Operation {
1089 Read {
1094 device_block_offset: u64,
1095 block_count: u32,
1096 _unused: u32,
1097 vmo_offset: u64,
1098 options: ReadOptions,
1099 },
1100 Write {
1101 device_block_offset: u64,
1102 block_count: u32,
1103 _unused: u32,
1104 vmo_offset: u64,
1105 options: WriteOptions,
1106 },
1107 Flush,
1108 Trim {
1109 device_block_offset: u64,
1110 block_count: u32,
1111 },
1112 CloseVmo,
1114 StartDecompressedRead {
1115 required_buffer_size: usize,
1116 device_block_offset: u64,
1117 block_count: u32,
1118 options: ReadOptions,
1119 },
1120 ContinueDecompressedRead {
1121 offset: u64,
1122 device_block_offset: u64,
1123 block_count: u32,
1124 options: ReadOptions,
1125 },
1126}
1127
1128impl Operation {
1129 fn trace_label(&self) -> &'static str {
1130 match self {
1131 Operation::Read { .. } => "read",
1132 Operation::Write { .. } => "write",
1133 Operation::Flush { .. } => "flush",
1134 Operation::Trim { .. } => "trim",
1135 Operation::CloseVmo { .. } => "close_vmo",
1136 Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1137 Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1138 }
1139 }
1140
1141 fn blocks(&self) -> Option<(u64, u32)> {
1143 match self {
1144 Operation::Read { device_block_offset, block_count, .. }
1145 | Operation::Write { device_block_offset, block_count, .. }
1146 | Operation::Trim { device_block_offset, block_count, .. } => {
1147 Some((*device_block_offset, *block_count))
1148 }
1149 _ => None,
1150 }
1151 }
1152
1153 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1155 match self {
1156 Operation::Read { device_block_offset, block_count, .. }
1157 | Operation::Write { device_block_offset, block_count, .. }
1158 | Operation::Trim { device_block_offset, block_count, .. } => {
1159 Some((device_block_offset, block_count))
1160 }
1161 _ => None,
1162 }
1163 }
1164
1165 fn map(
1168 &mut self,
1169 mapping: Option<&BlockOffsetMapping>,
1170 max_blocks: Option<NonZero<u32>>,
1171 block_size: u32,
1172 ) -> Option<Self> {
1173 let mut max = match self {
1174 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1175 _ => None,
1176 };
1177 let (offset, length) = self.blocks_mut()?;
1178 let orig_offset = *offset;
1179 if let Some(mapping) = mapping {
1180 let delta = *offset - mapping.source_block_offset;
1181 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1182 *offset = mapping.target_block_offset + delta;
1183 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1184 max = match max {
1185 None => Some(mapping_max),
1186 Some(m) => Some(std::cmp::min(m, mapping_max)),
1187 };
1188 };
1189 if let Some(max) = max {
1190 if *length as u64 > max {
1191 let rem = (*length as u64 - max) as u32;
1192 *length = max as u32;
1193 return Some(match self {
1194 Operation::Read {
1195 device_block_offset: _,
1196 block_count: _,
1197 vmo_offset,
1198 _unused,
1199 options,
1200 } => Operation::Read {
1201 device_block_offset: orig_offset + max,
1202 block_count: rem,
1203 vmo_offset: *vmo_offset + max * block_size as u64,
1204 _unused: *_unused,
1205 options: *options,
1206 },
1207 Operation::Write {
1208 device_block_offset: _,
1209 block_count: _,
1210 _unused,
1211 vmo_offset,
1212 options,
1213 } => Operation::Write {
1214 device_block_offset: orig_offset + max,
1215 block_count: rem,
1216 _unused: *_unused,
1217 vmo_offset: *vmo_offset + max * block_size as u64,
1218 options: *options,
1219 },
1220 Operation::Trim { device_block_offset: _, block_count: _ } => {
1221 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1222 }
1223 _ => unreachable!(),
1224 });
1225 }
1226 }
1227 None
1228 }
1229
1230 pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1232 if let Operation::Write { options, .. } = self {
1233 options.flags.contains(value)
1234 } else {
1235 false
1236 }
1237 }
1238
1239 pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1241 if let Operation::Write { options, .. } = self {
1242 let result = options.flags.contains(value);
1243 options.flags.remove(value);
1244 result
1245 } else {
1246 false
1247 }
1248 }
1249}
1250
1251#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1252pub enum GroupOrRequest {
1253 Group(u16),
1254 Request(u32),
1255}
1256
1257impl GroupOrRequest {
1258 fn is_group(&self) -> bool {
1259 matches!(self, Self::Group(_))
1260 }
1261
1262 fn group_id(&self) -> Option<u16> {
1263 match self {
1264 Self::Group(id) => Some(*id),
1265 Self::Request(_) => None,
1266 }
1267 }
1268}
1269
1270#[cfg(test)]
1271mod tests {
1272 use super::{
1273 BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1274 TraceFlowId,
1275 };
1276 use assert_matches::assert_matches;
1277 use block_protocol::{
1278 BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags,
1279 WriteOptions,
1280 };
1281 use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1282 use fuchsia_sync::Mutex;
1283 use futures::FutureExt as _;
1284 use futures::channel::oneshot;
1285 use futures::future::BoxFuture;
1286 use std::borrow::Cow;
1287 use std::future::poll_fn;
1288 use std::num::NonZero;
1289 use std::pin::pin;
1290 use std::sync::Arc;
1291 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1292 use std::task::{Context, Poll};
1293 use zx::{AsHandleRef as _, HandleBased as _};
1294 use {
1295 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1296 fuchsia_async as fasync,
1297 };
1298
1299 #[derive(Default)]
1300 struct MockInterface {
1301 read_hook: Option<
1302 Box<
1303 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1304 + Send
1305 + Sync,
1306 >,
1307 >,
1308 write_hook:
1309 Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1310 barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1311 }
1312
1313 impl super::async_interface::Interface for MockInterface {
1314 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1315 Ok(())
1316 }
1317
1318 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1319 Cow::Owned(test_device_info())
1320 }
1321
1322 async fn read(
1323 &self,
1324 device_block_offset: u64,
1325 block_count: u32,
1326 vmo: &Arc<zx::Vmo>,
1327 vmo_offset: u64,
1328 _opts: ReadOptions,
1329 _trace_flow_id: TraceFlowId,
1330 ) -> Result<(), zx::Status> {
1331 if let Some(read_hook) = &self.read_hook {
1332 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1333 } else {
1334 unimplemented!();
1335 }
1336 }
1337
1338 async fn write(
1339 &self,
1340 device_block_offset: u64,
1341 _block_count: u32,
1342 _vmo: &Arc<zx::Vmo>,
1343 _vmo_offset: u64,
1344 opts: WriteOptions,
1345 _trace_flow_id: TraceFlowId,
1346 ) -> Result<(), zx::Status> {
1347 if opts.flags.contains(WriteFlags::PRE_BARRIER)
1348 && let Some(barrier_hook) = &self.barrier_hook
1349 {
1350 barrier_hook()?;
1351 }
1352 if let Some(write_hook) = &self.write_hook {
1353 write_hook(device_block_offset).await
1354 } else {
1355 unimplemented!();
1356 }
1357 }
1358
1359 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1360 Ok(())
1361 }
1362
1363 async fn trim(
1364 &self,
1365 _device_block_offset: u64,
1366 _block_count: u32,
1367 _trace_flow_id: TraceFlowId,
1368 ) -> Result<(), zx::Status> {
1369 unreachable!();
1370 }
1371
1372 async fn get_volume_info(
1373 &self,
1374 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1375 let () = std::future::pending().await;
1377 unreachable!();
1378 }
1379 }
1380
1381 const BLOCK_SIZE: u32 = 512;
1382 const MAX_TRANSFER_BLOCKS: u32 = 10;
1383
1384 fn test_device_info() -> DeviceInfo {
1385 DeviceInfo::Partition(PartitionInfo {
1386 device_flags: fblock::Flag::READONLY
1387 | fblock::Flag::BARRIER_SUPPORT
1388 | fblock::Flag::FUA_SUPPORT,
1389 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1390 block_range: Some(0..100),
1391 type_guid: [1; 16],
1392 instance_guid: [2; 16],
1393 name: "foo".to_string(),
1394 flags: 0xabcd,
1395 })
1396 }
1397
1398 #[fuchsia::test]
1399 async fn test_barriers_ordering() {
1400 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1401 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1402 let barrier_called = Arc::new(AtomicBool::new(false));
1403
1404 futures::join!(
1405 async move {
1406 let barrier_called_clone = barrier_called.clone();
1407 let block_server = BlockServer::new(
1408 BLOCK_SIZE,
1409 Arc::new(MockInterface {
1410 barrier_hook: Some(Box::new(move || {
1411 barrier_called.store(true, Ordering::Relaxed);
1412 Ok(())
1413 })),
1414 write_hook: Some(Box::new(move |device_block_offset| {
1415 let barrier_called = barrier_called_clone.clone();
1416 Box::pin(async move {
1417 if device_block_offset % 2 == 0 {
1419 fasync::Timer::new(fasync::MonotonicInstant::after(
1420 zx::MonotonicDuration::from_millis(200),
1421 ))
1422 .await;
1423 }
1424 assert!(barrier_called.load(Ordering::Relaxed));
1425 Ok(())
1426 })
1427 })),
1428 ..MockInterface::default()
1429 }),
1430 );
1431 block_server.handle_requests(stream).await.unwrap();
1432 },
1433 async move {
1434 let (session_proxy, server) = fidl::endpoints::create_proxy();
1435
1436 proxy.open_session(server).unwrap();
1437
1438 let vmo_id = session_proxy
1439 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1440 .await
1441 .unwrap()
1442 .unwrap();
1443 assert_ne!(vmo_id.id, 0);
1444
1445 let mut fifo =
1446 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1447 let (mut reader, mut writer) = fifo.async_io();
1448
1449 writer
1450 .write_entries(&BlockFifoRequest {
1451 command: BlockFifoCommand {
1452 opcode: BlockOpcode::Write.into_primitive(),
1453 flags: BlockIoFlag::PRE_BARRIER.bits(),
1454 ..Default::default()
1455 },
1456 vmoid: vmo_id.id,
1457 dev_offset: 0,
1458 length: 5,
1459 vmo_offset: 6,
1460 ..Default::default()
1461 })
1462 .await
1463 .unwrap();
1464
1465 for i in 0..10 {
1466 writer
1467 .write_entries(&BlockFifoRequest {
1468 command: BlockFifoCommand {
1469 opcode: BlockOpcode::Write.into_primitive(),
1470 ..Default::default()
1471 },
1472 vmoid: vmo_id.id,
1473 dev_offset: i + 1,
1474 length: 5,
1475 vmo_offset: 6,
1476 ..Default::default()
1477 })
1478 .await
1479 .unwrap();
1480 }
1481 for _ in 0..11 {
1482 let mut response = BlockFifoResponse::default();
1483 reader.read_entries(&mut response).await.unwrap();
1484 assert_eq!(response.status, zx::sys::ZX_OK);
1485 }
1486
1487 std::mem::drop(proxy);
1488 }
1489 );
1490 }
1491
1492 #[fuchsia::test]
1493 async fn test_info() {
1494 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1495
1496 futures::join!(
1497 async {
1498 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1499 block_server.handle_requests(stream).await.unwrap();
1500 },
1501 async {
1502 let expected_info = test_device_info();
1503 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1504 info
1505 } else {
1506 unreachable!()
1507 };
1508
1509 let block_info = proxy.get_info().await.unwrap().unwrap();
1510 assert_eq!(
1511 block_info.block_count,
1512 partition_info.block_range.as_ref().unwrap().end
1513 - partition_info.block_range.as_ref().unwrap().start
1514 );
1515 assert_eq!(
1516 block_info.flags,
1517 fblock::Flag::READONLY
1518 | fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT
1519 | fblock::Flag::BARRIER_SUPPORT
1520 | fblock::Flag::FUA_SUPPORT
1521 );
1522
1523 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1524
1525 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1526 assert_eq!(status, zx::sys::ZX_OK);
1527 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1528
1529 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1530 assert_eq!(status, zx::sys::ZX_OK);
1531 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1532
1533 let (status, name) = proxy.get_name().await.unwrap();
1534 assert_eq!(status, zx::sys::ZX_OK);
1535 assert_eq!(name.as_ref(), Some(&partition_info.name));
1536
1537 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1538 assert_eq!(metadata.name, name);
1539 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1540 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1541 assert_eq!(
1542 metadata.start_block_offset,
1543 Some(partition_info.block_range.as_ref().unwrap().start)
1544 );
1545 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1546 assert_eq!(metadata.flags, Some(partition_info.flags));
1547
1548 std::mem::drop(proxy);
1549 }
1550 );
1551 }
1552
1553 #[fuchsia::test]
1554 async fn test_attach_vmo() {
1555 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1556
1557 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1558 let koid = vmo.get_koid().unwrap();
1559
1560 futures::join!(
1561 async {
1562 let block_server = BlockServer::new(
1563 BLOCK_SIZE,
1564 Arc::new(MockInterface {
1565 read_hook: Some(Box::new(move |_, _, vmo, _| {
1566 assert_eq!(vmo.get_koid().unwrap(), koid);
1567 Box::pin(async { Ok(()) })
1568 })),
1569 ..MockInterface::default()
1570 }),
1571 );
1572 block_server.handle_requests(stream).await.unwrap();
1573 },
1574 async move {
1575 let (session_proxy, server) = fidl::endpoints::create_proxy();
1576
1577 proxy.open_session(server).unwrap();
1578
1579 let vmo_id = session_proxy
1580 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1581 .await
1582 .unwrap()
1583 .unwrap();
1584 assert_ne!(vmo_id.id, 0);
1585
1586 let mut fifo =
1587 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1588 let (mut reader, mut writer) = fifo.async_io();
1589
1590 let mut count = 1;
1592 loop {
1593 match session_proxy
1594 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1595 .await
1596 .unwrap()
1597 {
1598 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1599 Err(e) => {
1600 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1601 break;
1602 }
1603 }
1604
1605 if count % 10 == 0 {
1607 writer
1608 .write_entries(&BlockFifoRequest {
1609 command: BlockFifoCommand {
1610 opcode: BlockOpcode::Read.into_primitive(),
1611 ..Default::default()
1612 },
1613 vmoid: vmo_id.id,
1614 length: 1,
1615 ..Default::default()
1616 })
1617 .await
1618 .unwrap();
1619
1620 let mut response = BlockFifoResponse::default();
1621 reader.read_entries(&mut response).await.unwrap();
1622 assert_eq!(response.status, zx::sys::ZX_OK);
1623 }
1624
1625 count += 1;
1626 }
1627
1628 assert_eq!(count, u16::MAX as u64);
1629
1630 writer
1632 .write_entries(&BlockFifoRequest {
1633 command: BlockFifoCommand {
1634 opcode: BlockOpcode::CloseVmo.into_primitive(),
1635 ..Default::default()
1636 },
1637 vmoid: vmo_id.id,
1638 ..Default::default()
1639 })
1640 .await
1641 .unwrap();
1642
1643 let mut response = BlockFifoResponse::default();
1644 reader.read_entries(&mut response).await.unwrap();
1645 assert_eq!(response.status, zx::sys::ZX_OK);
1646
1647 let new_vmo_id = session_proxy
1648 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1649 .await
1650 .unwrap()
1651 .unwrap();
1652 assert_eq!(new_vmo_id.id, vmo_id.id);
1654
1655 std::mem::drop(proxy);
1656 }
1657 );
1658 }
1659
1660 #[fuchsia::test]
1661 async fn test_close() {
1662 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1663
1664 let mut server = std::pin::pin!(
1665 async {
1666 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1667 block_server.handle_requests(stream).await.unwrap();
1668 }
1669 .fuse()
1670 );
1671
1672 let mut client = std::pin::pin!(
1673 async {
1674 let (session_proxy, server) = fidl::endpoints::create_proxy();
1675
1676 proxy.open_session(server).unwrap();
1677
1678 std::mem::drop(proxy);
1681
1682 session_proxy.close().await.unwrap().unwrap();
1683
1684 let _: () = std::future::pending().await;
1686 }
1687 .fuse()
1688 );
1689
1690 futures::select!(
1691 _ = server => {}
1692 _ = client => unreachable!(),
1693 );
1694 }
1695
1696 #[derive(Default)]
1697 struct IoMockInterface {
1698 do_checks: bool,
1699 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1700 return_errors: bool,
1701 }
1702
1703 #[derive(Debug)]
1704 enum ExpectedOp {
1705 Read(u64, u32, u64),
1706 Write(u64, u32, u64),
1707 Trim(u64, u32),
1708 Flush,
1709 }
1710
1711 impl super::async_interface::Interface for IoMockInterface {
1712 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1713 Ok(())
1714 }
1715
1716 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1717 Cow::Owned(test_device_info())
1718 }
1719
1720 async fn read(
1721 &self,
1722 device_block_offset: u64,
1723 block_count: u32,
1724 _vmo: &Arc<zx::Vmo>,
1725 vmo_offset: u64,
1726 _opts: ReadOptions,
1727 _trace_flow_id: TraceFlowId,
1728 ) -> Result<(), zx::Status> {
1729 if self.return_errors {
1730 Err(zx::Status::INTERNAL)
1731 } else {
1732 if self.do_checks {
1733 assert_matches!(
1734 self.expected_op.lock().take(),
1735 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1736 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1737 "Read {device_block_offset} {block_count} {vmo_offset}"
1738 );
1739 }
1740 Ok(())
1741 }
1742 }
1743
1744 async fn write(
1745 &self,
1746 device_block_offset: u64,
1747 block_count: u32,
1748 _vmo: &Arc<zx::Vmo>,
1749 vmo_offset: u64,
1750 _write_opts: WriteOptions,
1751 _trace_flow_id: TraceFlowId,
1752 ) -> Result<(), zx::Status> {
1753 if self.return_errors {
1754 Err(zx::Status::NOT_SUPPORTED)
1755 } else {
1756 if self.do_checks {
1757 assert_matches!(
1758 self.expected_op.lock().take(),
1759 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1760 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1761 "Write {device_block_offset} {block_count} {vmo_offset}"
1762 );
1763 }
1764 Ok(())
1765 }
1766 }
1767
1768 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1769 if self.return_errors {
1770 Err(zx::Status::NO_RESOURCES)
1771 } else {
1772 if self.do_checks {
1773 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1774 }
1775 Ok(())
1776 }
1777 }
1778
1779 async fn trim(
1780 &self,
1781 device_block_offset: u64,
1782 block_count: u32,
1783 _trace_flow_id: TraceFlowId,
1784 ) -> Result<(), zx::Status> {
1785 if self.return_errors {
1786 Err(zx::Status::NO_MEMORY)
1787 } else {
1788 if self.do_checks {
1789 assert_matches!(
1790 self.expected_op.lock().take(),
1791 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1792 block_count == b,
1793 "Trim {device_block_offset} {block_count}"
1794 );
1795 }
1796 Ok(())
1797 }
1798 }
1799 }
1800
1801 #[fuchsia::test]
1802 async fn test_io() {
1803 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1804
1805 let expected_op = Arc::new(Mutex::new(None));
1806 let expected_op_clone = expected_op.clone();
1807
1808 let server = async {
1809 let block_server = BlockServer::new(
1810 BLOCK_SIZE,
1811 Arc::new(IoMockInterface {
1812 return_errors: false,
1813 do_checks: true,
1814 expected_op: expected_op_clone,
1815 }),
1816 );
1817 block_server.handle_requests(stream).await.unwrap();
1818 };
1819
1820 let client = async move {
1821 let (session_proxy, server) = fidl::endpoints::create_proxy();
1822
1823 proxy.open_session(server).unwrap();
1824
1825 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1826 let vmo_id = session_proxy
1827 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1828 .await
1829 .unwrap()
1830 .unwrap();
1831
1832 let mut fifo =
1833 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1834 let (mut reader, mut writer) = fifo.async_io();
1835
1836 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1838 writer
1839 .write_entries(&BlockFifoRequest {
1840 command: BlockFifoCommand {
1841 opcode: BlockOpcode::Read.into_primitive(),
1842 ..Default::default()
1843 },
1844 vmoid: vmo_id.id,
1845 dev_offset: 1,
1846 length: 2,
1847 vmo_offset: 3,
1848 ..Default::default()
1849 })
1850 .await
1851 .unwrap();
1852
1853 let mut response = BlockFifoResponse::default();
1854 reader.read_entries(&mut response).await.unwrap();
1855 assert_eq!(response.status, zx::sys::ZX_OK);
1856
1857 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1859 writer
1860 .write_entries(&BlockFifoRequest {
1861 command: BlockFifoCommand {
1862 opcode: BlockOpcode::Write.into_primitive(),
1863 ..Default::default()
1864 },
1865 vmoid: vmo_id.id,
1866 dev_offset: 4,
1867 length: 5,
1868 vmo_offset: 6,
1869 ..Default::default()
1870 })
1871 .await
1872 .unwrap();
1873
1874 let mut response = BlockFifoResponse::default();
1875 reader.read_entries(&mut response).await.unwrap();
1876 assert_eq!(response.status, zx::sys::ZX_OK);
1877
1878 *expected_op.lock() = Some(ExpectedOp::Flush);
1880 writer
1881 .write_entries(&BlockFifoRequest {
1882 command: BlockFifoCommand {
1883 opcode: BlockOpcode::Flush.into_primitive(),
1884 ..Default::default()
1885 },
1886 ..Default::default()
1887 })
1888 .await
1889 .unwrap();
1890
1891 reader.read_entries(&mut response).await.unwrap();
1892 assert_eq!(response.status, zx::sys::ZX_OK);
1893
1894 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1896 writer
1897 .write_entries(&BlockFifoRequest {
1898 command: BlockFifoCommand {
1899 opcode: BlockOpcode::Trim.into_primitive(),
1900 ..Default::default()
1901 },
1902 dev_offset: 7,
1903 length: 8,
1904 ..Default::default()
1905 })
1906 .await
1907 .unwrap();
1908
1909 reader.read_entries(&mut response).await.unwrap();
1910 assert_eq!(response.status, zx::sys::ZX_OK);
1911
1912 std::mem::drop(proxy);
1913 };
1914
1915 futures::join!(server, client);
1916 }
1917
1918 #[fuchsia::test]
1919 async fn test_io_errors() {
1920 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1921
1922 futures::join!(
1923 async {
1924 let block_server = BlockServer::new(
1925 BLOCK_SIZE,
1926 Arc::new(IoMockInterface {
1927 return_errors: true,
1928 do_checks: false,
1929 expected_op: Arc::new(Mutex::new(None)),
1930 }),
1931 );
1932 block_server.handle_requests(stream).await.unwrap();
1933 },
1934 async move {
1935 let (session_proxy, server) = fidl::endpoints::create_proxy();
1936
1937 proxy.open_session(server).unwrap();
1938
1939 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1940 let vmo_id = session_proxy
1941 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1942 .await
1943 .unwrap()
1944 .unwrap();
1945
1946 let mut fifo =
1947 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1948 let (mut reader, mut writer) = fifo.async_io();
1949
1950 writer
1952 .write_entries(&BlockFifoRequest {
1953 command: BlockFifoCommand {
1954 opcode: BlockOpcode::Read.into_primitive(),
1955 ..Default::default()
1956 },
1957 vmoid: vmo_id.id,
1958 length: 1,
1959 reqid: 1,
1960 ..Default::default()
1961 })
1962 .await
1963 .unwrap();
1964
1965 let mut response = BlockFifoResponse::default();
1966 reader.read_entries(&mut response).await.unwrap();
1967 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1968
1969 writer
1971 .write_entries(&BlockFifoRequest {
1972 command: BlockFifoCommand {
1973 opcode: BlockOpcode::Write.into_primitive(),
1974 ..Default::default()
1975 },
1976 vmoid: vmo_id.id,
1977 length: 1,
1978 reqid: 2,
1979 ..Default::default()
1980 })
1981 .await
1982 .unwrap();
1983
1984 reader.read_entries(&mut response).await.unwrap();
1985 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1986
1987 writer
1989 .write_entries(&BlockFifoRequest {
1990 command: BlockFifoCommand {
1991 opcode: BlockOpcode::Flush.into_primitive(),
1992 ..Default::default()
1993 },
1994 reqid: 3,
1995 ..Default::default()
1996 })
1997 .await
1998 .unwrap();
1999
2000 reader.read_entries(&mut response).await.unwrap();
2001 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2002
2003 writer
2005 .write_entries(&BlockFifoRequest {
2006 command: BlockFifoCommand {
2007 opcode: BlockOpcode::Trim.into_primitive(),
2008 ..Default::default()
2009 },
2010 reqid: 4,
2011 length: 1,
2012 ..Default::default()
2013 })
2014 .await
2015 .unwrap();
2016
2017 reader.read_entries(&mut response).await.unwrap();
2018 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2019
2020 std::mem::drop(proxy);
2021 }
2022 );
2023 }
2024
2025 #[fuchsia::test]
2026 async fn test_invalid_args() {
2027 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2028
2029 futures::join!(
2030 async {
2031 let block_server = BlockServer::new(
2032 BLOCK_SIZE,
2033 Arc::new(IoMockInterface {
2034 return_errors: false,
2035 do_checks: false,
2036 expected_op: Arc::new(Mutex::new(None)),
2037 }),
2038 );
2039 block_server.handle_requests(stream).await.unwrap();
2040 },
2041 async move {
2042 let (session_proxy, server) = fidl::endpoints::create_proxy();
2043
2044 proxy.open_session(server).unwrap();
2045
2046 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2047 let vmo_id = session_proxy
2048 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2049 .await
2050 .unwrap()
2051 .unwrap();
2052
2053 let mut fifo =
2054 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2055
2056 async fn test(
2057 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2058 request: BlockFifoRequest,
2059 ) -> Result<(), zx::Status> {
2060 let (mut reader, mut writer) = fifo.async_io();
2061 writer.write_entries(&request).await.unwrap();
2062 let mut response = BlockFifoResponse::default();
2063 reader.read_entries(&mut response).await.unwrap();
2064 zx::Status::ok(response.status)
2065 }
2066
2067 let good_read_request = || BlockFifoRequest {
2070 command: BlockFifoCommand {
2071 opcode: BlockOpcode::Read.into_primitive(),
2072 ..Default::default()
2073 },
2074 length: 1,
2075 vmoid: vmo_id.id,
2076 ..Default::default()
2077 };
2078
2079 assert_eq!(
2080 test(
2081 &mut fifo,
2082 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2083 )
2084 .await,
2085 Err(zx::Status::IO)
2086 );
2087
2088 assert_eq!(
2089 test(
2090 &mut fifo,
2091 BlockFifoRequest {
2092 vmo_offset: 0xffff_ffff_ffff_ffff,
2093 ..good_read_request()
2094 }
2095 )
2096 .await,
2097 Err(zx::Status::OUT_OF_RANGE)
2098 );
2099
2100 assert_eq!(
2101 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2102 Err(zx::Status::INVALID_ARGS)
2103 );
2104
2105 let good_write_request = || BlockFifoRequest {
2108 command: BlockFifoCommand {
2109 opcode: BlockOpcode::Write.into_primitive(),
2110 ..Default::default()
2111 },
2112 length: 1,
2113 vmoid: vmo_id.id,
2114 ..Default::default()
2115 };
2116
2117 assert_eq!(
2118 test(
2119 &mut fifo,
2120 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2121 )
2122 .await,
2123 Err(zx::Status::IO)
2124 );
2125
2126 assert_eq!(
2127 test(
2128 &mut fifo,
2129 BlockFifoRequest {
2130 vmo_offset: 0xffff_ffff_ffff_ffff,
2131 ..good_write_request()
2132 }
2133 )
2134 .await,
2135 Err(zx::Status::OUT_OF_RANGE)
2136 );
2137
2138 assert_eq!(
2139 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2140 Err(zx::Status::INVALID_ARGS)
2141 );
2142
2143 assert_eq!(
2146 test(
2147 &mut fifo,
2148 BlockFifoRequest {
2149 command: BlockFifoCommand {
2150 opcode: BlockOpcode::CloseVmo.into_primitive(),
2151 ..Default::default()
2152 },
2153 vmoid: vmo_id.id + 1,
2154 ..Default::default()
2155 }
2156 )
2157 .await,
2158 Err(zx::Status::IO)
2159 );
2160
2161 std::mem::drop(proxy);
2162 }
2163 );
2164 }
2165
2166 #[fuchsia::test]
2167 async fn test_concurrent_requests() {
2168 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2169
2170 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2171 let waiting_readers_clone = waiting_readers.clone();
2172
2173 futures::join!(
2174 async move {
2175 let block_server = BlockServer::new(
2176 BLOCK_SIZE,
2177 Arc::new(MockInterface {
2178 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2179 let (tx, rx) = oneshot::channel();
2180 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2181 Box::pin(async move {
2182 let _ = rx.await;
2183 Ok(())
2184 })
2185 })),
2186 ..MockInterface::default()
2187 }),
2188 );
2189 block_server.handle_requests(stream).await.unwrap();
2190 },
2191 async move {
2192 let (session_proxy, server) = fidl::endpoints::create_proxy();
2193
2194 proxy.open_session(server).unwrap();
2195
2196 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2197 let vmo_id = session_proxy
2198 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2199 .await
2200 .unwrap()
2201 .unwrap();
2202
2203 let mut fifo =
2204 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2205 let (mut reader, mut writer) = fifo.async_io();
2206
2207 writer
2208 .write_entries(&BlockFifoRequest {
2209 command: BlockFifoCommand {
2210 opcode: BlockOpcode::Read.into_primitive(),
2211 ..Default::default()
2212 },
2213 reqid: 1,
2214 dev_offset: 1, vmoid: vmo_id.id,
2216 length: 1,
2217 ..Default::default()
2218 })
2219 .await
2220 .unwrap();
2221
2222 writer
2223 .write_entries(&BlockFifoRequest {
2224 command: BlockFifoCommand {
2225 opcode: BlockOpcode::Read.into_primitive(),
2226 ..Default::default()
2227 },
2228 reqid: 2,
2229 dev_offset: 2,
2230 vmoid: vmo_id.id,
2231 length: 1,
2232 ..Default::default()
2233 })
2234 .await
2235 .unwrap();
2236
2237 poll_fn(|cx: &mut Context<'_>| {
2239 if waiting_readers.lock().len() == 2 {
2240 Poll::Ready(())
2241 } else {
2242 cx.waker().wake_by_ref();
2244 Poll::Pending
2245 }
2246 })
2247 .await;
2248
2249 let mut response = BlockFifoResponse::default();
2250 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2251
2252 let (id, tx) = waiting_readers.lock().pop().unwrap();
2253 tx.send(()).unwrap();
2254
2255 reader.read_entries(&mut response).await.unwrap();
2256 assert_eq!(response.status, zx::sys::ZX_OK);
2257 assert_eq!(response.reqid, id);
2258
2259 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2260
2261 let (id, tx) = waiting_readers.lock().pop().unwrap();
2262 tx.send(()).unwrap();
2263
2264 reader.read_entries(&mut response).await.unwrap();
2265 assert_eq!(response.status, zx::sys::ZX_OK);
2266 assert_eq!(response.reqid, id);
2267 }
2268 );
2269 }
2270
2271 #[fuchsia::test]
2272 async fn test_groups() {
2273 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2274
2275 futures::join!(
2276 async move {
2277 let block_server = BlockServer::new(
2278 BLOCK_SIZE,
2279 Arc::new(MockInterface {
2280 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2281 ..MockInterface::default()
2282 }),
2283 );
2284 block_server.handle_requests(stream).await.unwrap();
2285 },
2286 async move {
2287 let (session_proxy, server) = fidl::endpoints::create_proxy();
2288
2289 proxy.open_session(server).unwrap();
2290
2291 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2292 let vmo_id = session_proxy
2293 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2294 .await
2295 .unwrap()
2296 .unwrap();
2297
2298 let mut fifo =
2299 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2300 let (mut reader, mut writer) = fifo.async_io();
2301
2302 writer
2303 .write_entries(&BlockFifoRequest {
2304 command: BlockFifoCommand {
2305 opcode: BlockOpcode::Read.into_primitive(),
2306 flags: BlockIoFlag::GROUP_ITEM.bits(),
2307 ..Default::default()
2308 },
2309 group: 1,
2310 vmoid: vmo_id.id,
2311 length: 1,
2312 ..Default::default()
2313 })
2314 .await
2315 .unwrap();
2316
2317 writer
2318 .write_entries(&BlockFifoRequest {
2319 command: BlockFifoCommand {
2320 opcode: BlockOpcode::Read.into_primitive(),
2321 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2322 ..Default::default()
2323 },
2324 reqid: 2,
2325 group: 1,
2326 vmoid: vmo_id.id,
2327 length: 1,
2328 ..Default::default()
2329 })
2330 .await
2331 .unwrap();
2332
2333 let mut response = BlockFifoResponse::default();
2334 reader.read_entries(&mut response).await.unwrap();
2335 assert_eq!(response.status, zx::sys::ZX_OK);
2336 assert_eq!(response.reqid, 2);
2337 assert_eq!(response.group, 1);
2338 }
2339 );
2340 }
2341
2342 #[fuchsia::test]
2343 async fn test_group_error() {
2344 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2345
2346 let counter = Arc::new(AtomicU64::new(0));
2347 let counter_clone = counter.clone();
2348
2349 futures::join!(
2350 async move {
2351 let block_server = BlockServer::new(
2352 BLOCK_SIZE,
2353 Arc::new(MockInterface {
2354 read_hook: Some(Box::new(move |_, _, _, _| {
2355 counter_clone.fetch_add(1, Ordering::Relaxed);
2356 Box::pin(async { Err(zx::Status::BAD_STATE) })
2357 })),
2358 ..MockInterface::default()
2359 }),
2360 );
2361 block_server.handle_requests(stream).await.unwrap();
2362 },
2363 async move {
2364 let (session_proxy, server) = fidl::endpoints::create_proxy();
2365
2366 proxy.open_session(server).unwrap();
2367
2368 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2369 let vmo_id = session_proxy
2370 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2371 .await
2372 .unwrap()
2373 .unwrap();
2374
2375 let mut fifo =
2376 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2377 let (mut reader, mut writer) = fifo.async_io();
2378
2379 writer
2380 .write_entries(&BlockFifoRequest {
2381 command: BlockFifoCommand {
2382 opcode: BlockOpcode::Read.into_primitive(),
2383 flags: BlockIoFlag::GROUP_ITEM.bits(),
2384 ..Default::default()
2385 },
2386 group: 1,
2387 vmoid: vmo_id.id,
2388 length: 1,
2389 ..Default::default()
2390 })
2391 .await
2392 .unwrap();
2393
2394 poll_fn(|cx: &mut Context<'_>| {
2396 if counter.load(Ordering::Relaxed) == 1 {
2397 Poll::Ready(())
2398 } else {
2399 cx.waker().wake_by_ref();
2401 Poll::Pending
2402 }
2403 })
2404 .await;
2405
2406 let mut response = BlockFifoResponse::default();
2407 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2408
2409 writer
2410 .write_entries(&BlockFifoRequest {
2411 command: BlockFifoCommand {
2412 opcode: BlockOpcode::Read.into_primitive(),
2413 flags: BlockIoFlag::GROUP_ITEM.bits(),
2414 ..Default::default()
2415 },
2416 group: 1,
2417 vmoid: vmo_id.id,
2418 length: 1,
2419 ..Default::default()
2420 })
2421 .await
2422 .unwrap();
2423
2424 writer
2425 .write_entries(&BlockFifoRequest {
2426 command: BlockFifoCommand {
2427 opcode: BlockOpcode::Read.into_primitive(),
2428 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2429 ..Default::default()
2430 },
2431 reqid: 2,
2432 group: 1,
2433 vmoid: vmo_id.id,
2434 length: 1,
2435 ..Default::default()
2436 })
2437 .await
2438 .unwrap();
2439
2440 reader.read_entries(&mut response).await.unwrap();
2441 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2442 assert_eq!(response.reqid, 2);
2443 assert_eq!(response.group, 1);
2444
2445 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2446
2447 assert_eq!(counter.load(Ordering::Relaxed), 1);
2449 }
2450 );
2451 }
2452
2453 #[fuchsia::test]
2454 async fn test_group_with_two_lasts() {
2455 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2456
2457 let (tx, rx) = oneshot::channel();
2458
2459 futures::join!(
2460 async move {
2461 let rx = Mutex::new(Some(rx));
2462 let block_server = BlockServer::new(
2463 BLOCK_SIZE,
2464 Arc::new(MockInterface {
2465 read_hook: Some(Box::new(move |_, _, _, _| {
2466 let rx = rx.lock().take().unwrap();
2467 Box::pin(async {
2468 let _ = rx.await;
2469 Ok(())
2470 })
2471 })),
2472 ..MockInterface::default()
2473 }),
2474 );
2475 block_server.handle_requests(stream).await.unwrap();
2476 },
2477 async move {
2478 let (session_proxy, server) = fidl::endpoints::create_proxy();
2479
2480 proxy.open_session(server).unwrap();
2481
2482 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2483 let vmo_id = session_proxy
2484 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2485 .await
2486 .unwrap()
2487 .unwrap();
2488
2489 let mut fifo =
2490 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2491 let (mut reader, mut writer) = fifo.async_io();
2492
2493 writer
2494 .write_entries(&BlockFifoRequest {
2495 command: BlockFifoCommand {
2496 opcode: BlockOpcode::Read.into_primitive(),
2497 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2498 ..Default::default()
2499 },
2500 reqid: 1,
2501 group: 1,
2502 vmoid: vmo_id.id,
2503 length: 1,
2504 ..Default::default()
2505 })
2506 .await
2507 .unwrap();
2508
2509 writer
2510 .write_entries(&BlockFifoRequest {
2511 command: BlockFifoCommand {
2512 opcode: BlockOpcode::Read.into_primitive(),
2513 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2514 ..Default::default()
2515 },
2516 reqid: 2,
2517 group: 1,
2518 vmoid: vmo_id.id,
2519 length: 1,
2520 ..Default::default()
2521 })
2522 .await
2523 .unwrap();
2524
2525 writer
2527 .write_entries(&BlockFifoRequest {
2528 command: BlockFifoCommand {
2529 opcode: BlockOpcode::CloseVmo.into_primitive(),
2530 ..Default::default()
2531 },
2532 reqid: 3,
2533 vmoid: vmo_id.id,
2534 ..Default::default()
2535 })
2536 .await
2537 .unwrap();
2538
2539 let mut response = BlockFifoResponse::default();
2541 reader.read_entries(&mut response).await.unwrap();
2542 assert_eq!(response.status, zx::sys::ZX_OK);
2543 assert_eq!(response.reqid, 3);
2544
2545 tx.send(()).unwrap();
2547
2548 let mut response = BlockFifoResponse::default();
2551 reader.read_entries(&mut response).await.unwrap();
2552 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2553 assert_eq!(response.reqid, 1);
2554 assert_eq!(response.group, 1);
2555 }
2556 );
2557 }
2558
2559 #[fuchsia::test(allow_stalls = false)]
2560 async fn test_requests_dont_block_sessions() {
2561 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2562
2563 let (tx, rx) = oneshot::channel();
2564
2565 fasync::Task::local(async move {
2566 let rx = Mutex::new(Some(rx));
2567 let block_server = BlockServer::new(
2568 BLOCK_SIZE,
2569 Arc::new(MockInterface {
2570 read_hook: Some(Box::new(move |_, _, _, _| {
2571 let rx = rx.lock().take().unwrap();
2572 Box::pin(async {
2573 let _ = rx.await;
2574 Ok(())
2575 })
2576 })),
2577 ..MockInterface::default()
2578 }),
2579 );
2580 block_server.handle_requests(stream).await.unwrap();
2581 })
2582 .detach();
2583
2584 let mut fut = pin!(async {
2585 let (session_proxy, server) = fidl::endpoints::create_proxy();
2586
2587 proxy.open_session(server).unwrap();
2588
2589 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2590 let vmo_id = session_proxy
2591 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2592 .await
2593 .unwrap()
2594 .unwrap();
2595
2596 let mut fifo =
2597 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2598 let (mut reader, mut writer) = fifo.async_io();
2599
2600 writer
2601 .write_entries(&BlockFifoRequest {
2602 command: BlockFifoCommand {
2603 opcode: BlockOpcode::Read.into_primitive(),
2604 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2605 ..Default::default()
2606 },
2607 reqid: 1,
2608 group: 1,
2609 vmoid: vmo_id.id,
2610 length: 1,
2611 ..Default::default()
2612 })
2613 .await
2614 .unwrap();
2615
2616 let mut response = BlockFifoResponse::default();
2617 reader.read_entries(&mut response).await.unwrap();
2618 assert_eq!(response.status, zx::sys::ZX_OK);
2619 });
2620
2621 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2623
2624 let mut fut2 = pin!(proxy.get_volume_info());
2625
2626 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2628
2629 let _ = tx.send(());
2632
2633 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2634 }
2635
2636 #[fuchsia::test]
2637 async fn test_request_flow_control() {
2638 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2639
2640 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2643 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2644 let event_clone = event.clone();
2645 futures::join!(
2646 async move {
2647 let block_server = BlockServer::new(
2648 BLOCK_SIZE,
2649 Arc::new(MockInterface {
2650 read_hook: Some(Box::new(move |_, _, _, _| {
2651 let event_clone = event_clone.clone();
2652 Box::pin(async move {
2653 if !event_clone.1.load(Ordering::SeqCst) {
2654 event_clone.0.listen().await;
2655 }
2656 Ok(())
2657 })
2658 })),
2659 ..MockInterface::default()
2660 }),
2661 );
2662 block_server.handle_requests(stream).await.unwrap();
2663 },
2664 async move {
2665 let (session_proxy, server) = fidl::endpoints::create_proxy();
2666
2667 proxy.open_session(server).unwrap();
2668
2669 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2670 let vmo_id = session_proxy
2671 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2672 .await
2673 .unwrap()
2674 .unwrap();
2675
2676 let mut fifo =
2677 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2678 let (mut reader, mut writer) = fifo.async_io();
2679
2680 for i in 0..MAX_REQUESTS {
2681 writer
2682 .write_entries(&BlockFifoRequest {
2683 command: BlockFifoCommand {
2684 opcode: BlockOpcode::Read.into_primitive(),
2685 ..Default::default()
2686 },
2687 reqid: (i + 1) as u32,
2688 dev_offset: i,
2689 vmoid: vmo_id.id,
2690 length: 1,
2691 ..Default::default()
2692 })
2693 .await
2694 .unwrap();
2695 }
2696 assert!(
2697 futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2698 command: BlockFifoCommand {
2699 opcode: BlockOpcode::Read.into_primitive(),
2700 ..Default::default()
2701 },
2702 reqid: u32::MAX,
2703 dev_offset: MAX_REQUESTS,
2704 vmoid: vmo_id.id,
2705 length: 1,
2706 ..Default::default()
2707 })))
2708 .is_pending()
2709 );
2710 event.1.store(true, Ordering::SeqCst);
2712 event.0.notify(usize::MAX);
2713 let mut finished_reqids = vec![];
2715 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2716 let mut response = BlockFifoResponse::default();
2717 reader.read_entries(&mut response).await.unwrap();
2718 assert_eq!(response.status, zx::sys::ZX_OK);
2719 finished_reqids.push(response.reqid);
2720 writer
2721 .write_entries(&BlockFifoRequest {
2722 command: BlockFifoCommand {
2723 opcode: BlockOpcode::Read.into_primitive(),
2724 ..Default::default()
2725 },
2726 reqid: (i + 1) as u32,
2727 dev_offset: i,
2728 vmoid: vmo_id.id,
2729 length: 1,
2730 ..Default::default()
2731 })
2732 .await
2733 .unwrap();
2734 }
2735 let mut response = BlockFifoResponse::default();
2736 for _ in 0..MAX_REQUESTS {
2737 reader.read_entries(&mut response).await.unwrap();
2738 assert_eq!(response.status, zx::sys::ZX_OK);
2739 finished_reqids.push(response.reqid);
2740 }
2741 finished_reqids.sort();
2744 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2745 let mut i = 1;
2746 for reqid in finished_reqids {
2747 assert_eq!(reqid, i);
2748 i += 1;
2749 }
2750 }
2751 );
2752 }
2753
2754 #[fuchsia::test]
2755 async fn test_passthrough_io_with_fixed_map() {
2756 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2757
2758 let expected_op = Arc::new(Mutex::new(None));
2759 let expected_op_clone = expected_op.clone();
2760 futures::join!(
2761 async {
2762 let block_server = BlockServer::new(
2763 BLOCK_SIZE,
2764 Arc::new(IoMockInterface {
2765 return_errors: false,
2766 do_checks: true,
2767 expected_op: expected_op_clone,
2768 }),
2769 );
2770 block_server.handle_requests(stream).await.unwrap();
2771 },
2772 async move {
2773 let (session_proxy, server) = fidl::endpoints::create_proxy();
2774
2775 let mapping = fblock::BlockOffsetMapping {
2776 source_block_offset: 0,
2777 target_block_offset: 10,
2778 length: 20,
2779 };
2780 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2781
2782 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2783 let vmo_id = session_proxy
2784 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2785 .await
2786 .unwrap()
2787 .unwrap();
2788
2789 let mut fifo =
2790 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2791 let (mut reader, mut writer) = fifo.async_io();
2792
2793 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2795 writer
2796 .write_entries(&BlockFifoRequest {
2797 command: BlockFifoCommand {
2798 opcode: BlockOpcode::Read.into_primitive(),
2799 ..Default::default()
2800 },
2801 vmoid: vmo_id.id,
2802 dev_offset: 1,
2803 length: 2,
2804 vmo_offset: 3,
2805 ..Default::default()
2806 })
2807 .await
2808 .unwrap();
2809
2810 let mut response = BlockFifoResponse::default();
2811 reader.read_entries(&mut response).await.unwrap();
2812 assert_eq!(response.status, zx::sys::ZX_OK);
2813
2814 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2816 writer
2817 .write_entries(&BlockFifoRequest {
2818 command: BlockFifoCommand {
2819 opcode: BlockOpcode::Write.into_primitive(),
2820 ..Default::default()
2821 },
2822 vmoid: vmo_id.id,
2823 dev_offset: 4,
2824 length: 5,
2825 vmo_offset: 6,
2826 ..Default::default()
2827 })
2828 .await
2829 .unwrap();
2830
2831 reader.read_entries(&mut response).await.unwrap();
2832 assert_eq!(response.status, zx::sys::ZX_OK);
2833
2834 *expected_op.lock() = Some(ExpectedOp::Flush);
2836 writer
2837 .write_entries(&BlockFifoRequest {
2838 command: BlockFifoCommand {
2839 opcode: BlockOpcode::Flush.into_primitive(),
2840 ..Default::default()
2841 },
2842 ..Default::default()
2843 })
2844 .await
2845 .unwrap();
2846
2847 reader.read_entries(&mut response).await.unwrap();
2848 assert_eq!(response.status, zx::sys::ZX_OK);
2849
2850 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2852 writer
2853 .write_entries(&BlockFifoRequest {
2854 command: BlockFifoCommand {
2855 opcode: BlockOpcode::Trim.into_primitive(),
2856 ..Default::default()
2857 },
2858 dev_offset: 7,
2859 length: 3,
2860 ..Default::default()
2861 })
2862 .await
2863 .unwrap();
2864
2865 reader.read_entries(&mut response).await.unwrap();
2866 assert_eq!(response.status, zx::sys::ZX_OK);
2867
2868 *expected_op.lock() = None;
2870 writer
2871 .write_entries(&BlockFifoRequest {
2872 command: BlockFifoCommand {
2873 opcode: BlockOpcode::Read.into_primitive(),
2874 ..Default::default()
2875 },
2876 vmoid: vmo_id.id,
2877 dev_offset: 19,
2878 length: 2,
2879 vmo_offset: 3,
2880 ..Default::default()
2881 })
2882 .await
2883 .unwrap();
2884
2885 reader.read_entries(&mut response).await.unwrap();
2886 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2887
2888 std::mem::drop(proxy);
2889 }
2890 );
2891 }
2892
2893 #[fuchsia::test]
2894 fn operation_map() {
2895 fn expect_map_result(
2896 mut operation: Operation,
2897 mapping: Option<BlockOffsetMapping>,
2898 max_blocks: Option<NonZero<u32>>,
2899 expected_operations: Vec<Operation>,
2900 ) {
2901 let mut ops = vec![];
2902 while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2903 ops.push(operation);
2904 operation = remainder;
2905 }
2906 ops.push(operation);
2907 assert_eq!(ops, expected_operations);
2908 }
2909
2910 expect_map_result(
2912 Operation::Read {
2913 device_block_offset: 10,
2914 block_count: 200,
2915 _unused: 0,
2916 vmo_offset: 0,
2917 options: ReadOptions::default(),
2918 },
2919 None,
2920 None,
2921 vec![Operation::Read {
2922 device_block_offset: 10,
2923 block_count: 200,
2924 _unused: 0,
2925 vmo_offset: 0,
2926 options: ReadOptions::default(),
2927 }],
2928 );
2929
2930 expect_map_result(
2932 Operation::Read {
2933 device_block_offset: 10,
2934 block_count: 200,
2935 _unused: 0,
2936 vmo_offset: 0,
2937 options: ReadOptions::default(),
2938 },
2939 None,
2940 NonZero::new(120),
2941 vec![
2942 Operation::Read {
2943 device_block_offset: 10,
2944 block_count: 120,
2945 _unused: 0,
2946 vmo_offset: 0,
2947 options: ReadOptions::default(),
2948 },
2949 Operation::Read {
2950 device_block_offset: 130,
2951 block_count: 80,
2952 _unused: 0,
2953 vmo_offset: 120 * 512,
2954 options: ReadOptions::default(),
2955 },
2956 ],
2957 );
2958 expect_map_result(
2959 Operation::Trim { device_block_offset: 10, block_count: 200 },
2960 None,
2961 NonZero::new(120),
2962 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2963 );
2964
2965 expect_map_result(
2967 Operation::Read {
2968 device_block_offset: 10,
2969 block_count: 200,
2970 _unused: 0,
2971 vmo_offset: 0,
2972 options: ReadOptions::default(),
2973 },
2974 Some(BlockOffsetMapping {
2975 source_block_offset: 10,
2976 target_block_offset: 100,
2977 length: 200,
2978 }),
2979 NonZero::new(120),
2980 vec![
2981 Operation::Read {
2982 device_block_offset: 100,
2983 block_count: 120,
2984 _unused: 0,
2985 vmo_offset: 0,
2986 options: ReadOptions::default(),
2987 },
2988 Operation::Read {
2989 device_block_offset: 220,
2990 block_count: 80,
2991 _unused: 0,
2992 vmo_offset: 120 * 512,
2993 options: ReadOptions::default(),
2994 },
2995 ],
2996 );
2997 expect_map_result(
2998 Operation::Trim { device_block_offset: 10, block_count: 200 },
2999 Some(BlockOffsetMapping {
3000 source_block_offset: 10,
3001 target_block_offset: 100,
3002 length: 200,
3003 }),
3004 NonZero::new(120),
3005 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3006 );
3007 }
3008
3009 #[fuchsia::test]
3011 async fn test_pre_barrier_flush_failure() {
3012 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
3013
3014 struct NoBarrierInterface;
3015 impl super::async_interface::Interface for NoBarrierInterface {
3016 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3017 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3018 device_flags: fblock::Flag::empty(), max_transfer_blocks: NonZero::new(100),
3020 block_range: Some(0..100),
3021 type_guid: [0; 16],
3022 instance_guid: [0; 16],
3023 name: "test".to_string(),
3024 flags: 0,
3025 }))
3026 }
3027 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3028 Ok(())
3029 }
3030 async fn read(
3031 &self,
3032 _: u64,
3033 _: u32,
3034 _: &Arc<zx::Vmo>,
3035 _: u64,
3036 _: ReadOptions,
3037 _: TraceFlowId,
3038 ) -> Result<(), zx::Status> {
3039 unreachable!()
3040 }
3041 async fn write(
3042 &self,
3043 _: u64,
3044 _: u32,
3045 _: &Arc<zx::Vmo>,
3046 _: u64,
3047 _: WriteOptions,
3048 _: TraceFlowId,
3049 ) -> Result<(), zx::Status> {
3050 panic!("Write should not be called");
3051 }
3052 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3053 Err(zx::Status::IO)
3054 }
3055 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3056 unreachable!()
3057 }
3058 }
3059
3060 futures::join!(
3061 async move {
3062 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3063 block_server.handle_requests(stream).await.unwrap();
3064 },
3065 async move {
3066 let (session_proxy, server) = fidl::endpoints::create_proxy();
3067 proxy.open_session(server).unwrap();
3068 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3069 let vmo_id = session_proxy
3070 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3071 .await
3072 .unwrap()
3073 .unwrap();
3074
3075 let mut fifo =
3076 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3077 let (mut reader, mut writer) = fifo.async_io();
3078
3079 writer
3080 .write_entries(&BlockFifoRequest {
3081 command: BlockFifoCommand {
3082 opcode: BlockOpcode::Write.into_primitive(),
3083 flags: BlockIoFlag::PRE_BARRIER.bits(),
3084 ..Default::default()
3085 },
3086 vmoid: vmo_id.id,
3087 length: 1,
3088 ..Default::default()
3089 })
3090 .await
3091 .unwrap();
3092
3093 let mut response = BlockFifoResponse::default();
3094 reader.read_entries(&mut response).await.unwrap();
3095 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3096 }
3097 );
3098 }
3099
3100 #[fuchsia::test]
3103 async fn test_post_barrier_write_failure() {
3104 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
3105
3106 struct NoBarrierInterface;
3107 impl super::async_interface::Interface for NoBarrierInterface {
3108 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3109 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3110 device_flags: fblock::Flag::empty(), max_transfer_blocks: NonZero::new(100),
3112 block_range: Some(0..100),
3113 type_guid: [0; 16],
3114 instance_guid: [0; 16],
3115 name: "test".to_string(),
3116 flags: 0,
3117 }))
3118 }
3119 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3120 Ok(())
3121 }
3122 async fn read(
3123 &self,
3124 _: u64,
3125 _: u32,
3126 _: &Arc<zx::Vmo>,
3127 _: u64,
3128 _: ReadOptions,
3129 _: TraceFlowId,
3130 ) -> Result<(), zx::Status> {
3131 unreachable!()
3132 }
3133 async fn write(
3134 &self,
3135 _: u64,
3136 _: u32,
3137 _: &Arc<zx::Vmo>,
3138 _: u64,
3139 _: WriteOptions,
3140 _: TraceFlowId,
3141 ) -> Result<(), zx::Status> {
3142 Err(zx::Status::IO)
3143 }
3144 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3145 panic!("Flush should not be called")
3146 }
3147 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3148 unreachable!()
3149 }
3150 }
3151
3152 futures::join!(
3153 async move {
3154 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3155 block_server.handle_requests(stream).await.unwrap();
3156 },
3157 async move {
3158 let (session_proxy, server) = fidl::endpoints::create_proxy();
3159 proxy.open_session(server).unwrap();
3160 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3161 let vmo_id = session_proxy
3162 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3163 .await
3164 .unwrap()
3165 .unwrap();
3166
3167 let mut fifo =
3168 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3169 let (mut reader, mut writer) = fifo.async_io();
3170
3171 writer
3172 .write_entries(&BlockFifoRequest {
3173 command: BlockFifoCommand {
3174 opcode: BlockOpcode::Write.into_primitive(),
3175 flags: BlockIoFlag::FORCE_ACCESS.bits(),
3176 ..Default::default()
3177 },
3178 vmoid: vmo_id.id,
3179 length: 1,
3180 ..Default::default()
3181 })
3182 .await
3183 .unwrap();
3184
3185 let mut response = BlockFifoResponse::default();
3186 reader.read_entries(&mut response).await.unwrap();
3187 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3188 }
3189 );
3190 }
3191}