1use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fblock::{BlockIoFlag, BlockOpcode, MAX_TRANSFER_UNBOUNDED};
7use fuchsia_async::epoch::{Epoch, EpochGuard};
8use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
9use futures::{Future, FutureExt as _, TryStreamExt as _};
10use slab::Slab;
11use std::borrow::{Borrow, Cow};
12use std::collections::BTreeMap;
13use std::num::NonZero;
14use std::ops::Range;
15use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use storage_device::buffer::Buffer;
18use zx::HandleBased;
19use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
20
21pub mod async_interface;
22pub mod c_interface;
23pub mod callback_interface;
24
25#[cfg(test)]
26mod decompression_tests;
27
28pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
29
30type TraceFlowId = Option<NonZero<u64>>;
31
32#[derive(Clone)]
33pub enum DeviceInfo {
34 Block(BlockInfo),
35 Partition(PartitionInfo),
36}
37
38impl DeviceInfo {
39 pub fn label(&self) -> &str {
40 match self {
41 Self::Block(BlockInfo { .. }) => "",
42 Self::Partition(PartitionInfo { name, .. }) => name,
43 }
44 }
45 pub fn device_flags(&self) -> fblock::DeviceFlag {
46 match self {
47 Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
48 Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
49 }
50 }
51
52 pub fn block_count(&self) -> Option<u64> {
53 match self {
54 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
55 Self::Partition(PartitionInfo { block_range, .. }) => {
56 block_range.as_ref().map(|range| range.end - range.start)
57 }
58 }
59 }
60
61 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
62 match self {
63 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
64 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
65 max_transfer_blocks.clone()
66 }
67 }
68 }
69
70 fn max_transfer_size(&self, block_size: u32) -> u32 {
71 if let Some(max_blocks) = self.max_transfer_blocks() {
72 max_blocks.get() * block_size
73 } else {
74 MAX_TRANSFER_UNBOUNDED
75 }
76 }
77}
78
79#[derive(Clone, Default)]
81pub struct BlockInfo {
82 pub device_flags: fblock::DeviceFlag,
83 pub block_count: u64,
84 pub max_transfer_blocks: Option<NonZero<u32>>,
85}
86
87#[derive(Clone, Default)]
89pub struct PartitionInfo {
90 pub device_flags: fblock::DeviceFlag,
92 pub max_transfer_blocks: Option<NonZero<u32>>,
93 pub block_range: Option<Range<u64>>,
97 pub type_guid: [u8; 16],
98 pub instance_guid: [u8; 16],
99 pub name: String,
100 pub flags: u64,
101}
102
103struct ActiveRequest<S> {
106 session: S,
107 group_or_request: GroupOrRequest,
108 trace_flow_id: TraceFlowId,
109 _epoch_guard: EpochGuard<'static>,
110 status: zx::Status,
111 count: u32,
112 req_id: Option<u32>,
113 decompression_info: Option<DecompressionInfo>,
114}
115
116struct DecompressionInfo {
117 compressed_range: Range<usize>,
119
120 uncompressed_range: Range<u64>,
122
123 bytes_so_far: u64,
124 mapping: Arc<VmoMapping>,
125 buffer: Option<Buffer<'static>>,
126}
127
128impl DecompressionInfo {
129 fn uncompressed_slice(&self) -> *mut [u8] {
131 std::ptr::slice_from_raw_parts_mut(
132 (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
133 (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
134 )
135 }
136}
137
138pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
139
140impl<S> Default for ActiveRequests<S> {
141 fn default() -> Self {
142 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
143 }
144}
145
146impl<S> ActiveRequests<S> {
147 fn complete_and_take_response(
148 &self,
149 request_id: RequestId,
150 status: zx::Status,
151 ) -> Option<(S, BlockFifoResponse)> {
152 self.0.lock().complete_and_take_response(request_id, status)
153 }
154
155 fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
156 MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
157 }
158}
159
160struct ActiveRequestsInner<S> {
161 requests: Slab<ActiveRequest<S>>,
162}
163
164impl<S> ActiveRequestsInner<S> {
166 fn complete(&mut self, request_id: RequestId, status: zx::Status) {
168 let group = &mut self.requests[request_id.0];
169
170 group.count = group.count.checked_sub(1).unwrap();
171 if status != zx::Status::OK && group.status == zx::Status::OK {
172 group.status = status
173 }
174
175 fuchsia_trace::duration!(
176 "storage",
177 "block_server::finish_transaction",
178 "request_id" => request_id.0,
179 "group_completed" => group.count == 0,
180 "status" => status.into_raw());
181 if let Some(trace_flow_id) = group.trace_flow_id {
182 fuchsia_trace::flow_step!(
183 "storage",
184 "block_server::finish_request",
185 trace_flow_id.get().into()
186 );
187 }
188
189 if group.count == 0
190 && group.status == zx::Status::OK
191 && let Some(info) = &mut group.decompression_info
192 {
193 thread_local! {
194 static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
195 std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
196 }
197 DECOMPRESSOR.with(|decompressor| {
198 let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
200 let mut decompressor = decompressor.borrow_mut();
201 if let Err(error) = decompressor.decompress_to_buffer(
202 &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
203 target_slice,
204 ) {
205 log::warn!(error:?; "Decompression error");
206 group.status = zx::Status::IO_DATA_INTEGRITY;
207 };
208 });
209 }
210 }
211
212 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
214 let group = &self.requests[request_id.0];
215 match group.req_id {
216 Some(reqid) if group.count == 0 => {
217 let group = self.requests.remove(request_id.0);
218 Some((
219 group.session,
220 BlockFifoResponse {
221 status: group.status.into_raw(),
222 reqid,
223 group: group.group_or_request.group_id().unwrap_or(0),
224 ..Default::default()
225 },
226 ))
227 }
228 _ => None,
229 }
230 }
231
232 fn complete_and_take_response(
234 &mut self,
235 request_id: RequestId,
236 status: zx::Status,
237 ) -> Option<(S, BlockFifoResponse)> {
238 self.complete(request_id, status);
239 self.take_response(request_id)
240 }
241}
242
243pub struct BlockServer<SM: SessionManager> {
246 block_size: u32,
247 orchestrator: Arc<SM::Orchestrator>,
248}
249
250#[derive(Debug)]
252pub struct BlockOffsetMapping {
253 source_block_offset: u64,
254 target_block_offset: u64,
255 length: u64,
256}
257
258impl BlockOffsetMapping {
259 fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
260 blocks.0 >= self.source_block_offset
261 && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
262 }
263}
264
265impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
266 type Error = zx::Status;
267
268 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
269 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
270 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
271 Ok(Self {
272 source_block_offset: wire.source_block_offset,
273 target_block_offset: wire.target_block_offset,
274 length: wire.length,
275 })
276 }
277}
278
279pub struct OffsetMap {
281 mapping: Option<BlockOffsetMapping>,
282 max_transfer_blocks: Option<NonZero<u32>>,
283}
284
285impl OffsetMap {
286 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
288 Self { mapping: Some(mapping), max_transfer_blocks }
289 }
290
291 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
293 Self { mapping: None, max_transfer_blocks }
294 }
295
296 pub fn is_empty(&self) -> bool {
297 self.mapping.is_none()
298 }
299
300 fn mapping(&self) -> Option<&BlockOffsetMapping> {
301 self.mapping.as_ref()
302 }
303
304 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
305 self.max_transfer_blocks
306 }
307}
308
309pub trait SessionManager: 'static {
312 type Orchestrator: Borrow<Self> + Send + Sync;
317
318 const SUPPORTS_DECOMPRESSION: bool;
319
320 type Session;
321
322 fn on_attach_vmo(
323 orchestrator: Arc<Self::Orchestrator>,
324 vmo: &Arc<zx::Vmo>,
325 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
326
327 fn open_session(
332 orchestrator: Arc<Self::Orchestrator>,
333 stream: fblock::SessionRequestStream,
334 offset_map: OffsetMap,
335 block_size: u32,
336 ) -> impl Future<Output = Result<(), Error>> + Send;
337
338 fn get_info(&self) -> Cow<'_, DeviceInfo>;
340
341 fn get_volume_info(
343 &self,
344 ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
345 {
346 async { Err(zx::Status::NOT_SUPPORTED) }
347 }
348
349 fn query_slices(
351 &self,
352 _start_slices: &[u64],
353 ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
354 async { Err(zx::Status::NOT_SUPPORTED) }
355 }
356
357 fn extend(
359 &self,
360 _start_slice: u64,
361 _slice_count: u64,
362 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
363 async { Err(zx::Status::NOT_SUPPORTED) }
364 }
365
366 fn shrink(
368 &self,
369 _start_slice: u64,
370 _slice_count: u64,
371 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
372 async { Err(zx::Status::NOT_SUPPORTED) }
373 }
374
375 fn active_requests(&self) -> &ActiveRequests<Self::Session>;
377}
378
379pub trait IntoOrchestrator {
383 type SM: SessionManager;
384
385 fn into_orchestrator(self) -> Arc<<Self::SM as SessionManager>::Orchestrator>;
386}
387
388impl<SM: SessionManager> BlockServer<SM> {
389 pub fn new(block_size: u32, orchestrator: impl IntoOrchestrator<SM = SM>) -> Self {
390 Self { block_size, orchestrator: orchestrator.into_orchestrator() }
391 }
392
393 pub fn session_manager(&self) -> &SM {
394 self.orchestrator.as_ref().borrow()
395 }
396
397 pub async fn handle_requests(
399 &self,
400 mut requests: fblock::BlockRequestStream,
401 ) -> Result<(), Error> {
402 let scope = fasync::Scope::new();
403 loop {
404 match requests.try_next().await {
405 Ok(Some(request)) => {
406 if let Some(session) = self.handle_request(request).await? {
407 scope.spawn(session.map(|_| ()));
408 }
409 }
410 Ok(None) => break,
411 Err(err) => log::warn!(err:?; "Invalid request"),
412 }
413 }
414 scope.await;
415 Ok(())
416 }
417
418 async fn handle_request(
421 &self,
422 request: fblock::BlockRequest,
423 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
424 match request {
425 fblock::BlockRequest::GetInfo { responder } => {
426 let info = self.device_info();
427 let max_transfer_size = info.max_transfer_size(self.block_size);
428 let (block_count, mut flags) = match info.as_ref() {
429 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
430 (*block_count, *device_flags)
431 }
432 DeviceInfo::Partition(partition_info) => {
433 let block_count = if let Some(range) = partition_info.block_range.as_ref() {
434 range.end - range.start
435 } else {
436 let volume_info = self.session_manager().get_volume_info().await?;
437 volume_info.0.slice_size * volume_info.1.partition_slice_count
438 / self.block_size as u64
439 };
440 (block_count, partition_info.device_flags)
441 }
442 };
443 if SM::SUPPORTS_DECOMPRESSION {
444 flags |= fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT;
445 }
446 responder.send(Ok(&fblock::BlockInfo {
447 block_count,
448 block_size: self.block_size,
449 max_transfer_size,
450 flags,
451 }))?;
452 }
453 fblock::BlockRequest::OpenSession { session, control_handle: _ } => {
454 let info = self.device_info();
455 return Ok(Some(SM::open_session(
456 self.orchestrator.clone(),
457 session.into_stream(),
458 OffsetMap::empty(info.max_transfer_blocks()),
459 self.block_size,
460 )));
461 }
462 fblock::BlockRequest::OpenSessionWithOffsetMap {
463 session,
464 mapping,
465 control_handle: _,
466 } => {
467 let info = self.device_info();
468 let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
469 Ok(m) => m,
470 Err(status) => {
471 session.close_with_epitaph(status)?;
472 return Ok(None);
473 }
474 };
475 if let Some(max) = info.block_count() {
476 if initial_mapping.target_block_offset + initial_mapping.length > max {
477 log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
478 session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
479 return Ok(None);
480 }
481 }
482 return Ok(Some(SM::open_session(
483 self.orchestrator.clone(),
484 session.into_stream(),
485 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
486 self.block_size,
487 )));
488 }
489 fblock::BlockRequest::GetTypeGuid { responder } => {
490 let info = self.device_info();
491 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
492 let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
493 guid.value.copy_from_slice(&partition_info.type_guid);
494 responder.send(zx::sys::ZX_OK, Some(&guid))?;
495 } else {
496 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
497 }
498 }
499 fblock::BlockRequest::GetInstanceGuid { responder } => {
500 let info = self.device_info();
501 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
502 let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
503 guid.value.copy_from_slice(&partition_info.instance_guid);
504 responder.send(zx::sys::ZX_OK, Some(&guid))?;
505 } else {
506 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
507 }
508 }
509 fblock::BlockRequest::GetName { responder } => {
510 let info = self.device_info();
511 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
512 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
513 } else {
514 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
515 }
516 }
517 fblock::BlockRequest::GetMetadata { responder } => {
518 let info = self.device_info();
519 if let DeviceInfo::Partition(info) = info.as_ref() {
520 let mut type_guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
521 type_guid.value.copy_from_slice(&info.type_guid);
522 let mut instance_guid =
523 fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
524 instance_guid.value.copy_from_slice(&info.instance_guid);
525 responder.send(Ok(&fblock::BlockGetMetadataResponse {
526 name: Some(info.name.clone()),
527 type_guid: Some(type_guid),
528 instance_guid: Some(instance_guid),
529 start_block_offset: info.block_range.as_ref().map(|range| range.start),
530 num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
531 flags: Some(info.flags),
532 ..Default::default()
533 }))?;
534 } else {
535 responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
536 }
537 }
538 fblock::BlockRequest::QuerySlices { responder, start_slices } => {
539 match self.session_manager().query_slices(&start_slices).await {
540 Ok(mut results) => {
541 let results_len = results.len();
542 assert!(results_len <= 16);
543 results.resize(16, fblock::VsliceRange { allocated: false, count: 0 });
544 responder.send(
545 zx::sys::ZX_OK,
546 &results.try_into().unwrap(),
547 results_len as u64,
548 )?;
549 }
550 Err(s) => {
551 responder.send(
552 s.into_raw(),
553 &[fblock::VsliceRange { allocated: false, count: 0 }; 16],
554 0,
555 )?;
556 }
557 }
558 }
559 fblock::BlockRequest::GetVolumeInfo { responder, .. } => {
560 match self.session_manager().get_volume_info().await {
561 Ok((manager_info, volume_info)) => {
562 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
563 }
564 Err(s) => responder.send(s.into_raw(), None, None)?,
565 }
566 }
567 fblock::BlockRequest::Extend { responder, start_slice, slice_count } => {
568 responder.send(
569 zx::Status::from(self.session_manager().extend(start_slice, slice_count).await)
570 .into_raw(),
571 )?;
572 }
573 fblock::BlockRequest::Shrink { responder, start_slice, slice_count } => {
574 responder.send(
575 zx::Status::from(self.session_manager().shrink(start_slice, slice_count).await)
576 .into_raw(),
577 )?;
578 }
579 fblock::BlockRequest::Destroy { responder, .. } => {
580 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
581 }
582 }
583 Ok(None)
584 }
585
586 fn device_info(&self) -> Cow<'_, DeviceInfo> {
587 self.session_manager().get_info()
588 }
589}
590
591struct SessionHelper<SM: SessionManager> {
592 orchestrator: Arc<SM::Orchestrator>,
593 offset_map: OffsetMap,
594 block_size: u32,
595 peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
596 vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
597}
598
599struct VmoMapping {
600 base: usize,
601 size: usize,
602}
603
604impl VmoMapping {
605 fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
606 let size = vmo.get_size().unwrap() as usize;
607 Ok(Arc::new(Self {
608 base: fuchsia_runtime::vmar_root_self()
609 .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
610 .inspect_err(|error| {
611 log::warn!(error:?, size; "VmoMapping: unable to map VMO");
612 })?,
613 size,
614 }))
615 }
616}
617
618impl Drop for VmoMapping {
619 fn drop(&mut self) {
620 unsafe {
622 let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
623 }
624 }
625}
626
627impl<SM: SessionManager> SessionHelper<SM> {
628 fn new(
629 orchestrator: Arc<SM::Orchestrator>,
630 offset_map: OffsetMap,
631 block_size: u32,
632 ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
633 let (peer_fifo, fifo) = zx::Fifo::create(16)?;
634 Ok((Self { orchestrator, offset_map, block_size, peer_fifo, vmos: Mutex::default() }, fifo))
635 }
636
637 fn session_manager(&self) -> &SM {
638 self.orchestrator.as_ref().borrow()
639 }
640
641 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
642 match request {
643 fblock::SessionRequest::GetFifo { responder } => {
644 let rights = zx::Rights::TRANSFER
645 | zx::Rights::READ
646 | zx::Rights::WRITE
647 | zx::Rights::SIGNAL
648 | zx::Rights::WAIT;
649 match self.peer_fifo.duplicate_handle(rights) {
650 Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
651 Err(s) => responder.send(Err(s.into_raw()))?,
652 }
653 Ok(())
654 }
655 fblock::SessionRequest::AttachVmo { vmo, responder } => {
656 let vmo = Arc::new(vmo);
657 let vmo_id = {
658 let mut vmos = self.vmos.lock();
659 if vmos.len() == u16::MAX as usize {
660 responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
661 return Ok(());
662 } else {
663 let vmo_id = match vmos.last_entry() {
664 None => 1,
665 Some(o) => {
666 o.key().checked_add(1).unwrap_or_else(|| {
667 let mut vmo_id = 1;
668 for (&id, _) in &*vmos {
670 if id > vmo_id {
671 break;
672 }
673 vmo_id = id + 1;
674 }
675 vmo_id
676 })
677 }
678 };
679 vmos.insert(vmo_id, (vmo.clone(), None));
680 vmo_id
681 }
682 };
683 SM::on_attach_vmo(self.orchestrator.clone(), &vmo).await?;
684 responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
685 Ok(())
686 }
687 fblock::SessionRequest::Close { responder } => {
688 responder.send(Ok(()))?;
689 Err(anyhow!("Closed"))
690 }
691 }
692 }
693
694 fn decode_fifo_request(
696 &self,
697 session: SM::Session,
698 request: &BlockFifoRequest,
699 ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
700 let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
701
702 let request_bytes = request.length as u64 * self.block_size as u64;
703
704 let mut operation = BlockOpcode::from_primitive(request.command.opcode)
705 .ok_or(zx::Status::INVALID_ARGS)
706 .and_then(|code| {
707 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
708 if code != BlockOpcode::Read {
709 return Err(zx::Status::INVALID_ARGS);
710 }
711 if !SM::SUPPORTS_DECOMPRESSION {
712 return Err(zx::Status::NOT_SUPPORTED);
713 }
714 }
715 if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
716 if request.length == 0 {
717 return Err(zx::Status::INVALID_ARGS);
718 }
719 if request.dev_offset.checked_add(request.length as u64).is_none()
721 || (code != BlockOpcode::Trim
722 && request_bytes.checked_add(request.vmo_offset).is_none())
723 {
724 return Err(zx::Status::OUT_OF_RANGE);
725 }
726 }
727 Ok(match code {
728 BlockOpcode::Read => Operation::Read {
729 device_block_offset: request.dev_offset,
730 block_count: request.length,
731 _unused: 0,
732 vmo_offset: request
733 .vmo_offset
734 .checked_mul(self.block_size as u64)
735 .ok_or(zx::Status::OUT_OF_RANGE)?,
736 options: ReadOptions {
737 inline_crypto: InlineCryptoOptions {
738 is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
739 dun: request.dun,
740 slot: request.slot,
741 },
742 },
743 },
744 BlockOpcode::Write => {
745 let mut options = WriteOptions {
746 inline_crypto: InlineCryptoOptions {
747 is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
748 dun: request.dun,
749 slot: request.slot,
750 },
751 ..WriteOptions::default()
752 };
753 if flags.contains(BlockIoFlag::FORCE_ACCESS) {
754 options.flags |= WriteFlags::FORCE_ACCESS;
755 }
756 if flags.contains(BlockIoFlag::PRE_BARRIER) {
757 options.flags |= WriteFlags::PRE_BARRIER;
758 }
759 Operation::Write {
760 device_block_offset: request.dev_offset,
761 block_count: request.length,
762 _unused: 0,
763 options,
764 vmo_offset: request
765 .vmo_offset
766 .checked_mul(self.block_size as u64)
767 .ok_or(zx::Status::OUT_OF_RANGE)?,
768 }
769 }
770 BlockOpcode::Flush => Operation::Flush,
771 BlockOpcode::Trim => Operation::Trim {
772 device_block_offset: request.dev_offset,
773 block_count: request.length,
774 },
775 BlockOpcode::CloseVmo => Operation::CloseVmo,
776 })
777 });
778
779 let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
780 GroupOrRequest::Group(request.group)
781 } else {
782 GroupOrRequest::Request(request.reqid)
783 };
784
785 let mut active_requests = self.session_manager().active_requests().0.lock();
786 let mut request_id = None;
787
788 if group_or_request.is_group() {
799 for (key, group) in &mut active_requests.requests {
803 if group.group_or_request == group_or_request {
804 if group.req_id.is_some() {
805 if group.status == zx::Status::OK {
807 group.status = zx::Status::INVALID_ARGS;
808 }
809 return Err(None);
811 }
812 if group.status == zx::Status::OK
814 && let Some(info) = &mut group.decompression_info
815 {
816 if let Ok(Operation::Read {
817 device_block_offset,
818 mut block_count,
819 options,
820 vmo_offset: 0,
821 ..
822 }) = operation
823 {
824 let remaining_bytes = info
825 .compressed_range
826 .end
827 .next_multiple_of(self.block_size as usize)
828 as u64
829 - info.bytes_so_far;
830 if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
831 || request.total_compressed_bytes != 0
832 || request.uncompressed_bytes != 0
833 || request.compressed_prefix_bytes != 0
834 || (flags.contains(BlockIoFlag::GROUP_LAST)
835 && info.bytes_so_far + request_bytes
836 < info.compressed_range.end as u64)
837 || (!flags.contains(BlockIoFlag::GROUP_LAST)
838 && request_bytes >= remaining_bytes)
839 {
840 group.status = zx::Status::INVALID_ARGS;
841 } else {
842 if request_bytes > remaining_bytes {
851 block_count = (remaining_bytes / self.block_size as u64) as u32;
852 }
853
854 operation = Ok(Operation::ContinueDecompressedRead {
855 offset: info.bytes_so_far,
856 device_block_offset,
857 block_count,
858 options,
859 });
860
861 info.bytes_so_far += block_count as u64 * self.block_size as u64;
862 }
863 } else {
864 group.status = zx::Status::INVALID_ARGS;
865 }
866 }
867 if flags.contains(BlockIoFlag::GROUP_LAST) {
868 group.req_id = Some(request.reqid);
869 if group.status != zx::Status::OK {
872 operation = Err(group.status);
873 }
874 } else if group.status != zx::Status::OK {
875 return Err(None);
878 }
879 request_id = Some(RequestId(key));
880 group.count += 1;
881 break;
882 }
883 }
884 }
885
886 let is_single_request =
887 !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
888
889 let mut decompression_info = None;
890 let vmo = match operation {
891 Ok(Operation::Read {
892 device_block_offset,
893 mut block_count,
894 options,
895 vmo_offset,
896 ..
897 }) => match self.vmos.lock().get_mut(&request.vmoid) {
898 Some((vmo, mapping)) => {
899 if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
900 let compressed_range = request.compressed_prefix_bytes as usize
901 ..request.compressed_prefix_bytes as usize
902 + request.total_compressed_bytes as usize;
903 let required_buffer_size =
904 compressed_range.end.next_multiple_of(self.block_size as usize);
905
906 if compressed_range.start >= compressed_range.end
908 || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
909 || (is_single_request && request_bytes < compressed_range.end as u64)
910 || (!is_single_request && request_bytes >= required_buffer_size as u64)
911 {
912 Err(zx::Status::INVALID_ARGS)
913 } else {
914 let bytes_so_far = if request_bytes > required_buffer_size as u64 {
921 block_count =
922 (required_buffer_size / self.block_size as usize) as u32;
923 required_buffer_size as u64
924 } else {
925 request_bytes
926 };
927
928 match mapping {
930 Some(mapping) => Ok(mapping.clone()),
931 None => {
932 VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
933 }
934 }
935 .and_then(|mapping| {
936 if vmo_offset
939 .checked_add(request.uncompressed_bytes as u64)
940 .is_some_and(|end| end <= mapping.size as u64)
941 {
942 Ok(mapping)
943 } else {
944 Err(zx::Status::OUT_OF_RANGE)
945 }
946 })
947 .map(|mapping| {
948 operation = Ok(Operation::StartDecompressedRead {
953 required_buffer_size,
954 device_block_offset,
955 block_count,
956 options,
957 });
958 decompression_info = Some(DecompressionInfo {
961 compressed_range,
962 bytes_so_far,
963 mapping,
964 uncompressed_range: vmo_offset
965 ..vmo_offset + request.uncompressed_bytes as u64,
966 buffer: None,
967 });
968 None
969 })
970 }
971 } else {
972 Ok(Some(vmo.clone()))
973 }
974 }
975 None => Err(zx::Status::IO),
976 },
977 Ok(Operation::Write { .. }) => self
978 .vmos
979 .lock()
980 .get(&request.vmoid)
981 .cloned()
982 .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
983 Ok(Operation::CloseVmo) => {
984 self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
985 let vmo_clone = vmo.clone();
986 Epoch::global().defer(move || drop(vmo_clone));
989 Ok(Some(vmo))
990 })
991 }
992 _ => Ok(None),
993 }
994 .unwrap_or_else(|e| {
995 operation = Err(e);
996 None
997 });
998
999 let trace_flow_id = NonZero::new(request.trace_flow_id);
1000 let request_id = request_id.unwrap_or_else(|| {
1001 RequestId(active_requests.requests.insert(ActiveRequest {
1002 session,
1003 group_or_request,
1004 trace_flow_id,
1005 _epoch_guard: Epoch::global().guard(),
1006 status: zx::Status::OK,
1007 count: 1,
1008 req_id: is_single_request.then_some(request.reqid),
1009 decompression_info,
1010 }))
1011 });
1012
1013 Ok(DecodedRequest {
1014 request_id,
1015 trace_flow_id,
1016 operation: operation.map_err(|status| {
1017 active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
1018 })?,
1019 vmo,
1020 })
1021 }
1022
1023 fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1024 std::mem::take(&mut *self.vmos.lock())
1025 }
1026
1027 fn map_request(
1029 &self,
1030 mut request: DecodedRequest,
1031 active_request: &mut ActiveRequest<SM::Session>,
1032 ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1033 if active_request.status != zx::Status::OK {
1034 return Err(zx::Status::BAD_STATE);
1035 }
1036 let mapping = self.offset_map.mapping();
1037 match (mapping, request.operation.blocks()) {
1038 (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1039 return Err(zx::Status::OUT_OF_RANGE);
1040 }
1041 _ => {}
1042 }
1043 let remainder = request.operation.map(
1044 self.offset_map.mapping(),
1045 self.offset_map.max_transfer_blocks(),
1046 self.block_size,
1047 );
1048 if remainder.is_some() {
1049 active_request.count += 1;
1050 }
1051 static CACHE: AtomicU64 = AtomicU64::new(0);
1052 if let Some(context) =
1053 fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
1054 {
1055 use fuchsia_trace::ArgValue;
1056 let trace_args = [
1057 ArgValue::of("request_id", request.request_id.0),
1058 ArgValue::of("opcode", request.operation.trace_label()),
1059 ];
1060 let _scope =
1061 fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
1062 if let Some(trace_flow_id) = active_request.trace_flow_id {
1063 fuchsia_trace::flow_step(
1064 &context,
1065 "block_server::start_transaction",
1066 trace_flow_id.get().into(),
1067 &[],
1068 );
1069 }
1070 }
1071 let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1072 Ok((request, remainder))
1073 }
1074
1075 fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1080 self.session_manager().active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1081 }
1082
1083 fn close_active_groups(&self, pred: impl Fn(&SM::Session) -> bool) {
1096 self.session_manager().active_requests().0.lock().requests.retain(|_, request| {
1097 if !pred(&request.session) || request.req_id.is_some() {
1098 return true;
1099 }
1100 request.req_id = Some(u32::MAX);
1103 request.count > 0
1104 });
1105 }
1106}
1107
1108#[repr(transparent)]
1109#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
1110pub struct RequestId(usize);
1111
1112#[derive(Clone, Debug)]
1113struct DecodedRequest {
1114 request_id: RequestId,
1115 trace_flow_id: TraceFlowId,
1116 operation: Operation,
1117 vmo: Option<Arc<zx::Vmo>>,
1118}
1119
1120pub type WriteFlags = block_protocol::WriteFlags;
1122pub type WriteOptions = block_protocol::WriteOptions;
1123pub type ReadOptions = block_protocol::ReadOptions;
1124pub type InlineCryptoOptions = block_protocol::InlineCryptoOptions;
1125
1126#[repr(C)]
1127#[derive(Clone, Debug, PartialEq, Eq)]
1128pub enum Operation {
1129 Read {
1134 device_block_offset: u64,
1135 block_count: u32,
1136 _unused: u32,
1137 vmo_offset: u64,
1138 options: ReadOptions,
1139 },
1140 Write {
1141 device_block_offset: u64,
1142 block_count: u32,
1143 _unused: u32,
1144 vmo_offset: u64,
1145 options: WriteOptions,
1146 },
1147 Flush,
1148 Trim {
1149 device_block_offset: u64,
1150 block_count: u32,
1151 },
1152 CloseVmo,
1154 StartDecompressedRead {
1155 required_buffer_size: usize,
1156 device_block_offset: u64,
1157 block_count: u32,
1158 options: ReadOptions,
1159 },
1160 ContinueDecompressedRead {
1161 offset: u64,
1162 device_block_offset: u64,
1163 block_count: u32,
1164 options: ReadOptions,
1165 },
1166}
1167
1168impl Operation {
1169 fn trace_label(&self) -> &'static str {
1170 match self {
1171 Operation::Read { .. } => "read",
1172 Operation::Write { .. } => "write",
1173 Operation::Flush { .. } => "flush",
1174 Operation::Trim { .. } => "trim",
1175 Operation::CloseVmo { .. } => "close_vmo",
1176 Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1177 Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1178 }
1179 }
1180
1181 fn blocks(&self) -> Option<(u64, u32)> {
1183 match self {
1184 Operation::Read { device_block_offset, block_count, .. }
1185 | Operation::Write { device_block_offset, block_count, .. }
1186 | Operation::Trim { device_block_offset, block_count, .. } => {
1187 Some((*device_block_offset, *block_count))
1188 }
1189 _ => None,
1190 }
1191 }
1192
1193 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1195 match self {
1196 Operation::Read { device_block_offset, block_count, .. }
1197 | Operation::Write { device_block_offset, block_count, .. }
1198 | Operation::Trim { device_block_offset, block_count, .. } => {
1199 Some((device_block_offset, block_count))
1200 }
1201 _ => None,
1202 }
1203 }
1204
1205 fn map(
1208 &mut self,
1209 mapping: Option<&BlockOffsetMapping>,
1210 max_blocks: Option<NonZero<u32>>,
1211 block_size: u32,
1212 ) -> Option<Self> {
1213 let mut max = match self {
1214 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1215 _ => None,
1216 };
1217 let (offset, length) = self.blocks_mut()?;
1218 let orig_offset = *offset;
1219 if let Some(mapping) = mapping {
1220 let delta = *offset - mapping.source_block_offset;
1221 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1222 *offset = mapping.target_block_offset + delta;
1223 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1224 max = match max {
1225 None => Some(mapping_max),
1226 Some(m) => Some(std::cmp::min(m, mapping_max)),
1227 };
1228 };
1229 if let Some(max) = max {
1230 if *length as u64 > max {
1231 let rem = (*length as u64 - max) as u32;
1232 *length = max as u32;
1233 return Some(match self {
1234 Operation::Read {
1235 device_block_offset: _,
1236 block_count: _,
1237 vmo_offset,
1238 _unused,
1239 options,
1240 } => {
1241 let mut options = *options;
1242 options.inline_crypto.dun += max as u32;
1243 Operation::Read {
1244 device_block_offset: orig_offset + max,
1245 block_count: rem,
1246 vmo_offset: *vmo_offset + max * block_size as u64,
1247 _unused: *_unused,
1248 options: options,
1249 }
1250 }
1251 Operation::Write {
1252 device_block_offset: _,
1253 block_count: _,
1254 _unused,
1255 vmo_offset,
1256 options,
1257 } => {
1258 let mut options = *options;
1259 options.inline_crypto.dun += max as u32;
1260 Operation::Write {
1261 device_block_offset: orig_offset + max,
1262 block_count: rem,
1263 _unused: *_unused,
1264 vmo_offset: *vmo_offset + max * block_size as u64,
1265 options: options,
1266 }
1267 }
1268 Operation::Trim { device_block_offset: _, block_count: _ } => {
1269 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1270 }
1271 _ => unreachable!(),
1272 });
1273 }
1274 }
1275 None
1276 }
1277
1278 pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1280 if let Operation::Write { options, .. } = self {
1281 options.flags.contains(value)
1282 } else {
1283 false
1284 }
1285 }
1286
1287 pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1289 if let Operation::Write { options, .. } = self {
1290 let result = options.flags.contains(value);
1291 options.flags.remove(value);
1292 result
1293 } else {
1294 false
1295 }
1296 }
1297}
1298
1299#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1300pub enum GroupOrRequest {
1301 Group(u16),
1302 Request(u32),
1303}
1304
1305impl GroupOrRequest {
1306 fn is_group(&self) -> bool {
1307 matches!(self, Self::Group(_))
1308 }
1309
1310 fn group_id(&self) -> Option<u16> {
1311 match self {
1312 Self::Group(id) => Some(*id),
1313 Self::Request(_) => None,
1314 }
1315 }
1316}
1317
1318#[cfg(test)]
1319mod tests {
1320 use super::{
1321 BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1322 TraceFlowId,
1323 };
1324 use assert_matches::assert_matches;
1325 use block_protocol::{
1326 BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions, ReadOptions,
1327 WriteFlags, WriteOptions,
1328 };
1329 use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
1330 use fuchsia_sync::Mutex;
1331 use futures::FutureExt as _;
1332 use futures::channel::oneshot;
1333 use futures::future::BoxFuture;
1334 use std::borrow::Cow;
1335 use std::future::poll_fn;
1336 use std::num::NonZero;
1337 use std::pin::pin;
1338 use std::sync::Arc;
1339 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1340 use std::task::{Context, Poll};
1341 use zx::HandleBased as _;
1342 use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
1343
1344 #[derive(Default)]
1345 struct MockInterface {
1346 read_hook: Option<
1347 Box<
1348 dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1349 + Send
1350 + Sync,
1351 >,
1352 >,
1353 write_hook:
1354 Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1355 barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1356 }
1357
1358 impl super::async_interface::Interface for MockInterface {
1359 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1360 Ok(())
1361 }
1362
1363 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1364 Cow::Owned(test_device_info())
1365 }
1366
1367 async fn read(
1368 &self,
1369 device_block_offset: u64,
1370 block_count: u32,
1371 vmo: &Arc<zx::Vmo>,
1372 vmo_offset: u64,
1373 _opts: ReadOptions,
1374 _trace_flow_id: TraceFlowId,
1375 ) -> Result<(), zx::Status> {
1376 if let Some(read_hook) = &self.read_hook {
1377 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1378 } else {
1379 unimplemented!();
1380 }
1381 }
1382
1383 async fn write(
1384 &self,
1385 device_block_offset: u64,
1386 _block_count: u32,
1387 _vmo: &Arc<zx::Vmo>,
1388 _vmo_offset: u64,
1389 opts: WriteOptions,
1390 _trace_flow_id: TraceFlowId,
1391 ) -> Result<(), zx::Status> {
1392 if opts.flags.contains(WriteFlags::PRE_BARRIER)
1393 && let Some(barrier_hook) = &self.barrier_hook
1394 {
1395 barrier_hook()?;
1396 }
1397 if let Some(write_hook) = &self.write_hook {
1398 write_hook(device_block_offset).await
1399 } else {
1400 unimplemented!();
1401 }
1402 }
1403
1404 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1405 Ok(())
1406 }
1407
1408 async fn trim(
1409 &self,
1410 _device_block_offset: u64,
1411 _block_count: u32,
1412 _trace_flow_id: TraceFlowId,
1413 ) -> Result<(), zx::Status> {
1414 unreachable!();
1415 }
1416
1417 async fn get_volume_info(
1418 &self,
1419 ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
1420 let () = std::future::pending().await;
1422 unreachable!();
1423 }
1424 }
1425
1426 const BLOCK_SIZE: u32 = 512;
1427 const MAX_TRANSFER_BLOCKS: u32 = 10;
1428
1429 fn test_device_info() -> DeviceInfo {
1430 DeviceInfo::Partition(PartitionInfo {
1431 device_flags: fblock::DeviceFlag::READONLY
1432 | fblock::DeviceFlag::BARRIER_SUPPORT
1433 | fblock::DeviceFlag::FUA_SUPPORT,
1434 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1435 block_range: Some(0..100),
1436 type_guid: [1; 16],
1437 instance_guid: [2; 16],
1438 name: "foo".to_string(),
1439 flags: 0xabcd,
1440 })
1441 }
1442
1443 #[fuchsia::test]
1444 async fn test_barriers_ordering() {
1445 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1446 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1447 let barrier_called = Arc::new(AtomicBool::new(false));
1448
1449 futures::join!(
1450 async move {
1451 let barrier_called_clone = barrier_called.clone();
1452 let block_server = BlockServer::new(
1453 BLOCK_SIZE,
1454 Arc::new(MockInterface {
1455 barrier_hook: Some(Box::new(move || {
1456 barrier_called.store(true, Ordering::Relaxed);
1457 Ok(())
1458 })),
1459 write_hook: Some(Box::new(move |device_block_offset| {
1460 let barrier_called = barrier_called_clone.clone();
1461 Box::pin(async move {
1462 if device_block_offset % 2 == 0 {
1464 fasync::Timer::new(fasync::MonotonicInstant::after(
1465 zx::MonotonicDuration::from_millis(200),
1466 ))
1467 .await;
1468 }
1469 assert!(barrier_called.load(Ordering::Relaxed));
1470 Ok(())
1471 })
1472 })),
1473 ..MockInterface::default()
1474 }),
1475 );
1476 block_server.handle_requests(stream).await.unwrap();
1477 },
1478 async move {
1479 let (session_proxy, server) = fidl::endpoints::create_proxy();
1480
1481 proxy.open_session(server).unwrap();
1482
1483 let vmo_id = session_proxy
1484 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1485 .await
1486 .unwrap()
1487 .unwrap();
1488 assert_ne!(vmo_id.id, 0);
1489
1490 let mut fifo =
1491 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1492 let (mut reader, mut writer) = fifo.async_io();
1493
1494 writer
1495 .write_entries(&BlockFifoRequest {
1496 command: BlockFifoCommand {
1497 opcode: BlockOpcode::Write.into_primitive(),
1498 flags: BlockIoFlag::PRE_BARRIER.bits(),
1499 ..Default::default()
1500 },
1501 vmoid: vmo_id.id,
1502 dev_offset: 0,
1503 length: 5,
1504 vmo_offset: 6,
1505 ..Default::default()
1506 })
1507 .await
1508 .unwrap();
1509
1510 for i in 0..10 {
1511 writer
1512 .write_entries(&BlockFifoRequest {
1513 command: BlockFifoCommand {
1514 opcode: BlockOpcode::Write.into_primitive(),
1515 ..Default::default()
1516 },
1517 vmoid: vmo_id.id,
1518 dev_offset: i + 1,
1519 length: 5,
1520 vmo_offset: 6,
1521 ..Default::default()
1522 })
1523 .await
1524 .unwrap();
1525 }
1526 for _ in 0..11 {
1527 let mut response = BlockFifoResponse::default();
1528 reader.read_entries(&mut response).await.unwrap();
1529 assert_eq!(response.status, zx::sys::ZX_OK);
1530 }
1531
1532 std::mem::drop(proxy);
1533 }
1534 );
1535 }
1536
1537 #[fuchsia::test]
1538 async fn test_info() {
1539 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1540
1541 futures::join!(
1542 async {
1543 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1544 block_server.handle_requests(stream).await.unwrap();
1545 },
1546 async {
1547 let expected_info = test_device_info();
1548 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1549 info
1550 } else {
1551 unreachable!()
1552 };
1553
1554 let block_info = proxy.get_info().await.unwrap().unwrap();
1555 assert_eq!(
1556 block_info.block_count,
1557 partition_info.block_range.as_ref().unwrap().end
1558 - partition_info.block_range.as_ref().unwrap().start
1559 );
1560 assert_eq!(
1561 block_info.flags,
1562 fblock::DeviceFlag::READONLY
1563 | fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT
1564 | fblock::DeviceFlag::BARRIER_SUPPORT
1565 | fblock::DeviceFlag::FUA_SUPPORT
1566 );
1567
1568 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1569
1570 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1571 assert_eq!(status, zx::sys::ZX_OK);
1572 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1573
1574 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1575 assert_eq!(status, zx::sys::ZX_OK);
1576 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1577
1578 let (status, name) = proxy.get_name().await.unwrap();
1579 assert_eq!(status, zx::sys::ZX_OK);
1580 assert_eq!(name.as_ref(), Some(&partition_info.name));
1581
1582 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1583 assert_eq!(metadata.name, name);
1584 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1585 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1586 assert_eq!(
1587 metadata.start_block_offset,
1588 Some(partition_info.block_range.as_ref().unwrap().start)
1589 );
1590 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1591 assert_eq!(metadata.flags, Some(partition_info.flags));
1592
1593 std::mem::drop(proxy);
1594 }
1595 );
1596 }
1597
1598 #[fuchsia::test]
1599 async fn test_attach_vmo() {
1600 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1601
1602 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1603 let koid = vmo.koid().unwrap();
1604
1605 futures::join!(
1606 async {
1607 let block_server = BlockServer::new(
1608 BLOCK_SIZE,
1609 Arc::new(MockInterface {
1610 read_hook: Some(Box::new(move |_, _, vmo, _| {
1611 assert_eq!(vmo.koid().unwrap(), koid);
1612 Box::pin(async { Ok(()) })
1613 })),
1614 ..MockInterface::default()
1615 }),
1616 );
1617 block_server.handle_requests(stream).await.unwrap();
1618 },
1619 async move {
1620 let (session_proxy, server) = fidl::endpoints::create_proxy();
1621
1622 proxy.open_session(server).unwrap();
1623
1624 let vmo_id = session_proxy
1625 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1626 .await
1627 .unwrap()
1628 .unwrap();
1629 assert_ne!(vmo_id.id, 0);
1630
1631 let mut fifo =
1632 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1633 let (mut reader, mut writer) = fifo.async_io();
1634
1635 let mut count = 1;
1637 loop {
1638 match session_proxy
1639 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1640 .await
1641 .unwrap()
1642 {
1643 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1644 Err(e) => {
1645 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1646 break;
1647 }
1648 }
1649
1650 if count % 10 == 0 {
1652 writer
1653 .write_entries(&BlockFifoRequest {
1654 command: BlockFifoCommand {
1655 opcode: BlockOpcode::Read.into_primitive(),
1656 ..Default::default()
1657 },
1658 vmoid: vmo_id.id,
1659 length: 1,
1660 ..Default::default()
1661 })
1662 .await
1663 .unwrap();
1664
1665 let mut response = BlockFifoResponse::default();
1666 reader.read_entries(&mut response).await.unwrap();
1667 assert_eq!(response.status, zx::sys::ZX_OK);
1668 }
1669
1670 count += 1;
1671 }
1672
1673 assert_eq!(count, u16::MAX as u64);
1674
1675 writer
1677 .write_entries(&BlockFifoRequest {
1678 command: BlockFifoCommand {
1679 opcode: BlockOpcode::CloseVmo.into_primitive(),
1680 ..Default::default()
1681 },
1682 vmoid: vmo_id.id,
1683 ..Default::default()
1684 })
1685 .await
1686 .unwrap();
1687
1688 let mut response = BlockFifoResponse::default();
1689 reader.read_entries(&mut response).await.unwrap();
1690 assert_eq!(response.status, zx::sys::ZX_OK);
1691
1692 let new_vmo_id = session_proxy
1693 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1694 .await
1695 .unwrap()
1696 .unwrap();
1697 assert_eq!(new_vmo_id.id, vmo_id.id);
1699
1700 std::mem::drop(proxy);
1701 }
1702 );
1703 }
1704
1705 #[fuchsia::test]
1706 async fn test_close() {
1707 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1708
1709 let mut server = std::pin::pin!(
1710 async {
1711 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1712 block_server.handle_requests(stream).await.unwrap();
1713 }
1714 .fuse()
1715 );
1716
1717 let mut client = std::pin::pin!(
1718 async {
1719 let (session_proxy, server) = fidl::endpoints::create_proxy();
1720
1721 proxy.open_session(server).unwrap();
1722
1723 std::mem::drop(proxy);
1726
1727 session_proxy.close().await.unwrap().unwrap();
1728
1729 let _: () = std::future::pending().await;
1731 }
1732 .fuse()
1733 );
1734
1735 futures::select!(
1736 _ = server => {}
1737 _ = client => unreachable!(),
1738 );
1739 }
1740
1741 #[derive(Default)]
1742 struct IoMockInterface {
1743 do_checks: bool,
1744 expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1745 return_errors: bool,
1746 }
1747
1748 #[derive(Debug)]
1749 enum ExpectedOp {
1750 Read(u64, u32, u64),
1751 Write(u64, u32, u64),
1752 Trim(u64, u32),
1753 Flush,
1754 }
1755
1756 impl super::async_interface::Interface for IoMockInterface {
1757 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1758 Ok(())
1759 }
1760
1761 fn get_info(&self) -> Cow<'_, DeviceInfo> {
1762 Cow::Owned(test_device_info())
1763 }
1764
1765 async fn read(
1766 &self,
1767 device_block_offset: u64,
1768 block_count: u32,
1769 _vmo: &Arc<zx::Vmo>,
1770 vmo_offset: u64,
1771 _opts: ReadOptions,
1772 _trace_flow_id: TraceFlowId,
1773 ) -> Result<(), zx::Status> {
1774 if self.return_errors {
1775 Err(zx::Status::INTERNAL)
1776 } else {
1777 if self.do_checks {
1778 assert_matches!(
1779 self.expected_op.lock().take(),
1780 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1781 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1782 "Read {device_block_offset} {block_count} {vmo_offset}"
1783 );
1784 }
1785 Ok(())
1786 }
1787 }
1788
1789 async fn write(
1790 &self,
1791 device_block_offset: u64,
1792 block_count: u32,
1793 _vmo: &Arc<zx::Vmo>,
1794 vmo_offset: u64,
1795 _write_opts: WriteOptions,
1796 _trace_flow_id: TraceFlowId,
1797 ) -> Result<(), zx::Status> {
1798 if self.return_errors {
1799 Err(zx::Status::NOT_SUPPORTED)
1800 } else {
1801 if self.do_checks {
1802 assert_matches!(
1803 self.expected_op.lock().take(),
1804 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1805 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1806 "Write {device_block_offset} {block_count} {vmo_offset}"
1807 );
1808 }
1809 Ok(())
1810 }
1811 }
1812
1813 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1814 if self.return_errors {
1815 Err(zx::Status::NO_RESOURCES)
1816 } else {
1817 if self.do_checks {
1818 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1819 }
1820 Ok(())
1821 }
1822 }
1823
1824 async fn trim(
1825 &self,
1826 device_block_offset: u64,
1827 block_count: u32,
1828 _trace_flow_id: TraceFlowId,
1829 ) -> Result<(), zx::Status> {
1830 if self.return_errors {
1831 Err(zx::Status::NO_MEMORY)
1832 } else {
1833 if self.do_checks {
1834 assert_matches!(
1835 self.expected_op.lock().take(),
1836 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1837 block_count == b,
1838 "Trim {device_block_offset} {block_count}"
1839 );
1840 }
1841 Ok(())
1842 }
1843 }
1844 }
1845
1846 #[fuchsia::test]
1847 async fn test_io() {
1848 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1849
1850 let expected_op = Arc::new(Mutex::new(None));
1851 let expected_op_clone = expected_op.clone();
1852
1853 let server = async {
1854 let block_server = BlockServer::new(
1855 BLOCK_SIZE,
1856 Arc::new(IoMockInterface {
1857 return_errors: false,
1858 do_checks: true,
1859 expected_op: expected_op_clone,
1860 }),
1861 );
1862 block_server.handle_requests(stream).await.unwrap();
1863 };
1864
1865 let client = async move {
1866 let (session_proxy, server) = fidl::endpoints::create_proxy();
1867
1868 proxy.open_session(server).unwrap();
1869
1870 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1871 let vmo_id = session_proxy
1872 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1873 .await
1874 .unwrap()
1875 .unwrap();
1876
1877 let mut fifo =
1878 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1879 let (mut reader, mut writer) = fifo.async_io();
1880
1881 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1883 writer
1884 .write_entries(&BlockFifoRequest {
1885 command: BlockFifoCommand {
1886 opcode: BlockOpcode::Read.into_primitive(),
1887 ..Default::default()
1888 },
1889 vmoid: vmo_id.id,
1890 dev_offset: 1,
1891 length: 2,
1892 vmo_offset: 3,
1893 ..Default::default()
1894 })
1895 .await
1896 .unwrap();
1897
1898 let mut response = BlockFifoResponse::default();
1899 reader.read_entries(&mut response).await.unwrap();
1900 assert_eq!(response.status, zx::sys::ZX_OK);
1901
1902 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1904 writer
1905 .write_entries(&BlockFifoRequest {
1906 command: BlockFifoCommand {
1907 opcode: BlockOpcode::Write.into_primitive(),
1908 ..Default::default()
1909 },
1910 vmoid: vmo_id.id,
1911 dev_offset: 4,
1912 length: 5,
1913 vmo_offset: 6,
1914 ..Default::default()
1915 })
1916 .await
1917 .unwrap();
1918
1919 let mut response = BlockFifoResponse::default();
1920 reader.read_entries(&mut response).await.unwrap();
1921 assert_eq!(response.status, zx::sys::ZX_OK);
1922
1923 *expected_op.lock() = Some(ExpectedOp::Flush);
1925 writer
1926 .write_entries(&BlockFifoRequest {
1927 command: BlockFifoCommand {
1928 opcode: BlockOpcode::Flush.into_primitive(),
1929 ..Default::default()
1930 },
1931 ..Default::default()
1932 })
1933 .await
1934 .unwrap();
1935
1936 reader.read_entries(&mut response).await.unwrap();
1937 assert_eq!(response.status, zx::sys::ZX_OK);
1938
1939 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1941 writer
1942 .write_entries(&BlockFifoRequest {
1943 command: BlockFifoCommand {
1944 opcode: BlockOpcode::Trim.into_primitive(),
1945 ..Default::default()
1946 },
1947 dev_offset: 7,
1948 length: 8,
1949 ..Default::default()
1950 })
1951 .await
1952 .unwrap();
1953
1954 reader.read_entries(&mut response).await.unwrap();
1955 assert_eq!(response.status, zx::sys::ZX_OK);
1956
1957 std::mem::drop(proxy);
1958 };
1959
1960 futures::join!(server, client);
1961 }
1962
1963 #[fuchsia::test]
1964 async fn test_io_errors() {
1965 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1966
1967 futures::join!(
1968 async {
1969 let block_server = BlockServer::new(
1970 BLOCK_SIZE,
1971 Arc::new(IoMockInterface {
1972 return_errors: true,
1973 do_checks: false,
1974 expected_op: Arc::new(Mutex::new(None)),
1975 }),
1976 );
1977 block_server.handle_requests(stream).await.unwrap();
1978 },
1979 async move {
1980 let (session_proxy, server) = fidl::endpoints::create_proxy();
1981
1982 proxy.open_session(server).unwrap();
1983
1984 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1985 let vmo_id = session_proxy
1986 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1987 .await
1988 .unwrap()
1989 .unwrap();
1990
1991 let mut fifo =
1992 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1993 let (mut reader, mut writer) = fifo.async_io();
1994
1995 writer
1997 .write_entries(&BlockFifoRequest {
1998 command: BlockFifoCommand {
1999 opcode: BlockOpcode::Read.into_primitive(),
2000 ..Default::default()
2001 },
2002 vmoid: vmo_id.id,
2003 length: 1,
2004 reqid: 1,
2005 ..Default::default()
2006 })
2007 .await
2008 .unwrap();
2009
2010 let mut response = BlockFifoResponse::default();
2011 reader.read_entries(&mut response).await.unwrap();
2012 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
2013
2014 writer
2016 .write_entries(&BlockFifoRequest {
2017 command: BlockFifoCommand {
2018 opcode: BlockOpcode::Write.into_primitive(),
2019 ..Default::default()
2020 },
2021 vmoid: vmo_id.id,
2022 length: 1,
2023 reqid: 2,
2024 ..Default::default()
2025 })
2026 .await
2027 .unwrap();
2028
2029 reader.read_entries(&mut response).await.unwrap();
2030 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
2031
2032 writer
2034 .write_entries(&BlockFifoRequest {
2035 command: BlockFifoCommand {
2036 opcode: BlockOpcode::Flush.into_primitive(),
2037 ..Default::default()
2038 },
2039 reqid: 3,
2040 ..Default::default()
2041 })
2042 .await
2043 .unwrap();
2044
2045 reader.read_entries(&mut response).await.unwrap();
2046 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2047
2048 writer
2050 .write_entries(&BlockFifoRequest {
2051 command: BlockFifoCommand {
2052 opcode: BlockOpcode::Trim.into_primitive(),
2053 ..Default::default()
2054 },
2055 reqid: 4,
2056 length: 1,
2057 ..Default::default()
2058 })
2059 .await
2060 .unwrap();
2061
2062 reader.read_entries(&mut response).await.unwrap();
2063 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2064
2065 std::mem::drop(proxy);
2066 }
2067 );
2068 }
2069
2070 #[fuchsia::test]
2071 async fn test_invalid_args() {
2072 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2073
2074 futures::join!(
2075 async {
2076 let block_server = BlockServer::new(
2077 BLOCK_SIZE,
2078 Arc::new(IoMockInterface {
2079 return_errors: false,
2080 do_checks: false,
2081 expected_op: Arc::new(Mutex::new(None)),
2082 }),
2083 );
2084 block_server.handle_requests(stream).await.unwrap();
2085 },
2086 async move {
2087 let (session_proxy, server) = fidl::endpoints::create_proxy();
2088
2089 proxy.open_session(server).unwrap();
2090
2091 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2092 let vmo_id = session_proxy
2093 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2094 .await
2095 .unwrap()
2096 .unwrap();
2097
2098 let mut fifo =
2099 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2100
2101 async fn test(
2102 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2103 request: BlockFifoRequest,
2104 ) -> Result<(), zx::Status> {
2105 let (mut reader, mut writer) = fifo.async_io();
2106 writer.write_entries(&request).await.unwrap();
2107 let mut response = BlockFifoResponse::default();
2108 reader.read_entries(&mut response).await.unwrap();
2109 zx::Status::ok(response.status)
2110 }
2111
2112 let good_read_request = || BlockFifoRequest {
2115 command: BlockFifoCommand {
2116 opcode: BlockOpcode::Read.into_primitive(),
2117 ..Default::default()
2118 },
2119 length: 1,
2120 vmoid: vmo_id.id,
2121 ..Default::default()
2122 };
2123
2124 assert_eq!(
2125 test(
2126 &mut fifo,
2127 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2128 )
2129 .await,
2130 Err(zx::Status::IO)
2131 );
2132
2133 assert_eq!(
2134 test(
2135 &mut fifo,
2136 BlockFifoRequest {
2137 vmo_offset: 0xffff_ffff_ffff_ffff,
2138 ..good_read_request()
2139 }
2140 )
2141 .await,
2142 Err(zx::Status::OUT_OF_RANGE)
2143 );
2144
2145 assert_eq!(
2146 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2147 Err(zx::Status::INVALID_ARGS)
2148 );
2149
2150 let good_write_request = || BlockFifoRequest {
2153 command: BlockFifoCommand {
2154 opcode: BlockOpcode::Write.into_primitive(),
2155 ..Default::default()
2156 },
2157 length: 1,
2158 vmoid: vmo_id.id,
2159 ..Default::default()
2160 };
2161
2162 assert_eq!(
2163 test(
2164 &mut fifo,
2165 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2166 )
2167 .await,
2168 Err(zx::Status::IO)
2169 );
2170
2171 assert_eq!(
2172 test(
2173 &mut fifo,
2174 BlockFifoRequest {
2175 vmo_offset: 0xffff_ffff_ffff_ffff,
2176 ..good_write_request()
2177 }
2178 )
2179 .await,
2180 Err(zx::Status::OUT_OF_RANGE)
2181 );
2182
2183 assert_eq!(
2184 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2185 Err(zx::Status::INVALID_ARGS)
2186 );
2187
2188 assert_eq!(
2191 test(
2192 &mut fifo,
2193 BlockFifoRequest {
2194 command: BlockFifoCommand {
2195 opcode: BlockOpcode::CloseVmo.into_primitive(),
2196 ..Default::default()
2197 },
2198 vmoid: vmo_id.id + 1,
2199 ..Default::default()
2200 }
2201 )
2202 .await,
2203 Err(zx::Status::IO)
2204 );
2205
2206 std::mem::drop(proxy);
2207 }
2208 );
2209 }
2210
2211 #[fuchsia::test]
2212 async fn test_concurrent_requests() {
2213 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2214
2215 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2216 let waiting_readers_clone = waiting_readers.clone();
2217
2218 futures::join!(
2219 async move {
2220 let block_server = BlockServer::new(
2221 BLOCK_SIZE,
2222 Arc::new(MockInterface {
2223 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2224 let (tx, rx) = oneshot::channel();
2225 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2226 Box::pin(async move {
2227 let _ = rx.await;
2228 Ok(())
2229 })
2230 })),
2231 ..MockInterface::default()
2232 }),
2233 );
2234 block_server.handle_requests(stream).await.unwrap();
2235 },
2236 async move {
2237 let (session_proxy, server) = fidl::endpoints::create_proxy();
2238
2239 proxy.open_session(server).unwrap();
2240
2241 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2242 let vmo_id = session_proxy
2243 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2244 .await
2245 .unwrap()
2246 .unwrap();
2247
2248 let mut fifo =
2249 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2250 let (mut reader, mut writer) = fifo.async_io();
2251
2252 writer
2253 .write_entries(&BlockFifoRequest {
2254 command: BlockFifoCommand {
2255 opcode: BlockOpcode::Read.into_primitive(),
2256 ..Default::default()
2257 },
2258 reqid: 1,
2259 dev_offset: 1, vmoid: vmo_id.id,
2261 length: 1,
2262 ..Default::default()
2263 })
2264 .await
2265 .unwrap();
2266
2267 writer
2268 .write_entries(&BlockFifoRequest {
2269 command: BlockFifoCommand {
2270 opcode: BlockOpcode::Read.into_primitive(),
2271 ..Default::default()
2272 },
2273 reqid: 2,
2274 dev_offset: 2,
2275 vmoid: vmo_id.id,
2276 length: 1,
2277 ..Default::default()
2278 })
2279 .await
2280 .unwrap();
2281
2282 poll_fn(|cx: &mut Context<'_>| {
2284 if waiting_readers.lock().len() == 2 {
2285 Poll::Ready(())
2286 } else {
2287 cx.waker().wake_by_ref();
2289 Poll::Pending
2290 }
2291 })
2292 .await;
2293
2294 let mut response = BlockFifoResponse::default();
2295 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2296
2297 let (id, tx) = waiting_readers.lock().pop().unwrap();
2298 tx.send(()).unwrap();
2299
2300 reader.read_entries(&mut response).await.unwrap();
2301 assert_eq!(response.status, zx::sys::ZX_OK);
2302 assert_eq!(response.reqid, id);
2303
2304 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2305
2306 let (id, tx) = waiting_readers.lock().pop().unwrap();
2307 tx.send(()).unwrap();
2308
2309 reader.read_entries(&mut response).await.unwrap();
2310 assert_eq!(response.status, zx::sys::ZX_OK);
2311 assert_eq!(response.reqid, id);
2312 }
2313 );
2314 }
2315
2316 #[fuchsia::test]
2317 async fn test_groups() {
2318 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2319
2320 futures::join!(
2321 async move {
2322 let block_server = BlockServer::new(
2323 BLOCK_SIZE,
2324 Arc::new(MockInterface {
2325 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2326 ..MockInterface::default()
2327 }),
2328 );
2329 block_server.handle_requests(stream).await.unwrap();
2330 },
2331 async move {
2332 let (session_proxy, server) = fidl::endpoints::create_proxy();
2333
2334 proxy.open_session(server).unwrap();
2335
2336 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2337 let vmo_id = session_proxy
2338 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2339 .await
2340 .unwrap()
2341 .unwrap();
2342
2343 let mut fifo =
2344 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2345 let (mut reader, mut writer) = fifo.async_io();
2346
2347 writer
2348 .write_entries(&BlockFifoRequest {
2349 command: BlockFifoCommand {
2350 opcode: BlockOpcode::Read.into_primitive(),
2351 flags: BlockIoFlag::GROUP_ITEM.bits(),
2352 ..Default::default()
2353 },
2354 group: 1,
2355 vmoid: vmo_id.id,
2356 length: 1,
2357 ..Default::default()
2358 })
2359 .await
2360 .unwrap();
2361
2362 writer
2363 .write_entries(&BlockFifoRequest {
2364 command: BlockFifoCommand {
2365 opcode: BlockOpcode::Read.into_primitive(),
2366 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2367 ..Default::default()
2368 },
2369 reqid: 2,
2370 group: 1,
2371 vmoid: vmo_id.id,
2372 length: 1,
2373 ..Default::default()
2374 })
2375 .await
2376 .unwrap();
2377
2378 let mut response = BlockFifoResponse::default();
2379 reader.read_entries(&mut response).await.unwrap();
2380 assert_eq!(response.status, zx::sys::ZX_OK);
2381 assert_eq!(response.reqid, 2);
2382 assert_eq!(response.group, 1);
2383 }
2384 );
2385 }
2386
2387 #[fuchsia::test]
2388 async fn test_group_error() {
2389 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2390
2391 let counter = Arc::new(AtomicU64::new(0));
2392 let counter_clone = counter.clone();
2393
2394 futures::join!(
2395 async move {
2396 let block_server = BlockServer::new(
2397 BLOCK_SIZE,
2398 Arc::new(MockInterface {
2399 read_hook: Some(Box::new(move |_, _, _, _| {
2400 counter_clone.fetch_add(1, Ordering::Relaxed);
2401 Box::pin(async { Err(zx::Status::BAD_STATE) })
2402 })),
2403 ..MockInterface::default()
2404 }),
2405 );
2406 block_server.handle_requests(stream).await.unwrap();
2407 },
2408 async move {
2409 let (session_proxy, server) = fidl::endpoints::create_proxy();
2410
2411 proxy.open_session(server).unwrap();
2412
2413 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2414 let vmo_id = session_proxy
2415 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2416 .await
2417 .unwrap()
2418 .unwrap();
2419
2420 let mut fifo =
2421 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2422 let (mut reader, mut writer) = fifo.async_io();
2423
2424 writer
2425 .write_entries(&BlockFifoRequest {
2426 command: BlockFifoCommand {
2427 opcode: BlockOpcode::Read.into_primitive(),
2428 flags: BlockIoFlag::GROUP_ITEM.bits(),
2429 ..Default::default()
2430 },
2431 group: 1,
2432 vmoid: vmo_id.id,
2433 length: 1,
2434 ..Default::default()
2435 })
2436 .await
2437 .unwrap();
2438
2439 poll_fn(|cx: &mut Context<'_>| {
2441 if counter.load(Ordering::Relaxed) == 1 {
2442 Poll::Ready(())
2443 } else {
2444 cx.waker().wake_by_ref();
2446 Poll::Pending
2447 }
2448 })
2449 .await;
2450
2451 let mut response = BlockFifoResponse::default();
2452 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2453
2454 writer
2455 .write_entries(&BlockFifoRequest {
2456 command: BlockFifoCommand {
2457 opcode: BlockOpcode::Read.into_primitive(),
2458 flags: BlockIoFlag::GROUP_ITEM.bits(),
2459 ..Default::default()
2460 },
2461 group: 1,
2462 vmoid: vmo_id.id,
2463 length: 1,
2464 ..Default::default()
2465 })
2466 .await
2467 .unwrap();
2468
2469 writer
2470 .write_entries(&BlockFifoRequest {
2471 command: BlockFifoCommand {
2472 opcode: BlockOpcode::Read.into_primitive(),
2473 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2474 ..Default::default()
2475 },
2476 reqid: 2,
2477 group: 1,
2478 vmoid: vmo_id.id,
2479 length: 1,
2480 ..Default::default()
2481 })
2482 .await
2483 .unwrap();
2484
2485 reader.read_entries(&mut response).await.unwrap();
2486 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2487 assert_eq!(response.reqid, 2);
2488 assert_eq!(response.group, 1);
2489
2490 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2491
2492 assert_eq!(counter.load(Ordering::Relaxed), 1);
2494 }
2495 );
2496 }
2497
2498 #[fuchsia::test]
2499 async fn test_group_with_two_lasts() {
2500 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2501
2502 let (tx, rx) = oneshot::channel();
2503
2504 futures::join!(
2505 async move {
2506 let rx = Mutex::new(Some(rx));
2507 let block_server = BlockServer::new(
2508 BLOCK_SIZE,
2509 Arc::new(MockInterface {
2510 read_hook: Some(Box::new(move |_, _, _, _| {
2511 let rx = rx.lock().take().unwrap();
2512 Box::pin(async {
2513 let _ = rx.await;
2514 Ok(())
2515 })
2516 })),
2517 ..MockInterface::default()
2518 }),
2519 );
2520 block_server.handle_requests(stream).await.unwrap();
2521 },
2522 async move {
2523 let (session_proxy, server) = fidl::endpoints::create_proxy();
2524
2525 proxy.open_session(server).unwrap();
2526
2527 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2528 let vmo_id = session_proxy
2529 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2530 .await
2531 .unwrap()
2532 .unwrap();
2533
2534 let mut fifo =
2535 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2536 let (mut reader, mut writer) = fifo.async_io();
2537
2538 writer
2539 .write_entries(&BlockFifoRequest {
2540 command: BlockFifoCommand {
2541 opcode: BlockOpcode::Read.into_primitive(),
2542 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2543 ..Default::default()
2544 },
2545 reqid: 1,
2546 group: 1,
2547 vmoid: vmo_id.id,
2548 length: 1,
2549 ..Default::default()
2550 })
2551 .await
2552 .unwrap();
2553
2554 writer
2555 .write_entries(&BlockFifoRequest {
2556 command: BlockFifoCommand {
2557 opcode: BlockOpcode::Read.into_primitive(),
2558 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2559 ..Default::default()
2560 },
2561 reqid: 2,
2562 group: 1,
2563 vmoid: vmo_id.id,
2564 length: 1,
2565 ..Default::default()
2566 })
2567 .await
2568 .unwrap();
2569
2570 writer
2572 .write_entries(&BlockFifoRequest {
2573 command: BlockFifoCommand {
2574 opcode: BlockOpcode::CloseVmo.into_primitive(),
2575 ..Default::default()
2576 },
2577 reqid: 3,
2578 vmoid: vmo_id.id,
2579 ..Default::default()
2580 })
2581 .await
2582 .unwrap();
2583
2584 let mut response = BlockFifoResponse::default();
2586 reader.read_entries(&mut response).await.unwrap();
2587 assert_eq!(response.status, zx::sys::ZX_OK);
2588 assert_eq!(response.reqid, 3);
2589
2590 tx.send(()).unwrap();
2592
2593 let mut response = BlockFifoResponse::default();
2596 reader.read_entries(&mut response).await.unwrap();
2597 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2598 assert_eq!(response.reqid, 1);
2599 assert_eq!(response.group, 1);
2600 }
2601 );
2602 }
2603
2604 #[fuchsia::test(allow_stalls = false)]
2605 async fn test_requests_dont_block_sessions() {
2606 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2607
2608 let (tx, rx) = oneshot::channel();
2609
2610 fasync::Task::local(async move {
2611 let rx = Mutex::new(Some(rx));
2612 let block_server = BlockServer::new(
2613 BLOCK_SIZE,
2614 Arc::new(MockInterface {
2615 read_hook: Some(Box::new(move |_, _, _, _| {
2616 let rx = rx.lock().take().unwrap();
2617 Box::pin(async {
2618 let _ = rx.await;
2619 Ok(())
2620 })
2621 })),
2622 ..MockInterface::default()
2623 }),
2624 );
2625 block_server.handle_requests(stream).await.unwrap();
2626 })
2627 .detach();
2628
2629 let mut fut = pin!(async {
2630 let (session_proxy, server) = fidl::endpoints::create_proxy();
2631
2632 proxy.open_session(server).unwrap();
2633
2634 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2635 let vmo_id = session_proxy
2636 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2637 .await
2638 .unwrap()
2639 .unwrap();
2640
2641 let mut fifo =
2642 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2643 let (mut reader, mut writer) = fifo.async_io();
2644
2645 writer
2646 .write_entries(&BlockFifoRequest {
2647 command: BlockFifoCommand {
2648 opcode: BlockOpcode::Read.into_primitive(),
2649 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2650 ..Default::default()
2651 },
2652 reqid: 1,
2653 group: 1,
2654 vmoid: vmo_id.id,
2655 length: 1,
2656 ..Default::default()
2657 })
2658 .await
2659 .unwrap();
2660
2661 let mut response = BlockFifoResponse::default();
2662 reader.read_entries(&mut response).await.unwrap();
2663 assert_eq!(response.status, zx::sys::ZX_OK);
2664 });
2665
2666 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2668
2669 let mut fut2 = pin!(proxy.get_volume_info());
2670
2671 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2673
2674 let _ = tx.send(());
2677
2678 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2679 }
2680
2681 #[fuchsia::test]
2682 async fn test_request_flow_control() {
2683 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2684
2685 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2688 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2689 let event_clone = event.clone();
2690 futures::join!(
2691 async move {
2692 let block_server = BlockServer::new(
2693 BLOCK_SIZE,
2694 Arc::new(MockInterface {
2695 read_hook: Some(Box::new(move |_, _, _, _| {
2696 let event_clone = event_clone.clone();
2697 Box::pin(async move {
2698 if !event_clone.1.load(Ordering::SeqCst) {
2699 event_clone.0.listen().await;
2700 }
2701 Ok(())
2702 })
2703 })),
2704 ..MockInterface::default()
2705 }),
2706 );
2707 block_server.handle_requests(stream).await.unwrap();
2708 },
2709 async move {
2710 let (session_proxy, server) = fidl::endpoints::create_proxy();
2711
2712 proxy.open_session(server).unwrap();
2713
2714 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2715 let vmo_id = session_proxy
2716 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2717 .await
2718 .unwrap()
2719 .unwrap();
2720
2721 let mut fifo =
2722 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2723 let (mut reader, mut writer) = fifo.async_io();
2724
2725 for i in 0..MAX_REQUESTS {
2726 writer
2727 .write_entries(&BlockFifoRequest {
2728 command: BlockFifoCommand {
2729 opcode: BlockOpcode::Read.into_primitive(),
2730 ..Default::default()
2731 },
2732 reqid: (i + 1) as u32,
2733 dev_offset: i,
2734 vmoid: vmo_id.id,
2735 length: 1,
2736 ..Default::default()
2737 })
2738 .await
2739 .unwrap();
2740 }
2741 assert!(
2742 futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2743 command: BlockFifoCommand {
2744 opcode: BlockOpcode::Read.into_primitive(),
2745 ..Default::default()
2746 },
2747 reqid: u32::MAX,
2748 dev_offset: MAX_REQUESTS,
2749 vmoid: vmo_id.id,
2750 length: 1,
2751 ..Default::default()
2752 })))
2753 .is_pending()
2754 );
2755 event.1.store(true, Ordering::SeqCst);
2757 event.0.notify(usize::MAX);
2758 let mut finished_reqids = vec![];
2760 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2761 let mut response = BlockFifoResponse::default();
2762 reader.read_entries(&mut response).await.unwrap();
2763 assert_eq!(response.status, zx::sys::ZX_OK);
2764 finished_reqids.push(response.reqid);
2765 writer
2766 .write_entries(&BlockFifoRequest {
2767 command: BlockFifoCommand {
2768 opcode: BlockOpcode::Read.into_primitive(),
2769 ..Default::default()
2770 },
2771 reqid: (i + 1) as u32,
2772 dev_offset: i,
2773 vmoid: vmo_id.id,
2774 length: 1,
2775 ..Default::default()
2776 })
2777 .await
2778 .unwrap();
2779 }
2780 let mut response = BlockFifoResponse::default();
2781 for _ in 0..MAX_REQUESTS {
2782 reader.read_entries(&mut response).await.unwrap();
2783 assert_eq!(response.status, zx::sys::ZX_OK);
2784 finished_reqids.push(response.reqid);
2785 }
2786 finished_reqids.sort();
2789 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2790 let mut i = 1;
2791 for reqid in finished_reqids {
2792 assert_eq!(reqid, i);
2793 i += 1;
2794 }
2795 }
2796 );
2797 }
2798
2799 #[fuchsia::test]
2800 async fn test_passthrough_io_with_fixed_map() {
2801 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2802
2803 let expected_op = Arc::new(Mutex::new(None));
2804 let expected_op_clone = expected_op.clone();
2805 futures::join!(
2806 async {
2807 let block_server = BlockServer::new(
2808 BLOCK_SIZE,
2809 Arc::new(IoMockInterface {
2810 return_errors: false,
2811 do_checks: true,
2812 expected_op: expected_op_clone,
2813 }),
2814 );
2815 block_server.handle_requests(stream).await.unwrap();
2816 },
2817 async move {
2818 let (session_proxy, server) = fidl::endpoints::create_proxy();
2819
2820 let mapping = fblock::BlockOffsetMapping {
2821 source_block_offset: 0,
2822 target_block_offset: 10,
2823 length: 20,
2824 };
2825 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2826
2827 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2828 let vmo_id = session_proxy
2829 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2830 .await
2831 .unwrap()
2832 .unwrap();
2833
2834 let mut fifo =
2835 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2836 let (mut reader, mut writer) = fifo.async_io();
2837
2838 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2840 writer
2841 .write_entries(&BlockFifoRequest {
2842 command: BlockFifoCommand {
2843 opcode: BlockOpcode::Read.into_primitive(),
2844 ..Default::default()
2845 },
2846 vmoid: vmo_id.id,
2847 dev_offset: 1,
2848 length: 2,
2849 vmo_offset: 3,
2850 ..Default::default()
2851 })
2852 .await
2853 .unwrap();
2854
2855 let mut response = BlockFifoResponse::default();
2856 reader.read_entries(&mut response).await.unwrap();
2857 assert_eq!(response.status, zx::sys::ZX_OK);
2858
2859 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2861 writer
2862 .write_entries(&BlockFifoRequest {
2863 command: BlockFifoCommand {
2864 opcode: BlockOpcode::Write.into_primitive(),
2865 ..Default::default()
2866 },
2867 vmoid: vmo_id.id,
2868 dev_offset: 4,
2869 length: 5,
2870 vmo_offset: 6,
2871 ..Default::default()
2872 })
2873 .await
2874 .unwrap();
2875
2876 reader.read_entries(&mut response).await.unwrap();
2877 assert_eq!(response.status, zx::sys::ZX_OK);
2878
2879 *expected_op.lock() = Some(ExpectedOp::Flush);
2881 writer
2882 .write_entries(&BlockFifoRequest {
2883 command: BlockFifoCommand {
2884 opcode: BlockOpcode::Flush.into_primitive(),
2885 ..Default::default()
2886 },
2887 ..Default::default()
2888 })
2889 .await
2890 .unwrap();
2891
2892 reader.read_entries(&mut response).await.unwrap();
2893 assert_eq!(response.status, zx::sys::ZX_OK);
2894
2895 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2897 writer
2898 .write_entries(&BlockFifoRequest {
2899 command: BlockFifoCommand {
2900 opcode: BlockOpcode::Trim.into_primitive(),
2901 ..Default::default()
2902 },
2903 dev_offset: 7,
2904 length: 3,
2905 ..Default::default()
2906 })
2907 .await
2908 .unwrap();
2909
2910 reader.read_entries(&mut response).await.unwrap();
2911 assert_eq!(response.status, zx::sys::ZX_OK);
2912
2913 *expected_op.lock() = None;
2915 writer
2916 .write_entries(&BlockFifoRequest {
2917 command: BlockFifoCommand {
2918 opcode: BlockOpcode::Read.into_primitive(),
2919 ..Default::default()
2920 },
2921 vmoid: vmo_id.id,
2922 dev_offset: 19,
2923 length: 2,
2924 vmo_offset: 3,
2925 ..Default::default()
2926 })
2927 .await
2928 .unwrap();
2929
2930 reader.read_entries(&mut response).await.unwrap();
2931 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2932
2933 std::mem::drop(proxy);
2934 }
2935 );
2936 }
2937
2938 #[fuchsia::test]
2939 fn operation_map() {
2940 const BLOCK_SIZE: u32 = 512;
2941
2942 #[track_caller]
2943 fn expect_map_result(
2944 mut operation: Operation,
2945 mapping: Option<BlockOffsetMapping>,
2946 max_blocks: Option<NonZero<u32>>,
2947 expected_operations: Vec<Operation>,
2948 ) {
2949 let mut ops = vec![];
2950 while let Some(remainder) =
2951 operation.map(mapping.as_ref(), max_blocks.clone(), BLOCK_SIZE)
2952 {
2953 ops.push(operation);
2954 operation = remainder;
2955 }
2956 ops.push(operation);
2957 assert_eq!(ops, expected_operations);
2958 }
2959
2960 expect_map_result(
2962 Operation::Read {
2963 device_block_offset: 10,
2964 block_count: 200,
2965 _unused: 0,
2966 vmo_offset: 0,
2967 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2968 },
2969 None,
2970 None,
2971 vec![Operation::Read {
2972 device_block_offset: 10,
2973 block_count: 200,
2974 _unused: 0,
2975 vmo_offset: 0,
2976 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2977 }],
2978 );
2979
2980 expect_map_result(
2982 Operation::Read {
2983 device_block_offset: 10,
2984 block_count: 200,
2985 _unused: 0,
2986 vmo_offset: 0,
2987 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2988 },
2989 None,
2990 NonZero::new(120),
2991 vec![
2992 Operation::Read {
2993 device_block_offset: 10,
2994 block_count: 120,
2995 _unused: 0,
2996 vmo_offset: 0,
2997 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2998 },
2999 Operation::Read {
3000 device_block_offset: 130,
3001 block_count: 80,
3002 _unused: 0,
3003 vmo_offset: 120 * BLOCK_SIZE as u64,
3004 options: ReadOptions {
3005 inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3007 },
3008 },
3009 ],
3010 );
3011 expect_map_result(
3012 Operation::Trim { device_block_offset: 10, block_count: 200 },
3013 None,
3014 NonZero::new(120),
3015 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
3016 );
3017
3018 expect_map_result(
3020 Operation::Read {
3021 device_block_offset: 10,
3022 block_count: 200,
3023 _unused: 0,
3024 vmo_offset: 0,
3025 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3026 },
3027 Some(BlockOffsetMapping {
3028 source_block_offset: 10,
3029 target_block_offset: 100,
3030 length: 200,
3031 }),
3032 NonZero::new(120),
3033 vec![
3034 Operation::Read {
3035 device_block_offset: 100,
3036 block_count: 120,
3037 _unused: 0,
3038 vmo_offset: 0,
3039 options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3040 },
3041 Operation::Read {
3042 device_block_offset: 220,
3043 block_count: 80,
3044 _unused: 0,
3045 vmo_offset: 120 * BLOCK_SIZE as u64,
3046 options: ReadOptions {
3047 inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3048 },
3049 },
3050 ],
3051 );
3052 expect_map_result(
3053 Operation::Trim { device_block_offset: 10, block_count: 200 },
3054 Some(BlockOffsetMapping {
3055 source_block_offset: 10,
3056 target_block_offset: 100,
3057 length: 200,
3058 }),
3059 NonZero::new(120),
3060 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3061 );
3062 }
3063
3064 #[fuchsia::test]
3066 async fn test_pre_barrier_flush_failure() {
3067 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3068
3069 struct NoBarrierInterface;
3070 impl super::async_interface::Interface for NoBarrierInterface {
3071 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3072 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3073 device_flags: fblock::DeviceFlag::empty(), max_transfer_blocks: NonZero::new(100),
3075 block_range: Some(0..100),
3076 type_guid: [0; 16],
3077 instance_guid: [0; 16],
3078 name: "test".to_string(),
3079 flags: 0,
3080 }))
3081 }
3082 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3083 Ok(())
3084 }
3085 async fn read(
3086 &self,
3087 _: u64,
3088 _: u32,
3089 _: &Arc<zx::Vmo>,
3090 _: u64,
3091 _: ReadOptions,
3092 _: TraceFlowId,
3093 ) -> Result<(), zx::Status> {
3094 unreachable!()
3095 }
3096 async fn write(
3097 &self,
3098 _: u64,
3099 _: u32,
3100 _: &Arc<zx::Vmo>,
3101 _: u64,
3102 _: WriteOptions,
3103 _: TraceFlowId,
3104 ) -> Result<(), zx::Status> {
3105 panic!("Write should not be called");
3106 }
3107 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3108 Err(zx::Status::IO)
3109 }
3110 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3111 unreachable!()
3112 }
3113 }
3114
3115 futures::join!(
3116 async move {
3117 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3118 block_server.handle_requests(stream).await.unwrap();
3119 },
3120 async move {
3121 let (session_proxy, server) = fidl::endpoints::create_proxy();
3122 proxy.open_session(server).unwrap();
3123 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3124 let vmo_id = session_proxy
3125 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3126 .await
3127 .unwrap()
3128 .unwrap();
3129
3130 let mut fifo =
3131 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3132 let (mut reader, mut writer) = fifo.async_io();
3133
3134 writer
3135 .write_entries(&BlockFifoRequest {
3136 command: BlockFifoCommand {
3137 opcode: BlockOpcode::Write.into_primitive(),
3138 flags: BlockIoFlag::PRE_BARRIER.bits(),
3139 ..Default::default()
3140 },
3141 vmoid: vmo_id.id,
3142 length: 1,
3143 ..Default::default()
3144 })
3145 .await
3146 .unwrap();
3147
3148 let mut response = BlockFifoResponse::default();
3149 reader.read_entries(&mut response).await.unwrap();
3150 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3151 }
3152 );
3153 }
3154
3155 #[fuchsia::test]
3158 async fn test_post_barrier_write_failure() {
3159 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3160
3161 struct NoBarrierInterface;
3162 impl super::async_interface::Interface for NoBarrierInterface {
3163 fn get_info(&self) -> Cow<'_, DeviceInfo> {
3164 Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3165 device_flags: fblock::DeviceFlag::empty(), max_transfer_blocks: NonZero::new(100),
3167 block_range: Some(0..100),
3168 type_guid: [0; 16],
3169 instance_guid: [0; 16],
3170 name: "test".to_string(),
3171 flags: 0,
3172 }))
3173 }
3174 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3175 Ok(())
3176 }
3177 async fn read(
3178 &self,
3179 _: u64,
3180 _: u32,
3181 _: &Arc<zx::Vmo>,
3182 _: u64,
3183 _: ReadOptions,
3184 _: TraceFlowId,
3185 ) -> Result<(), zx::Status> {
3186 unreachable!()
3187 }
3188 async fn write(
3189 &self,
3190 _: u64,
3191 _: u32,
3192 _: &Arc<zx::Vmo>,
3193 _: u64,
3194 _: WriteOptions,
3195 _: TraceFlowId,
3196 ) -> Result<(), zx::Status> {
3197 Err(zx::Status::IO)
3198 }
3199 async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3200 panic!("Flush should not be called")
3201 }
3202 async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3203 unreachable!()
3204 }
3205 }
3206
3207 futures::join!(
3208 async move {
3209 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3210 block_server.handle_requests(stream).await.unwrap();
3211 },
3212 async move {
3213 let (session_proxy, server) = fidl::endpoints::create_proxy();
3214 proxy.open_session(server).unwrap();
3215 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3216 let vmo_id = session_proxy
3217 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3218 .await
3219 .unwrap()
3220 .unwrap();
3221
3222 let mut fifo =
3223 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3224 let (mut reader, mut writer) = fifo.async_io();
3225
3226 writer
3227 .write_entries(&BlockFifoRequest {
3228 command: BlockFifoCommand {
3229 opcode: BlockOpcode::Write.into_primitive(),
3230 flags: BlockIoFlag::FORCE_ACCESS.bits(),
3231 ..Default::default()
3232 },
3233 vmoid: vmo_id.id,
3234 length: 1,
3235 ..Default::default()
3236 })
3237 .await
3238 .unwrap();
3239
3240 let mut response = BlockFifoResponse::default();
3241 reader.read_entries(&mut response).await.unwrap();
3242 assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3243 }
3244 );
3245 }
3246}