1use crate::bin::Bin;
5use anyhow::{Error, anyhow};
6use block_protocol::{BlockFifoRequest, BlockFifoResponse};
7use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
8use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
9use fuchsia_sync::Mutex;
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use zx::HandleBased;
19use {
20 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
21 fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
22};
23
24pub mod async_interface;
25mod bin;
26pub mod c_interface;
27
/// Crate-internal limit on the number of FIFO requests.
// NOTE(review): not referenced by the code in this part of the file; presumably used by the
// `async_interface` / `c_interface` submodules as a batch size — confirm.
pub(crate) const FIFO_MAX_REQUESTS: usize = 64;

/// Optional trace flow identifier carried by a request. It is `None` when the FIFO request
/// supplied a zero `trace_flow_id`.
type TraceFlowId = Option<NonZero<u64>>;
31
/// Information about the device being served, as produced by `SessionManager::get_info`.
#[derive(Clone)]
pub enum DeviceInfo {
    /// A device described only by its flags, block count and transfer limit.
    Block(BlockInfo),
    /// A partition, which additionally carries GUIDs, a name and partition flags.
    Partition(PartitionInfo),
}
37
38impl DeviceInfo {
39 pub fn block_count(&self) -> Option<u64> {
40 match self {
41 Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
42 Self::Partition(PartitionInfo { block_range, .. }) => {
43 block_range.as_ref().map(|range| range.end - range.start)
44 }
45 }
46 }
47
48 pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
49 match self {
50 Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
51 Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
52 max_transfer_blocks.clone()
53 }
54 }
55 }
56
57 fn max_transfer_size(&self, block_size: u32) -> u32 {
58 if let Some(max_blocks) = self.max_transfer_blocks() {
59 max_blocks.get() * block_size
60 } else {
61 MAX_TRANSFER_UNBOUNDED
62 }
63 }
64}
65
/// Description of a plain block device.
#[derive(Clone, Default)]
pub struct BlockInfo {
    /// Flags reported to clients in the `GetInfo` response.
    pub device_flags: fblock::Flag,
    /// Total number of blocks on the device.
    pub block_count: u64,
    /// Largest number of blocks permitted in one transfer; `None` means no limit.
    pub max_transfer_blocks: Option<NonZero<u32>>,
}
73
/// Description of a partition.
#[derive(Clone, Default)]
pub struct PartitionInfo {
    /// Flags reported to clients in the `GetInfo` response.
    pub device_flags: fblock::Flag,
    /// Largest number of blocks permitted in one transfer; `None` means no limit.
    pub max_transfer_blocks: Option<NonZero<u32>>,
    /// Range of blocks occupied by the partition. When `None`, `GetInfo` derives the block
    /// count from the session manager's volume info instead.
    pub block_range: Option<Range<u64>>,
    /// Partition type GUID, returned by `GetTypeGuid` and `GetMetadata`.
    pub type_guid: [u8; 16],
    /// Partition instance GUID, returned by `GetInstanceGuid` and `GetMetadata`.
    pub instance_guid: [u8; 16],
    /// Partition name, returned by `GetName` and `GetMetadata`.
    pub name: String,
    /// Partition flags, returned by `GetMetadata`.
    pub flags: u64,
}
89
/// Book-keeping for one in-flight request (or request group).
struct ActiveRequest<S> {
    /// Session that issued the request; handed back together with the response.
    session: S,
    /// Whether this entry tracks a group or a stand-alone request, and its id.
    group_or_request: GroupOrRequest,
    /// Trace flow id supplied with the request, if any.
    trace_flow_id: TraceFlowId,
    /// Key returned by `vmo_bin.retain()` when the request uses a VMO; released when the
    /// response is taken.
    vmo_bin_key: Option<usize>,
    /// First non-OK status recorded for the entry, or `OK`.
    status: zx::Status,
    /// Number of operations still outstanding; a response is only produced at zero.
    count: u32,
    /// Request id to put in the response. `None` for a group that has not yet seen its
    /// `GROUP_LAST` item, in which case no response is produced.
    req_id: Option<u32>,
}
101
/// Mutex-protected table of the requests currently in flight for sessions of type `S`.
pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
103
104impl<S> Default for ActiveRequests<S> {
105 fn default() -> Self {
106 Self(Mutex::new(ActiveRequestsInner { requests: Slab::default(), vmo_bin: Bin::new() }))
107 }
108}
109
110impl<S> ActiveRequests<S> {
111 fn complete_and_take_response(
112 &self,
113 request_id: RequestId,
114 status: zx::Status,
115 ) -> Option<(S, BlockFifoResponse)> {
116 self.0.lock().complete_and_take_response(request_id, status)
117 }
118}
119
/// State guarded by the `ActiveRequests` mutex.
struct ActiveRequestsInner<S> {
    /// In-flight requests; the slab key is the value wrapped by `RequestId`.
    requests: Slab<ActiveRequest<S>>,
    /// Bin of VMOs closed via `CloseVmo`; requests using a VMO retain a key into it until their
    /// response is taken.
    // NOTE(review): presumably this defers dropping closed VMOs until requests that were in
    // flight have finished — confirm against `Bin`.
    vmo_bin: Bin<Arc<zx::Vmo>>,
}
124
125impl<S> ActiveRequestsInner<S> {
127 fn complete(&mut self, request_id: RequestId, status: zx::Status) {
129 let group = &mut self.requests[request_id.0];
130
131 group.count = group.count.checked_sub(1).unwrap();
132 if status != zx::Status::OK && group.status == zx::Status::OK {
133 group.status = status
134 }
135
136 fuchsia_trace::duration!(
137 c"storage",
138 c"block_server::finish_transaction",
139 "request_id" => request_id.0,
140 "group_completed" => group.count == 0,
141 "status" => status.into_raw());
142 if let Some(trace_flow_id) = group.trace_flow_id {
143 fuchsia_trace::flow_step!(
144 c"storage",
145 c"block_server::finish_request",
146 trace_flow_id.get().into()
147 );
148 }
149 }
150
151 fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
153 let group = &self.requests[request_id.0];
154 match group.req_id {
155 Some(reqid) if group.count == 0 => {
156 let group = self.requests.remove(request_id.0);
157 if let Some(vmo_bin_key) = group.vmo_bin_key {
158 self.vmo_bin.release(vmo_bin_key);
159 }
160 Some((
161 group.session,
162 BlockFifoResponse {
163 status: group.status.into_raw(),
164 reqid,
165 group: group.group_or_request.group_id().unwrap_or(0),
166 ..Default::default()
167 },
168 ))
169 }
170 _ => None,
171 }
172 }
173
174 fn complete_and_take_response(
176 &mut self,
177 request_id: RequestId,
178 status: zx::Status,
179 ) -> Option<(S, BlockFifoResponse)> {
180 self.complete(request_id, status);
181 self.take_response(request_id)
182 }
183}
184
/// Serves the volume protocol on top of a session manager `SM`.
pub struct BlockServer<SM> {
    /// Block size in bytes, reported in `GetInfo` and passed to each opened session.
    block_size: u32,
    /// Backend that opens sessions and answers device/volume queries.
    session_manager: Arc<SM>,
}
191
/// A mapping of a contiguous run of `length` blocks from a source offset to a target offset.
#[derive(Debug)]
pub struct BlockOffsetMapping {
    /// First block of the range as addressed by the client.
    source_block_offset: u64,
    /// Block that `source_block_offset` is translated to.
    target_block_offset: u64,
    /// Number of blocks covered by the mapping.
    length: u64,
}

impl BlockOffsetMapping {
    /// Returns true if `blocks`, given as `(first block, block count)`, lies wholly within the
    /// source range of this mapping.
    ///
    /// A range whose end does not fit in a `u64` is never within the mapping; it is rejected
    /// instead of overflowing the addition.
    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
        let (start, count) = blocks;
        let Some(end) = start.checked_add(u64::from(count)) else {
            return false;
        };
        // The subtraction is only evaluated once `start >= source_block_offset` holds, and
        // `end >= start`, so it cannot underflow.
        start >= self.source_block_offset && end - self.source_block_offset <= self.length
    }
}
206
207impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
208 type Error = zx::Status;
209
210 fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
211 wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
212 wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
213 Ok(Self {
214 source_block_offset: wire.source_block_offset,
215 target_block_offset: wire.target_block_offset,
216 length: wire.length,
217 })
218 }
219}
220
/// Per-session translation settings: an optional block offset mapping plus the transfer limit
/// used when splitting requests.
pub struct OffsetMap {
    /// Mapping applied to request offsets; `None` means offsets are used unchanged.
    mapping: Option<BlockOffsetMapping>,
    /// Largest number of blocks per transfer; `None` means no limit.
    max_transfer_blocks: Option<NonZero<u32>>,
}
226
227impl OffsetMap {
228 pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
230 Self { mapping: Some(mapping), max_transfer_blocks }
231 }
232
233 pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
235 Self { mapping: None, max_transfer_blocks }
236 }
237
238 pub fn is_empty(&self) -> bool {
239 self.mapping.is_none()
240 }
241
242 fn mapping(&self) -> Option<&BlockOffsetMapping> {
243 self.mapping.as_ref()
244 }
245
246 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
247 self.max_transfer_blocks
248 }
249}
250
/// Backend interface used by `BlockServer` and its sessions.
pub trait SessionManager: 'static {
    /// Per-session value stored with every active request.
    type Session;

    /// Called when a client attaches `vmo` to a session, before the VMO id is returned to the
    /// client.
    fn on_attach_vmo(
        self: Arc<Self>,
        vmo: &Arc<zx::Vmo>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Opens a session for `stream` with the given offset map and block size.
    // NOTE(review): presumably the returned future serves the session until it ends; the
    // server spawns it on a scope — confirm against implementations.
    fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Returns information about the device.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Returns volume manager and volume information. The default implementation reports
    /// `NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Answers a `QuerySlices` request. The default implementation reports `NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Answers an `Extend` request. The default implementation reports `NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Answers a `Shrink` request. The default implementation reports `NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Returns the table of requests currently in flight.
    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
}
312
/// Conversion into a shared session manager, accepted by `BlockServer::new`.
pub trait IntoSessionManager {
    /// The session manager type produced by the conversion.
    type SM: SessionManager;

    /// Consumes `self` and returns the session manager.
    fn into_session_manager(self) -> Arc<Self::SM>;
}
318
319impl<SM: SessionManager> BlockServer<SM> {
320 pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
321 Self { block_size, session_manager: session_manager.into_session_manager() }
322 }
323
324 pub fn session_manager(&self) -> &SM {
325 self.session_manager.as_ref()
326 }
327
328 pub async fn handle_requests(
330 &self,
331 mut requests: fvolume::VolumeRequestStream,
332 ) -> Result<(), Error> {
333 let scope = fasync::Scope::new();
334 loop {
335 match requests.try_next().await {
336 Ok(Some(request)) => {
337 if let Some(session) = self.handle_request(request).await? {
338 scope.spawn(session.map(|_| ()));
339 }
340 }
341 Ok(None) => break,
342 Err(err) => log::warn!(err:?; "Invalid request"),
343 }
344 }
345 scope.await;
346 Ok(())
347 }
348
349 async fn handle_request(
352 &self,
353 request: fvolume::VolumeRequest,
354 ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
355 match request {
356 fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
357 Ok(info) => {
358 let max_transfer_size = info.max_transfer_size(self.block_size);
359 let (block_count, flags) = match info.as_ref() {
360 DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
361 (*block_count, *device_flags)
362 }
363 DeviceInfo::Partition(partition_info) => {
364 let block_count = if let Some(range) =
365 partition_info.block_range.as_ref()
366 {
367 range.end - range.start
368 } else {
369 let volume_info = self.session_manager.get_volume_info().await?;
370 volume_info.0.slice_size * volume_info.1.partition_slice_count
371 / self.block_size as u64
372 };
373 (block_count, partition_info.device_flags)
374 }
375 };
376 responder.send(Ok(&fblock::BlockInfo {
377 block_count,
378 block_size: self.block_size,
379 max_transfer_size,
380 flags,
381 }))?;
382 }
383 Err(status) => responder.send(Err(status.into_raw()))?,
384 },
385 fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
386 match self.device_info().await {
387 Ok(info) => {
388 return Ok(Some(self.session_manager.clone().open_session(
389 session.into_stream(),
390 OffsetMap::empty(info.max_transfer_blocks()),
391 self.block_size,
392 )));
393 }
394 Err(status) => session.close_with_epitaph(status)?,
395 }
396 }
397 fvolume::VolumeRequest::OpenSessionWithOffsetMap {
398 session,
399 mapping,
400 control_handle: _,
401 } => match self.device_info().await {
402 Ok(info) => {
403 let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
404 Ok(m) => m,
405 Err(status) => {
406 session.close_with_epitaph(status)?;
407 return Ok(None);
408 }
409 };
410 if let Some(max) = info.block_count() {
411 if initial_mapping.target_block_offset + initial_mapping.length > max {
412 log::warn!(
413 "Invalid mapping for session: {initial_mapping:?} (max {max})"
414 );
415 session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
416 return Ok(None);
417 }
418 }
419 return Ok(Some(self.session_manager.clone().open_session(
420 session.into_stream(),
421 OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
422 self.block_size,
423 )));
424 }
425 Err(status) => session.close_with_epitaph(status)?,
426 },
427 fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
428 Ok(info) => {
429 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
430 let mut guid =
431 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
432 guid.value.copy_from_slice(&partition_info.type_guid);
433 responder.send(zx::sys::ZX_OK, Some(&guid))?;
434 } else {
435 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
436 }
437 }
438 Err(status) => {
439 responder.send(status.into_raw(), None)?;
440 }
441 },
442 fvolume::VolumeRequest::GetInstanceGuid { responder } => {
443 match self.device_info().await {
444 Ok(info) => {
445 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
446 let mut guid =
447 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
448 guid.value.copy_from_slice(&partition_info.instance_guid);
449 responder.send(zx::sys::ZX_OK, Some(&guid))?;
450 } else {
451 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
452 }
453 }
454 Err(status) => {
455 responder.send(status.into_raw(), None)?;
456 }
457 }
458 }
459 fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
460 Ok(info) => {
461 if let DeviceInfo::Partition(partition_info) = info.as_ref() {
462 responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
463 } else {
464 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
465 }
466 }
467 Err(status) => {
468 responder.send(status.into_raw(), None)?;
469 }
470 },
471 fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
472 Ok(info) => {
473 if let DeviceInfo::Partition(info) = info.as_ref() {
474 let mut type_guid =
475 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
476 type_guid.value.copy_from_slice(&info.type_guid);
477 let mut instance_guid =
478 fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
479 instance_guid.value.copy_from_slice(&info.instance_guid);
480 responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
481 name: Some(info.name.clone()),
482 type_guid: Some(type_guid),
483 instance_guid: Some(instance_guid),
484 start_block_offset: info.block_range.as_ref().map(|range| range.start),
485 num_blocks: info
486 .block_range
487 .as_ref()
488 .map(|range| range.end - range.start),
489 flags: Some(info.flags),
490 ..Default::default()
491 }))?;
492 }
493 }
494 Err(status) => responder.send(Err(status.into_raw()))?,
495 },
496 fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
497 match self.session_manager.query_slices(&start_slices).await {
498 Ok(mut results) => {
499 let results_len = results.len();
500 assert!(results_len <= 16);
501 results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
502 responder.send(
503 zx::sys::ZX_OK,
504 &results.try_into().unwrap(),
505 results_len as u64,
506 )?;
507 }
508 Err(s) => {
509 responder.send(
510 s.into_raw(),
511 &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
512 0,
513 )?;
514 }
515 }
516 }
517 fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
518 match self.session_manager.get_volume_info().await {
519 Ok((manager_info, volume_info)) => {
520 responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
521 }
522 Err(s) => responder.send(s.into_raw(), None, None)?,
523 }
524 }
525 fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
526 responder.send(
527 zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
528 .into_raw(),
529 )?;
530 }
531 fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
532 responder.send(
533 zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
534 .into_raw(),
535 )?;
536 }
537 fvolume::VolumeRequest::Destroy { responder, .. } => {
538 responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
539 }
540 }
541 Ok(None)
542 }
543
544 async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
545 self.session_manager.get_info().await
546 }
547}
548
/// State shared by session implementations: the FIFO end handed to clients, the attached VMOs,
/// and the settings needed to decode and map requests.
struct SessionHelper<SM: SessionManager> {
    /// Backend that owns the table of active requests.
    session_manager: Arc<SM>,
    /// Offset mapping and transfer limit applied to this session's requests.
    offset_map: OffsetMap,
    /// Block size in bytes.
    block_size: u32,
    /// FIFO end that is duplicated and returned to the client by `GetFifo`.
    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
    /// VMOs attached to the session, keyed by the id handed back to the client.
    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
}
556
impl<SM: SessionManager> SessionHelper<SM> {
    /// Creates the helper together with the server's end of a new FIFO.
    ///
    /// The other end is kept in `peer_fifo`, to be duplicated for the client in `GetFifo`.
    fn new(
        session_manager: Arc<SM>,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
        Ok((
            Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
            fifo,
        ))
    }

    /// Handles one request on the session channel.
    ///
    /// `Close` is acknowledged and then reported as an `Err("Closed")`.
    // NOTE(review): presumably callers treat any `Err` as "stop serving the session" — confirm.
    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
        match request {
            fblock::SessionRequest::GetFifo { responder } => {
                let rights = zx::Rights::TRANSFER
                    | zx::Rights::READ
                    | zx::Rights::WRITE
                    | zx::Rights::SIGNAL
                    | zx::Rights::WAIT;
                match self.peer_fifo.duplicate_handle(rights) {
                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
                    Err(s) => responder.send(Err(s.into_raw()))?,
                }
                Ok(())
            }
            fblock::SessionRequest::AttachVmo { vmo, responder } => {
                let vmo = Arc::new(vmo);
                // Pick an unused, non-zero id while holding the lock; the lock is released
                // before the await below.
                let vmo_id = {
                    let mut vmos = self.vmos.lock();
                    if vmos.len() == u16::MAX as usize {
                        // Every id in 1..=u16::MAX is taken.
                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
                        return Ok(());
                    } else {
                        let vmo_id = match vmos.last_entry() {
                            None => 1,
                            Some(o) => {
                                // Normally use the id after the largest one in use. If that
                                // would overflow, scan from 1 for the first gap.
                                o.key().checked_add(1).unwrap_or_else(|| {
                                    let mut vmo_id = 1;
                                    for (&id, _) in &*vmos {
                                        if id > vmo_id {
                                            break;
                                        }
                                        vmo_id = id + 1;
                                    }
                                    vmo_id
                                })
                            }
                        };
                        vmos.insert(vmo_id, vmo.clone());
                        vmo_id
                    }
                };
                self.session_manager.clone().on_attach_vmo(&vmo).await?;
                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
                Ok(())
            }
            fblock::SessionRequest::Close { responder } => {
                responder.send(Ok(()))?;
                Err(anyhow!("Closed"))
            }
        }
    }

    /// Decodes a raw FIFO request and registers it in the table of active requests.
    ///
    /// On success the returned `DecodedRequest` refers to an active-request entry whose
    /// outstanding count includes this request.
    ///
    /// # Errors
    ///
    /// `Err(None)` means nothing is to be sent right now: either the request was rejected
    /// without being counted, or it failed as part of a group that is not yet ready to be
    /// answered. `Err(Some(response))` carries a response that is ready for the client.
    fn decode_fifo_request(
        &self,
        session: SM::Session,
        request: &BlockFifoRequest,
    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
        // Validation failures are kept in `operation` rather than returned immediately, so the
        // request still goes through the group/request accounting below.
        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
            .ok_or(zx::Status::INVALID_ARGS)
            .and_then(|code| {
                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
                    if request.length == 0 {
                        return Err(zx::Status::INVALID_ARGS);
                    }
                    // Reject ranges whose end overflows. Trim has no VMO, so only the device
                    // range is checked for it.
                    if request.dev_offset.checked_add(request.length as u64).is_none()
                        || (code != BlockOpcode::Trim
                            && (request.length as u64 * self.block_size as u64)
                                .checked_add(request.vmo_offset)
                                .is_none())
                    {
                        return Err(zx::Status::OUT_OF_RANGE);
                    }
                }
                Ok(match code {
                    BlockOpcode::Read => Operation::Read {
                        device_block_offset: request.dev_offset,
                        block_count: request.length,
                        _unused: 0,
                        // The request gives the VMO offset in blocks; the operation carries it
                        // in bytes.
                        vmo_offset: request
                            .vmo_offset
                            .checked_mul(self.block_size as u64)
                            .ok_or(zx::Status::OUT_OF_RANGE)?,
                    },
                    BlockOpcode::Write => {
                        let mut options = WriteOptions::default();
                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
                            options.flags |= WriteFlags::FORCE_ACCESS;
                        }
                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
                            options.flags |= WriteFlags::PRE_BARRIER;
                        }
                        Operation::Write {
                            device_block_offset: request.dev_offset,
                            block_count: request.length,
                            _unused: 0,
                            options: options,
                            vmo_offset: request
                                .vmo_offset
                                .checked_mul(self.block_size as u64)
                                .ok_or(zx::Status::OUT_OF_RANGE)?,
                        }
                    }
                    BlockOpcode::Flush => Operation::Flush,
                    BlockOpcode::Trim => Operation::Trim {
                        device_block_offset: request.dev_offset,
                        block_count: request.length,
                    },
                    BlockOpcode::CloseVmo => Operation::CloseVmo,
                })
            });

        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
            GroupOrRequest::Group(request.group)
        } else {
            GroupOrRequest::Request(request.reqid)
        };

        // The lock is held for the remainder of the function.
        let mut active_requests = self.session_manager.active_requests().0.lock();
        let mut request_id = None;

        if group_or_request.is_group() {
            // Look for an existing entry for this group by scanning all active requests.
            // NOTE(review): the comparison is on the group id only; the session is not
            // compared — confirm that groups cannot collide across sessions.
            for (key, group) in &mut active_requests.requests {
                if group.group_or_request == group_or_request {
                    if group.req_id.is_some() {
                        // The group already received its GROUP_LAST item; a further item is
                        // invalid. Poison the group and drop this request without a response.
                        if group.status == zx::Status::OK {
                            group.status = zx::Status::INVALID_ARGS;
                        }
                        return Err(None);
                    }
                    if flags.contains(BlockIoFlag::GROUP_LAST) {
                        group.req_id = Some(request.reqid);
                        // The last item of a failed group is not executed; it takes on the
                        // group's error so that completing it below produces the response.
                        if group.status != zx::Status::OK {
                            operation = Err(group.status);
                        }
                    } else if group.status != zx::Status::OK {
                        // The group has already failed; skip this item without counting it.
                        return Err(None);
                    }
                    request_id = Some(RequestId(key));
                    group.count += 1;
                    break;
                }
            }
        }

        let mut retain_vmo = false;
        let vmo = match &operation {
            Ok(Operation::Read { .. } | Operation::Write { .. }) => {
                // An unknown VMO id fails the operation with IO.
                self.vmos.lock().get(&request.vmoid).cloned().map_or(Err(zx::Status::IO), |vmo| {
                    retain_vmo = true;
                    Ok(Some(vmo))
                })
            }
            Ok(Operation::CloseVmo) => {
                // Detach the VMO from the session and hand a reference to the bin.
                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |vmo| {
                    active_requests.vmo_bin.add(vmo.clone());
                    Ok(Some(vmo))
                })
            }
            _ => Ok(None),
        }
        .unwrap_or_else(|e| {
            operation = Err(e);
            None
        });

        let trace_flow_id = NonZero::new(request.trace_flow_id);
        // No existing group entry was found, so create a new entry with a count of one.
        let request_id = request_id.unwrap_or_else(|| {
            let vmo_bin_key =
                if retain_vmo { Some(active_requests.vmo_bin.retain()) } else { None };
            RequestId(active_requests.requests.insert(ActiveRequest {
                session,
                group_or_request,
                trace_flow_id,
                vmo_bin_key,
                status: zx::Status::OK,
                count: 1,
                // A stand-alone request, or the last item of a group, can be answered as soon
                // as it completes; other group items wait for GROUP_LAST.
                req_id: if !flags.contains(BlockIoFlag::GROUP_ITEM)
                    || flags.contains(BlockIoFlag::GROUP_LAST)
                {
                    Some(request.reqid)
                } else {
                    None
                },
            }))
        });

        Ok(DecodedRequest {
            request_id,
            trace_flow_id,
            // A failed decode is completed right away; the response, if one is ready, becomes
            // the error value.
            operation: operation.map_err(|status| {
                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
            })?,
            vmo,
        })
    }

    /// Removes and returns all VMOs attached to the session, leaving the map empty.
    fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
        std::mem::take(&mut *self.vmos.lock())
    }

    /// Applies the session's offset mapping and transfer limit to `request`.
    ///
    /// Returns the (possibly shortened) request plus, when it had to be split, a second request
    /// covering the remainder. The active-request count is incremented for the remainder.
    ///
    /// # Errors
    ///
    /// If the entry has already failed, the request is completed with `BAD_STATE`; if it falls
    /// outside the mapping's source range, with `OUT_OF_RANGE`. The error value is the response
    /// to send, when one is ready.
    fn map_request(
        &self,
        mut request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
        let mut active_requests = self.session_manager.active_requests().0.lock();
        let active_request = &mut active_requests.requests[request.request_id.0];
        if active_request.status != zx::Status::OK {
            return Err(active_requests
                .complete_and_take_response(request.request_id, zx::Status::BAD_STATE)
                .map(|(_, r)| r));
        }
        let mapping = self.offset_map.mapping();
        match (mapping, request.operation.blocks()) {
            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
                return Err(active_requests
                    .complete_and_take_response(request.request_id, zx::Status::OUT_OF_RANGE)
                    .map(|(_, r)| r));
            }
            _ => {}
        }
        let remainder = request.operation.map(
            self.offset_map.mapping(),
            self.offset_map.max_transfer_blocks(),
            self.block_size,
        );
        if remainder.is_some() {
            // The remainder completes separately, so it needs its own count.
            active_request.count += 1;
        }
        static CACHE: AtomicU64 = AtomicU64::new(0);
        // Only build trace arguments when a context for the "storage" category is acquired.
        if let Some(context) =
            fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
        {
            use fuchsia_trace::ArgValue;
            let trace_args = [
                ArgValue::of("request_id", request.request_id.0),
                ArgValue::of("opcode", request.operation.trace_label()),
            ];
            let _scope = fuchsia_trace::duration(
                c"storage",
                c"block_server::start_transaction",
                &trace_args,
            );
            if let Some(trace_flow_id) = active_request.trace_flow_id {
                fuchsia_trace::flow_step(
                    &context,
                    c"block_server::start_transaction",
                    trace_flow_id.get().into(),
                    &[],
                );
            }
        }
        // The remainder shares the request id, trace flow id and VMO of the original.
        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
        Ok((request, remainder))
    }

    /// Removes every active request whose session satisfies `pred`.
    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
        self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
    }
}
852
/// Identifies an entry in the table of active requests; wraps the entry's slab key.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct RequestId(usize);
856
/// A FIFO request after decoding and registration in the table of active requests.
#[derive(Clone, Debug)]
struct DecodedRequest {
    /// Entry in the table of active requests that this request counts towards.
    request_id: RequestId,
    /// Trace flow id supplied with the request, if any.
    trace_flow_id: TraceFlowId,
    /// The operation to perform.
    operation: Operation,
    /// VMO involved in the operation: the transfer buffer for reads and writes, the VMO being
    /// closed for `CloseVmo`, `None` otherwise.
    vmo: Option<Arc<zx::Vmo>>,
}
864
/// Re-export of the write flags from `block_protocol`.
pub type WriteFlags = block_protocol::WriteFlags;
/// Re-export of the write options from `block_protocol`.
pub type WriteOptions = block_protocol::WriteOptions;
868
/// A decoded block operation.
// NOTE(review): `#[repr(C)]` and the `_unused` fields suggest the layout is shared with C code
// (see `c_interface`) — confirm before reordering fields.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Operation {
    /// Read blocks from the device into a VMO.
    Read {
        /// First device block to read.
        device_block_offset: u64,
        /// Number of blocks to read.
        block_count: u32,
        /// Always written as zero by the decoder.
        _unused: u32,
        /// Byte offset into the VMO.
        vmo_offset: u64,
    },
    /// Write blocks from a VMO to the device.
    Write {
        /// First device block to write.
        device_block_offset: u64,
        /// Number of blocks to write.
        block_count: u32,
        /// Always written as zero by the decoder.
        _unused: u32,
        /// Byte offset into the VMO.
        vmo_offset: u64,
        /// Options derived from the request flags (`FORCE_ACCESS`, `PRE_BARRIER`).
        options: WriteOptions,
    },
    /// Flush request; carries no block range.
    Flush,
    /// Trim a range of device blocks.
    Trim {
        /// First device block to trim.
        device_block_offset: u64,
        /// Number of blocks to trim.
        block_count: u32,
    },
    /// Detach a VMO from the session.
    CloseVmo,
}
897
898impl Operation {
899 fn trace_label(&self) -> &'static str {
900 match self {
901 Operation::Read { .. } => "read",
902 Operation::Write { .. } => "write",
903 Operation::Flush { .. } => "flush",
904 Operation::Trim { .. } => "trim",
905 Operation::CloseVmo { .. } => "close_vmo",
906 }
907 }
908
909 fn blocks(&self) -> Option<(u64, u32)> {
911 match self {
912 Operation::Read { device_block_offset, block_count, .. }
913 | Operation::Write { device_block_offset, block_count, .. }
914 | Operation::Trim { device_block_offset, block_count, .. } => {
915 Some((*device_block_offset, *block_count))
916 }
917 _ => None,
918 }
919 }
920
921 fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
923 match self {
924 Operation::Read { device_block_offset, block_count, .. }
925 | Operation::Write { device_block_offset, block_count, .. }
926 | Operation::Trim { device_block_offset, block_count, .. } => {
927 Some((device_block_offset, block_count))
928 }
929 _ => None,
930 }
931 }
932
933 fn map(
936 &mut self,
937 mapping: Option<&BlockOffsetMapping>,
938 max_blocks: Option<NonZero<u32>>,
939 block_size: u32,
940 ) -> Option<Self> {
941 let mut max = match self {
942 Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
943 _ => None,
944 };
945 let (offset, length) = self.blocks_mut()?;
946 let orig_offset = *offset;
947 if let Some(mapping) = mapping {
948 let delta = *offset - mapping.source_block_offset;
949 debug_assert!(*offset - mapping.source_block_offset < mapping.length);
950 *offset = mapping.target_block_offset + delta;
951 let mapping_max = mapping.target_block_offset + mapping.length - *offset;
952 max = match max {
953 None => Some(mapping_max),
954 Some(m) => Some(std::cmp::min(m, mapping_max)),
955 };
956 };
957 if let Some(max) = max {
958 if *length as u64 > max {
959 let rem = (*length as u64 - max) as u32;
960 *length = max as u32;
961 return Some(match self {
962 Operation::Read {
963 device_block_offset: _,
964 block_count: _,
965 vmo_offset,
966 _unused,
967 } => Operation::Read {
968 device_block_offset: orig_offset + max,
969 block_count: rem,
970 vmo_offset: *vmo_offset + max * block_size as u64,
971 _unused: *_unused,
972 },
973 Operation::Write {
974 device_block_offset: _,
975 block_count: _,
976 _unused,
977 vmo_offset,
978 options,
979 } => {
980 let new_options = WriteOptions {
982 flags: options.flags & !WriteFlags::PRE_BARRIER,
983 ..*options
984 };
985
986 Operation::Write {
987 device_block_offset: orig_offset + max,
988 block_count: rem,
989 _unused: *_unused,
990 vmo_offset: *vmo_offset + max * block_size as u64,
991 options: new_options,
992 }
993 }
994 Operation::Trim { device_block_offset: _, block_count: _ } => {
995 Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
996 }
997 _ => unreachable!(),
998 });
999 }
1000 }
1001 None
1002 }
1003}
1004
/// Identifies what an active-request entry tracks: a request group or a single request.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum GroupOrRequest {
    /// A group of requests, identified by its group id.
    Group(u16),
    /// A stand-alone request, identified by its request id.
    Request(u32),
}

impl GroupOrRequest {
    /// Returns true for the `Group` variant.
    fn is_group(&self) -> bool {
        self.group_id().is_some()
    }

    /// Returns the group id, or `None` for a stand-alone request.
    fn group_id(&self) -> Option<u16> {
        if let Self::Group(id) = *self { Some(id) } else { None }
    }
}
1023
1024#[cfg(test)]
1025mod tests {
1026 use super::{
1027 BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1028 TraceFlowId,
1029 };
1030 use assert_matches::assert_matches;
1031 use block_protocol::{
1032 BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteFlags, WriteOptions,
1033 };
1034 use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1035 use fuchsia_sync::Mutex;
1036 use futures::FutureExt as _;
1037 use futures::channel::oneshot;
1038 use futures::future::BoxFuture;
1039 use std::borrow::Cow;
1040 use std::future::poll_fn;
1041 use std::num::NonZero;
1042 use std::pin::pin;
1043 use std::sync::Arc;
1044 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1045 use std::task::{Context, Poll};
1046 use zx::{AsHandleRef as _, HandleBased as _};
1047 use {
1048 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1049 fuchsia_async as fasync,
1050 };
1051
/// Test double for the async interface whose behaviour is supplied through optional hooks.
/// Calling an operation whose hook is unset panics with `unimplemented!()`.
#[derive(Default)]
struct MockInterface {
    /// Invoked by `read` with (device block offset, block count, VMO, VMO offset).
    read_hook: Option<
        Box<
            dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
                + Send
                + Sync,
        >,
    >,
    /// Invoked by `write` with the device block offset.
    write_hook:
        Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
    /// Invoked by `barrier`.
    barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
}
1065
1066 impl super::async_interface::Interface for MockInterface {
1067 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1068 Ok(())
1069 }
1070
1071 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1072 Ok(Cow::Owned(test_device_info()))
1073 }
1074
1075 async fn read(
1076 &self,
1077 device_block_offset: u64,
1078 block_count: u32,
1079 vmo: &Arc<zx::Vmo>,
1080 vmo_offset: u64,
1081 _trace_flow_id: TraceFlowId,
1082 ) -> Result<(), zx::Status> {
1083 if let Some(read_hook) = &self.read_hook {
1084 read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1085 } else {
1086 unimplemented!();
1087 }
1088 }
1089
1090 async fn write(
1091 &self,
1092 device_block_offset: u64,
1093 _block_count: u32,
1094 _vmo: &Arc<zx::Vmo>,
1095 _vmo_offset: u64,
1096 _opts: WriteOptions,
1097 _trace_flow_id: TraceFlowId,
1098 ) -> Result<(), zx::Status> {
1099 if let Some(write_hook) = &self.write_hook {
1100 write_hook(device_block_offset).await
1101 } else {
1102 unimplemented!();
1103 }
1104 }
1105
1106 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1107 unreachable!();
1108 }
1109
1110 fn barrier(&self) -> Result<(), zx::Status> {
1111 if let Some(barrier_hook) = &self.barrier_hook {
1112 barrier_hook()
1113 } else {
1114 unimplemented!();
1115 }
1116 }
1117
1118 async fn trim(
1119 &self,
1120 _device_block_offset: u64,
1121 _block_count: u32,
1122 _trace_flow_id: TraceFlowId,
1123 ) -> Result<(), zx::Status> {
1124 unreachable!();
1125 }
1126
1127 async fn get_volume_info(
1128 &self,
1129 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1130 let () = std::future::pending().await;
1132 unreachable!();
1133 }
1134 }
1135
/// Block size, in bytes, that the tests pass to `BlockServer::new`.
const BLOCK_SIZE: u32 = 512;
/// Transfer limit, in blocks, reported by `test_device_info`.
const MAX_TRANSFER_BLOCKS: u32 = 10;
1138
1139 fn test_device_info() -> DeviceInfo {
1140 DeviceInfo::Partition(PartitionInfo {
1141 device_flags: fblock::Flag::READONLY,
1142 max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1143 block_range: Some(0..100),
1144 type_guid: [1; 16],
1145 instance_guid: [2; 16],
1146 name: "foo".to_string(),
1147 flags: 0xabcd,
1148 })
1149 }
1150
/// Checks that a `PRE_BARRIER` write which is too long for one transfer, and is therefore
/// split, invokes the barrier hook exactly once.
#[fuchsia::test]
async fn test_split_request_only_issues_single_barrier() {
    let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
    let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
    // Counts barrier invocations; the clone is moved into the server's hook.
    let barrier_called = Arc::new(AtomicU32::new(0));
    let barrier_called_clone = barrier_called.clone();

    futures::join!(
        // Server side: runs until the client drops its proxy.
        async move {
            let block_server = BlockServer::new(
                BLOCK_SIZE,
                Arc::new(MockInterface {
                    barrier_hook: Some(Box::new(move || {
                        barrier_called_clone.fetch_add(1, Ordering::Relaxed);
                        Ok(())
                    })),
                    write_hook: Some(Box::new(move |_device_block_offset| {
                        Box::pin(async move { Ok(()) })
                    })),
                    ..MockInterface::default()
                }),
            );
            block_server.handle_requests(stream).await.unwrap();
        },
        // Client side.
        async move {
            let (session_proxy, server) = fidl::endpoints::create_proxy();

            proxy.open_session(server).unwrap();

            let vmo_id = session_proxy
                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                .await
                .unwrap()
                .unwrap();
            assert_ne!(vmo_id.id, 0);

            let mut fifo =
                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
            let (mut reader, mut writer) = fifo.async_io();
            // One block more than the device's transfer limit, so the write must be split.
            writer
                .write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Write.into_primitive(),
                        flags: BlockIoFlag::PRE_BARRIER.bits(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id,
                    dev_offset: 0,
                    length: MAX_TRANSFER_BLOCKS + 1,
                    vmo_offset: 6,
                    ..Default::default()
                })
                .await
                .unwrap();

            let mut response = BlockFifoResponse::default();
            reader.read_entries(&mut response).await.unwrap();
            assert_eq!(response.status, zx::sys::ZX_OK);

            assert_eq!(barrier_called.load(Ordering::Relaxed), 1);

            // Dropping the proxy closes the volume channel, which lets the server side finish.
            std::mem::drop(proxy);
        }
    );
}
1218
1219 #[fuchsia::test]
1220 async fn test_barriers_not_supported() {
1221 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1222 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1223
1224 futures::join!(
1225 async move {
1226 let block_server = BlockServer::new(
1227 BLOCK_SIZE,
1228 Arc::new(MockInterface {
1229 barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1230 write_hook: Some(Box::new(move |_device_block_offset| {
1231 Box::pin(async move { Ok(()) })
1232 })),
1233 ..MockInterface::default()
1234 }),
1235 );
1236 block_server.handle_requests(stream).await.unwrap();
1237 },
1238 async move {
1239 let (session_proxy, server) = fidl::endpoints::create_proxy();
1240
1241 proxy.open_session(server).unwrap();
1242
1243 let vmo_id = session_proxy
1244 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1245 .await
1246 .unwrap()
1247 .unwrap();
1248 assert_ne!(vmo_id.id, 0);
1249
1250 let mut fifo =
1251 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1252 let (mut reader, mut writer) = fifo.async_io();
1253
1254 writer
1257 .write_entries(&BlockFifoRequest {
1258 command: BlockFifoCommand {
1259 opcode: BlockOpcode::Write.into_primitive(),
1260 flags: BlockIoFlag::PRE_BARRIER.bits(),
1261 ..Default::default()
1262 },
1263 vmoid: vmo_id.id,
1264 dev_offset: 0,
1265 length: MAX_TRANSFER_BLOCKS + 1,
1266 vmo_offset: 6,
1267 ..Default::default()
1268 })
1269 .await
1270 .unwrap();
1271
1272 let mut response = BlockFifoResponse::default();
1273 reader.read_entries(&mut response).await.unwrap();
1274 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1275
1276 std::mem::drop(proxy);
1277 }
1278 );
1279 }
1280
1281 #[fuchsia::test]
1282 async fn test_barriers_ordering() {
1283 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1284 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1285 let barrier_called = Arc::new(AtomicBool::new(false));
1286
1287 futures::join!(
1288 async move {
1289 let barrier_called_clone = barrier_called.clone();
1290 let block_server = BlockServer::new(
1291 BLOCK_SIZE,
1292 Arc::new(MockInterface {
1293 barrier_hook: Some(Box::new(move || {
1294 barrier_called.store(true, Ordering::Relaxed);
1295 Ok(())
1296 })),
1297 write_hook: Some(Box::new(move |device_block_offset| {
1298 let barrier_called = barrier_called_clone.clone();
1299 Box::pin(async move {
1300 if device_block_offset % 2 == 0 {
1302 fasync::Timer::new(fasync::MonotonicInstant::after(
1303 zx::MonotonicDuration::from_millis(200),
1304 ))
1305 .await;
1306 }
1307 assert!(barrier_called.load(Ordering::Relaxed));
1308 Ok(())
1309 })
1310 })),
1311 ..MockInterface::default()
1312 }),
1313 );
1314 block_server.handle_requests(stream).await.unwrap();
1315 },
1316 async move {
1317 let (session_proxy, server) = fidl::endpoints::create_proxy();
1318
1319 proxy.open_session(server).unwrap();
1320
1321 let vmo_id = session_proxy
1322 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1323 .await
1324 .unwrap()
1325 .unwrap();
1326 assert_ne!(vmo_id.id, 0);
1327
1328 let mut fifo =
1329 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1330 let (mut reader, mut writer) = fifo.async_io();
1331
1332 writer
1333 .write_entries(&BlockFifoRequest {
1334 command: BlockFifoCommand {
1335 opcode: BlockOpcode::Write.into_primitive(),
1336 flags: BlockIoFlag::PRE_BARRIER.bits(),
1337 ..Default::default()
1338 },
1339 vmoid: vmo_id.id,
1340 dev_offset: 0,
1341 length: 5,
1342 vmo_offset: 6,
1343 ..Default::default()
1344 })
1345 .await
1346 .unwrap();
1347
1348 for i in 0..10 {
1349 writer
1350 .write_entries(&BlockFifoRequest {
1351 command: BlockFifoCommand {
1352 opcode: BlockOpcode::Write.into_primitive(),
1353 ..Default::default()
1354 },
1355 vmoid: vmo_id.id,
1356 dev_offset: i + 1,
1357 length: 5,
1358 vmo_offset: 6,
1359 ..Default::default()
1360 })
1361 .await
1362 .unwrap();
1363 }
1364 for _ in 0..11 {
1365 let mut response = BlockFifoResponse::default();
1366 reader.read_entries(&mut response).await.unwrap();
1367 assert_eq!(response.status, zx::sys::ZX_OK);
1368 }
1369
1370 std::mem::drop(proxy);
1371 }
1372 );
1373 }
1374
1375 #[fuchsia::test]
1376 async fn test_info() {
1377 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1378
1379 futures::join!(
1380 async {
1381 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1382 block_server.handle_requests(stream).await.unwrap();
1383 },
1384 async {
1385 let expected_info = test_device_info();
1386 let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1387 info
1388 } else {
1389 unreachable!()
1390 };
1391
1392 let block_info = proxy.get_info().await.unwrap().unwrap();
1393 assert_eq!(
1394 block_info.block_count,
1395 partition_info.block_range.as_ref().unwrap().end
1396 - partition_info.block_range.as_ref().unwrap().start
1397 );
1398 assert_eq!(block_info.flags, fblock::Flag::READONLY);
1399
1400 assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1401
1402 let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1403 assert_eq!(status, zx::sys::ZX_OK);
1404 assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1405
1406 let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1407 assert_eq!(status, zx::sys::ZX_OK);
1408 assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1409
1410 let (status, name) = proxy.get_name().await.unwrap();
1411 assert_eq!(status, zx::sys::ZX_OK);
1412 assert_eq!(name.as_ref(), Some(&partition_info.name));
1413
1414 let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1415 assert_eq!(metadata.name, name);
1416 assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1417 assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1418 assert_eq!(
1419 metadata.start_block_offset,
1420 Some(partition_info.block_range.as_ref().unwrap().start)
1421 );
1422 assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1423 assert_eq!(metadata.flags, Some(partition_info.flags));
1424
1425 std::mem::drop(proxy);
1426 }
1427 );
1428 }
1429
1430 #[fuchsia::test]
1431 async fn test_attach_vmo() {
1432 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1433
1434 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1435 let koid = vmo.get_koid().unwrap();
1436
1437 futures::join!(
1438 async {
1439 let block_server = BlockServer::new(
1440 BLOCK_SIZE,
1441 Arc::new(MockInterface {
1442 read_hook: Some(Box::new(move |_, _, vmo, _| {
1443 assert_eq!(vmo.get_koid().unwrap(), koid);
1444 Box::pin(async { Ok(()) })
1445 })),
1446 ..MockInterface::default()
1447 }),
1448 );
1449 block_server.handle_requests(stream).await.unwrap();
1450 },
1451 async move {
1452 let (session_proxy, server) = fidl::endpoints::create_proxy();
1453
1454 proxy.open_session(server).unwrap();
1455
1456 let vmo_id = session_proxy
1457 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1458 .await
1459 .unwrap()
1460 .unwrap();
1461 assert_ne!(vmo_id.id, 0);
1462
1463 let mut fifo =
1464 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1465 let (mut reader, mut writer) = fifo.async_io();
1466
1467 let mut count = 1;
1469 loop {
1470 match session_proxy
1471 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1472 .await
1473 .unwrap()
1474 {
1475 Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1476 Err(e) => {
1477 assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1478 break;
1479 }
1480 }
1481
1482 if count % 10 == 0 {
1484 writer
1485 .write_entries(&BlockFifoRequest {
1486 command: BlockFifoCommand {
1487 opcode: BlockOpcode::Read.into_primitive(),
1488 ..Default::default()
1489 },
1490 vmoid: vmo_id.id,
1491 length: 1,
1492 ..Default::default()
1493 })
1494 .await
1495 .unwrap();
1496
1497 let mut response = BlockFifoResponse::default();
1498 reader.read_entries(&mut response).await.unwrap();
1499 assert_eq!(response.status, zx::sys::ZX_OK);
1500 }
1501
1502 count += 1;
1503 }
1504
1505 assert_eq!(count, u16::MAX as u64);
1506
1507 writer
1509 .write_entries(&BlockFifoRequest {
1510 command: BlockFifoCommand {
1511 opcode: BlockOpcode::CloseVmo.into_primitive(),
1512 ..Default::default()
1513 },
1514 vmoid: vmo_id.id,
1515 ..Default::default()
1516 })
1517 .await
1518 .unwrap();
1519
1520 let mut response = BlockFifoResponse::default();
1521 reader.read_entries(&mut response).await.unwrap();
1522 assert_eq!(response.status, zx::sys::ZX_OK);
1523
1524 let new_vmo_id = session_proxy
1525 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1526 .await
1527 .unwrap()
1528 .unwrap();
1529 assert_eq!(new_vmo_id.id, vmo_id.id);
1531
1532 std::mem::drop(proxy);
1533 }
1534 );
1535 }
1536
1537 #[fuchsia::test]
1538 async fn test_close() {
1539 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1540
1541 let mut server = std::pin::pin!(
1542 async {
1543 let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1544 block_server.handle_requests(stream).await.unwrap();
1545 }
1546 .fuse()
1547 );
1548
1549 let mut client = std::pin::pin!(
1550 async {
1551 let (session_proxy, server) = fidl::endpoints::create_proxy();
1552
1553 proxy.open_session(server).unwrap();
1554
1555 std::mem::drop(proxy);
1558
1559 session_proxy.close().await.unwrap().unwrap();
1560
1561 let _: () = std::future::pending().await;
1563 }
1564 .fuse()
1565 );
1566
1567 futures::select!(
1568 _ = server => {}
1569 _ = client => unreachable!(),
1570 );
1571 }
1572
    /// Mock interface used by the I/O tests. Depending on its flags, each operation either
    /// fails with a fixed status or (optionally) checks its arguments against `expected_op`.
    #[derive(Default)]
    struct IoMockInterface {
        // When set (and `return_errors` is not), each operation takes the value out of
        // `expected_op` and asserts that it matches the call being made.
        do_checks: bool,
        // The next operation the test expects; consumed with `take()` by the checks.
        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
        // When set, read/write/flush/trim each return a distinct error status instead of `Ok`.
        return_errors: bool,
    }
1579
    /// Describes the single operation `IoMockInterface` should see next.
    #[derive(Debug)]
    enum ExpectedOp {
        // (device block offset, block count, VMO offset divided by `BLOCK_SIZE`)
        Read(u64, u32, u64),
        // (device block offset, block count, VMO offset divided by `BLOCK_SIZE`)
        Write(u64, u32, u64),
        // (device block offset, block count)
        Trim(u64, u32),
        Flush,
    }
1587
1588 impl super::async_interface::Interface for IoMockInterface {
1589 async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1590 Ok(())
1591 }
1592
1593 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1594 Ok(Cow::Owned(test_device_info()))
1595 }
1596
1597 async fn read(
1598 &self,
1599 device_block_offset: u64,
1600 block_count: u32,
1601 _vmo: &Arc<zx::Vmo>,
1602 vmo_offset: u64,
1603 _trace_flow_id: TraceFlowId,
1604 ) -> Result<(), zx::Status> {
1605 if self.return_errors {
1606 Err(zx::Status::INTERNAL)
1607 } else {
1608 if self.do_checks {
1609 assert_matches!(
1610 self.expected_op.lock().take(),
1611 Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1612 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1613 "Read {device_block_offset} {block_count} {vmo_offset}"
1614 );
1615 }
1616 Ok(())
1617 }
1618 }
1619
1620 async fn write(
1621 &self,
1622 device_block_offset: u64,
1623 block_count: u32,
1624 _vmo: &Arc<zx::Vmo>,
1625 vmo_offset: u64,
1626 _opts: WriteOptions,
1627 _trace_flow_id: TraceFlowId,
1628 ) -> Result<(), zx::Status> {
1629 if self.return_errors {
1630 Err(zx::Status::NOT_SUPPORTED)
1631 } else {
1632 if self.do_checks {
1633 assert_matches!(
1634 self.expected_op.lock().take(),
1635 Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1636 block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1637 "Write {device_block_offset} {block_count} {vmo_offset}"
1638 );
1639 }
1640 Ok(())
1641 }
1642 }
1643
1644 async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1645 if self.return_errors {
1646 Err(zx::Status::NO_RESOURCES)
1647 } else {
1648 if self.do_checks {
1649 assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1650 }
1651 Ok(())
1652 }
1653 }
1654
1655 fn barrier(&self) -> Result<(), zx::Status> {
1656 unreachable!()
1657 }
1658
1659 async fn trim(
1660 &self,
1661 device_block_offset: u64,
1662 block_count: u32,
1663 _trace_flow_id: TraceFlowId,
1664 ) -> Result<(), zx::Status> {
1665 if self.return_errors {
1666 Err(zx::Status::NO_MEMORY)
1667 } else {
1668 if self.do_checks {
1669 assert_matches!(
1670 self.expected_op.lock().take(),
1671 Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1672 block_count == b,
1673 "Trim {device_block_offset} {block_count}"
1674 );
1675 }
1676 Ok(())
1677 }
1678 }
1679 }
1680
1681 #[fuchsia::test]
1682 async fn test_io() {
1683 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1684
1685 let expected_op = Arc::new(Mutex::new(None));
1686 let expected_op_clone = expected_op.clone();
1687
1688 let server = async {
1689 let block_server = BlockServer::new(
1690 BLOCK_SIZE,
1691 Arc::new(IoMockInterface {
1692 return_errors: false,
1693 do_checks: true,
1694 expected_op: expected_op_clone,
1695 }),
1696 );
1697 block_server.handle_requests(stream).await.unwrap();
1698 };
1699
1700 let client = async move {
1701 let (session_proxy, server) = fidl::endpoints::create_proxy();
1702
1703 proxy.open_session(server).unwrap();
1704
1705 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1706 let vmo_id = session_proxy
1707 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1708 .await
1709 .unwrap()
1710 .unwrap();
1711
1712 let mut fifo =
1713 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1714 let (mut reader, mut writer) = fifo.async_io();
1715
1716 *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1718 writer
1719 .write_entries(&BlockFifoRequest {
1720 command: BlockFifoCommand {
1721 opcode: BlockOpcode::Read.into_primitive(),
1722 ..Default::default()
1723 },
1724 vmoid: vmo_id.id,
1725 dev_offset: 1,
1726 length: 2,
1727 vmo_offset: 3,
1728 ..Default::default()
1729 })
1730 .await
1731 .unwrap();
1732
1733 let mut response = BlockFifoResponse::default();
1734 reader.read_entries(&mut response).await.unwrap();
1735 assert_eq!(response.status, zx::sys::ZX_OK);
1736
1737 *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1739 writer
1740 .write_entries(&BlockFifoRequest {
1741 command: BlockFifoCommand {
1742 opcode: BlockOpcode::Write.into_primitive(),
1743 ..Default::default()
1744 },
1745 vmoid: vmo_id.id,
1746 dev_offset: 4,
1747 length: 5,
1748 vmo_offset: 6,
1749 ..Default::default()
1750 })
1751 .await
1752 .unwrap();
1753
1754 let mut response = BlockFifoResponse::default();
1755 reader.read_entries(&mut response).await.unwrap();
1756 assert_eq!(response.status, zx::sys::ZX_OK);
1757
1758 *expected_op.lock() = Some(ExpectedOp::Flush);
1760 writer
1761 .write_entries(&BlockFifoRequest {
1762 command: BlockFifoCommand {
1763 opcode: BlockOpcode::Flush.into_primitive(),
1764 ..Default::default()
1765 },
1766 ..Default::default()
1767 })
1768 .await
1769 .unwrap();
1770
1771 reader.read_entries(&mut response).await.unwrap();
1772 assert_eq!(response.status, zx::sys::ZX_OK);
1773
1774 *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1776 writer
1777 .write_entries(&BlockFifoRequest {
1778 command: BlockFifoCommand {
1779 opcode: BlockOpcode::Trim.into_primitive(),
1780 ..Default::default()
1781 },
1782 dev_offset: 7,
1783 length: 8,
1784 ..Default::default()
1785 })
1786 .await
1787 .unwrap();
1788
1789 reader.read_entries(&mut response).await.unwrap();
1790 assert_eq!(response.status, zx::sys::ZX_OK);
1791
1792 std::mem::drop(proxy);
1793 };
1794
1795 futures::join!(server, client);
1796 }
1797
1798 #[fuchsia::test]
1799 async fn test_io_errors() {
1800 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1801
1802 futures::join!(
1803 async {
1804 let block_server = BlockServer::new(
1805 BLOCK_SIZE,
1806 Arc::new(IoMockInterface {
1807 return_errors: true,
1808 do_checks: false,
1809 expected_op: Arc::new(Mutex::new(None)),
1810 }),
1811 );
1812 block_server.handle_requests(stream).await.unwrap();
1813 },
1814 async move {
1815 let (session_proxy, server) = fidl::endpoints::create_proxy();
1816
1817 proxy.open_session(server).unwrap();
1818
1819 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1820 let vmo_id = session_proxy
1821 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1822 .await
1823 .unwrap()
1824 .unwrap();
1825
1826 let mut fifo =
1827 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1828 let (mut reader, mut writer) = fifo.async_io();
1829
1830 writer
1832 .write_entries(&BlockFifoRequest {
1833 command: BlockFifoCommand {
1834 opcode: BlockOpcode::Read.into_primitive(),
1835 ..Default::default()
1836 },
1837 vmoid: vmo_id.id,
1838 length: 1,
1839 reqid: 1,
1840 ..Default::default()
1841 })
1842 .await
1843 .unwrap();
1844
1845 let mut response = BlockFifoResponse::default();
1846 reader.read_entries(&mut response).await.unwrap();
1847 assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1848
1849 writer
1851 .write_entries(&BlockFifoRequest {
1852 command: BlockFifoCommand {
1853 opcode: BlockOpcode::Write.into_primitive(),
1854 ..Default::default()
1855 },
1856 vmoid: vmo_id.id,
1857 length: 1,
1858 reqid: 2,
1859 ..Default::default()
1860 })
1861 .await
1862 .unwrap();
1863
1864 reader.read_entries(&mut response).await.unwrap();
1865 assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1866
1867 writer
1869 .write_entries(&BlockFifoRequest {
1870 command: BlockFifoCommand {
1871 opcode: BlockOpcode::Flush.into_primitive(),
1872 ..Default::default()
1873 },
1874 reqid: 3,
1875 ..Default::default()
1876 })
1877 .await
1878 .unwrap();
1879
1880 reader.read_entries(&mut response).await.unwrap();
1881 assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1882
1883 writer
1885 .write_entries(&BlockFifoRequest {
1886 command: BlockFifoCommand {
1887 opcode: BlockOpcode::Trim.into_primitive(),
1888 ..Default::default()
1889 },
1890 reqid: 4,
1891 length: 1,
1892 ..Default::default()
1893 })
1894 .await
1895 .unwrap();
1896
1897 reader.read_entries(&mut response).await.unwrap();
1898 assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1899
1900 std::mem::drop(proxy);
1901 }
1902 );
1903 }
1904
1905 #[fuchsia::test]
1906 async fn test_invalid_args() {
1907 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1908
1909 futures::join!(
1910 async {
1911 let block_server = BlockServer::new(
1912 BLOCK_SIZE,
1913 Arc::new(IoMockInterface {
1914 return_errors: false,
1915 do_checks: false,
1916 expected_op: Arc::new(Mutex::new(None)),
1917 }),
1918 );
1919 block_server.handle_requests(stream).await.unwrap();
1920 },
1921 async move {
1922 let (session_proxy, server) = fidl::endpoints::create_proxy();
1923
1924 proxy.open_session(server).unwrap();
1925
1926 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1927 let vmo_id = session_proxy
1928 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1929 .await
1930 .unwrap()
1931 .unwrap();
1932
1933 let mut fifo =
1934 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1935
1936 async fn test(
1937 fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1938 request: BlockFifoRequest,
1939 ) -> Result<(), zx::Status> {
1940 let (mut reader, mut writer) = fifo.async_io();
1941 writer.write_entries(&request).await.unwrap();
1942 let mut response = BlockFifoResponse::default();
1943 reader.read_entries(&mut response).await.unwrap();
1944 zx::Status::ok(response.status)
1945 }
1946
1947 let good_read_request = || BlockFifoRequest {
1950 command: BlockFifoCommand {
1951 opcode: BlockOpcode::Read.into_primitive(),
1952 ..Default::default()
1953 },
1954 length: 1,
1955 vmoid: vmo_id.id,
1956 ..Default::default()
1957 };
1958
1959 assert_eq!(
1960 test(
1961 &mut fifo,
1962 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1963 )
1964 .await,
1965 Err(zx::Status::IO)
1966 );
1967
1968 assert_eq!(
1969 test(
1970 &mut fifo,
1971 BlockFifoRequest {
1972 vmo_offset: 0xffff_ffff_ffff_ffff,
1973 ..good_read_request()
1974 }
1975 )
1976 .await,
1977 Err(zx::Status::OUT_OF_RANGE)
1978 );
1979
1980 assert_eq!(
1981 test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
1982 Err(zx::Status::INVALID_ARGS)
1983 );
1984
1985 let good_write_request = || BlockFifoRequest {
1988 command: BlockFifoCommand {
1989 opcode: BlockOpcode::Write.into_primitive(),
1990 ..Default::default()
1991 },
1992 length: 1,
1993 vmoid: vmo_id.id,
1994 ..Default::default()
1995 };
1996
1997 assert_eq!(
1998 test(
1999 &mut fifo,
2000 BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2001 )
2002 .await,
2003 Err(zx::Status::IO)
2004 );
2005
2006 assert_eq!(
2007 test(
2008 &mut fifo,
2009 BlockFifoRequest {
2010 vmo_offset: 0xffff_ffff_ffff_ffff,
2011 ..good_write_request()
2012 }
2013 )
2014 .await,
2015 Err(zx::Status::OUT_OF_RANGE)
2016 );
2017
2018 assert_eq!(
2019 test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2020 Err(zx::Status::INVALID_ARGS)
2021 );
2022
2023 assert_eq!(
2026 test(
2027 &mut fifo,
2028 BlockFifoRequest {
2029 command: BlockFifoCommand {
2030 opcode: BlockOpcode::CloseVmo.into_primitive(),
2031 ..Default::default()
2032 },
2033 vmoid: vmo_id.id + 1,
2034 ..Default::default()
2035 }
2036 )
2037 .await,
2038 Err(zx::Status::IO)
2039 );
2040
2041 std::mem::drop(proxy);
2042 }
2043 );
2044 }
2045
2046 #[fuchsia::test]
2047 async fn test_concurrent_requests() {
2048 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2049
2050 let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2051 let waiting_readers_clone = waiting_readers.clone();
2052
2053 futures::join!(
2054 async move {
2055 let block_server = BlockServer::new(
2056 BLOCK_SIZE,
2057 Arc::new(MockInterface {
2058 read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2059 let (tx, rx) = oneshot::channel();
2060 waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2061 Box::pin(async move {
2062 let _ = rx.await;
2063 Ok(())
2064 })
2065 })),
2066 ..MockInterface::default()
2067 }),
2068 );
2069 block_server.handle_requests(stream).await.unwrap();
2070 },
2071 async move {
2072 let (session_proxy, server) = fidl::endpoints::create_proxy();
2073
2074 proxy.open_session(server).unwrap();
2075
2076 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2077 let vmo_id = session_proxy
2078 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2079 .await
2080 .unwrap()
2081 .unwrap();
2082
2083 let mut fifo =
2084 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2085 let (mut reader, mut writer) = fifo.async_io();
2086
2087 writer
2088 .write_entries(&BlockFifoRequest {
2089 command: BlockFifoCommand {
2090 opcode: BlockOpcode::Read.into_primitive(),
2091 ..Default::default()
2092 },
2093 reqid: 1,
2094 dev_offset: 1, vmoid: vmo_id.id,
2096 length: 1,
2097 ..Default::default()
2098 })
2099 .await
2100 .unwrap();
2101
2102 writer
2103 .write_entries(&BlockFifoRequest {
2104 command: BlockFifoCommand {
2105 opcode: BlockOpcode::Read.into_primitive(),
2106 ..Default::default()
2107 },
2108 reqid: 2,
2109 dev_offset: 2,
2110 vmoid: vmo_id.id,
2111 length: 1,
2112 ..Default::default()
2113 })
2114 .await
2115 .unwrap();
2116
2117 poll_fn(|cx: &mut Context<'_>| {
2119 if waiting_readers.lock().len() == 2 {
2120 Poll::Ready(())
2121 } else {
2122 cx.waker().wake_by_ref();
2124 Poll::Pending
2125 }
2126 })
2127 .await;
2128
2129 let mut response = BlockFifoResponse::default();
2130 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2131
2132 let (id, tx) = waiting_readers.lock().pop().unwrap();
2133 tx.send(()).unwrap();
2134
2135 reader.read_entries(&mut response).await.unwrap();
2136 assert_eq!(response.status, zx::sys::ZX_OK);
2137 assert_eq!(response.reqid, id);
2138
2139 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2140
2141 let (id, tx) = waiting_readers.lock().pop().unwrap();
2142 tx.send(()).unwrap();
2143
2144 reader.read_entries(&mut response).await.unwrap();
2145 assert_eq!(response.status, zx::sys::ZX_OK);
2146 assert_eq!(response.reqid, id);
2147 }
2148 );
2149 }
2150
2151 #[fuchsia::test]
2152 async fn test_groups() {
2153 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2154
2155 futures::join!(
2156 async move {
2157 let block_server = BlockServer::new(
2158 BLOCK_SIZE,
2159 Arc::new(MockInterface {
2160 read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2161 ..MockInterface::default()
2162 }),
2163 );
2164 block_server.handle_requests(stream).await.unwrap();
2165 },
2166 async move {
2167 let (session_proxy, server) = fidl::endpoints::create_proxy();
2168
2169 proxy.open_session(server).unwrap();
2170
2171 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2172 let vmo_id = session_proxy
2173 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2174 .await
2175 .unwrap()
2176 .unwrap();
2177
2178 let mut fifo =
2179 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2180 let (mut reader, mut writer) = fifo.async_io();
2181
2182 writer
2183 .write_entries(&BlockFifoRequest {
2184 command: BlockFifoCommand {
2185 opcode: BlockOpcode::Read.into_primitive(),
2186 flags: BlockIoFlag::GROUP_ITEM.bits(),
2187 ..Default::default()
2188 },
2189 group: 1,
2190 vmoid: vmo_id.id,
2191 length: 1,
2192 ..Default::default()
2193 })
2194 .await
2195 .unwrap();
2196
2197 writer
2198 .write_entries(&BlockFifoRequest {
2199 command: BlockFifoCommand {
2200 opcode: BlockOpcode::Read.into_primitive(),
2201 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2202 ..Default::default()
2203 },
2204 reqid: 2,
2205 group: 1,
2206 vmoid: vmo_id.id,
2207 length: 1,
2208 ..Default::default()
2209 })
2210 .await
2211 .unwrap();
2212
2213 let mut response = BlockFifoResponse::default();
2214 reader.read_entries(&mut response).await.unwrap();
2215 assert_eq!(response.status, zx::sys::ZX_OK);
2216 assert_eq!(response.reqid, 2);
2217 assert_eq!(response.group, 1);
2218 }
2219 );
2220 }
2221
2222 #[fuchsia::test]
2223 async fn test_group_error() {
2224 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2225
2226 let counter = Arc::new(AtomicU64::new(0));
2227 let counter_clone = counter.clone();
2228
2229 futures::join!(
2230 async move {
2231 let block_server = BlockServer::new(
2232 BLOCK_SIZE,
2233 Arc::new(MockInterface {
2234 read_hook: Some(Box::new(move |_, _, _, _| {
2235 counter_clone.fetch_add(1, Ordering::Relaxed);
2236 Box::pin(async { Err(zx::Status::BAD_STATE) })
2237 })),
2238 ..MockInterface::default()
2239 }),
2240 );
2241 block_server.handle_requests(stream).await.unwrap();
2242 },
2243 async move {
2244 let (session_proxy, server) = fidl::endpoints::create_proxy();
2245
2246 proxy.open_session(server).unwrap();
2247
2248 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2249 let vmo_id = session_proxy
2250 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2251 .await
2252 .unwrap()
2253 .unwrap();
2254
2255 let mut fifo =
2256 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2257 let (mut reader, mut writer) = fifo.async_io();
2258
2259 writer
2260 .write_entries(&BlockFifoRequest {
2261 command: BlockFifoCommand {
2262 opcode: BlockOpcode::Read.into_primitive(),
2263 flags: BlockIoFlag::GROUP_ITEM.bits(),
2264 ..Default::default()
2265 },
2266 group: 1,
2267 vmoid: vmo_id.id,
2268 length: 1,
2269 ..Default::default()
2270 })
2271 .await
2272 .unwrap();
2273
2274 poll_fn(|cx: &mut Context<'_>| {
2276 if counter.load(Ordering::Relaxed) == 1 {
2277 Poll::Ready(())
2278 } else {
2279 cx.waker().wake_by_ref();
2281 Poll::Pending
2282 }
2283 })
2284 .await;
2285
2286 let mut response = BlockFifoResponse::default();
2287 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2288
2289 writer
2290 .write_entries(&BlockFifoRequest {
2291 command: BlockFifoCommand {
2292 opcode: BlockOpcode::Read.into_primitive(),
2293 flags: BlockIoFlag::GROUP_ITEM.bits(),
2294 ..Default::default()
2295 },
2296 group: 1,
2297 vmoid: vmo_id.id,
2298 length: 1,
2299 ..Default::default()
2300 })
2301 .await
2302 .unwrap();
2303
2304 writer
2305 .write_entries(&BlockFifoRequest {
2306 command: BlockFifoCommand {
2307 opcode: BlockOpcode::Read.into_primitive(),
2308 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2309 ..Default::default()
2310 },
2311 reqid: 2,
2312 group: 1,
2313 vmoid: vmo_id.id,
2314 length: 1,
2315 ..Default::default()
2316 })
2317 .await
2318 .unwrap();
2319
2320 reader.read_entries(&mut response).await.unwrap();
2321 assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2322 assert_eq!(response.reqid, 2);
2323 assert_eq!(response.group, 1);
2324
2325 assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2326
2327 assert_eq!(counter.load(Ordering::Relaxed), 1);
2329 }
2330 );
2331 }
2332
2333 #[fuchsia::test]
2334 async fn test_group_with_two_lasts() {
2335 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2336
2337 let (tx, rx) = oneshot::channel();
2338
2339 futures::join!(
2340 async move {
2341 let rx = Mutex::new(Some(rx));
2342 let block_server = BlockServer::new(
2343 BLOCK_SIZE,
2344 Arc::new(MockInterface {
2345 read_hook: Some(Box::new(move |_, _, _, _| {
2346 let rx = rx.lock().take().unwrap();
2347 Box::pin(async {
2348 let _ = rx.await;
2349 Ok(())
2350 })
2351 })),
2352 ..MockInterface::default()
2353 }),
2354 );
2355 block_server.handle_requests(stream).await.unwrap();
2356 },
2357 async move {
2358 let (session_proxy, server) = fidl::endpoints::create_proxy();
2359
2360 proxy.open_session(server).unwrap();
2361
2362 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2363 let vmo_id = session_proxy
2364 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2365 .await
2366 .unwrap()
2367 .unwrap();
2368
2369 let mut fifo =
2370 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2371 let (mut reader, mut writer) = fifo.async_io();
2372
2373 writer
2374 .write_entries(&BlockFifoRequest {
2375 command: BlockFifoCommand {
2376 opcode: BlockOpcode::Read.into_primitive(),
2377 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2378 ..Default::default()
2379 },
2380 reqid: 1,
2381 group: 1,
2382 vmoid: vmo_id.id,
2383 length: 1,
2384 ..Default::default()
2385 })
2386 .await
2387 .unwrap();
2388
2389 writer
2390 .write_entries(&BlockFifoRequest {
2391 command: BlockFifoCommand {
2392 opcode: BlockOpcode::Read.into_primitive(),
2393 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2394 ..Default::default()
2395 },
2396 reqid: 2,
2397 group: 1,
2398 vmoid: vmo_id.id,
2399 length: 1,
2400 ..Default::default()
2401 })
2402 .await
2403 .unwrap();
2404
2405 writer
2407 .write_entries(&BlockFifoRequest {
2408 command: BlockFifoCommand {
2409 opcode: BlockOpcode::CloseVmo.into_primitive(),
2410 ..Default::default()
2411 },
2412 reqid: 3,
2413 vmoid: vmo_id.id,
2414 ..Default::default()
2415 })
2416 .await
2417 .unwrap();
2418
2419 let mut response = BlockFifoResponse::default();
2421 reader.read_entries(&mut response).await.unwrap();
2422 assert_eq!(response.status, zx::sys::ZX_OK);
2423 assert_eq!(response.reqid, 3);
2424
2425 tx.send(()).unwrap();
2427
2428 let mut response = BlockFifoResponse::default();
2431 reader.read_entries(&mut response).await.unwrap();
2432 assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2433 assert_eq!(response.reqid, 1);
2434 assert_eq!(response.group, 1);
2435 }
2436 );
2437 }
2438
2439 #[fuchsia::test(allow_stalls = false)]
2440 async fn test_requests_dont_block_sessions() {
2441 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2442
2443 let (tx, rx) = oneshot::channel();
2444
2445 fasync::Task::local(async move {
2446 let rx = Mutex::new(Some(rx));
2447 let block_server = BlockServer::new(
2448 BLOCK_SIZE,
2449 Arc::new(MockInterface {
2450 read_hook: Some(Box::new(move |_, _, _, _| {
2451 let rx = rx.lock().take().unwrap();
2452 Box::pin(async {
2453 let _ = rx.await;
2454 Ok(())
2455 })
2456 })),
2457 ..MockInterface::default()
2458 }),
2459 );
2460 block_server.handle_requests(stream).await.unwrap();
2461 })
2462 .detach();
2463
2464 let mut fut = pin!(async {
2465 let (session_proxy, server) = fidl::endpoints::create_proxy();
2466
2467 proxy.open_session(server).unwrap();
2468
2469 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2470 let vmo_id = session_proxy
2471 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2472 .await
2473 .unwrap()
2474 .unwrap();
2475
2476 let mut fifo =
2477 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2478 let (mut reader, mut writer) = fifo.async_io();
2479
2480 writer
2481 .write_entries(&BlockFifoRequest {
2482 command: BlockFifoCommand {
2483 opcode: BlockOpcode::Read.into_primitive(),
2484 flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2485 ..Default::default()
2486 },
2487 reqid: 1,
2488 group: 1,
2489 vmoid: vmo_id.id,
2490 length: 1,
2491 ..Default::default()
2492 })
2493 .await
2494 .unwrap();
2495
2496 let mut response = BlockFifoResponse::default();
2497 reader.read_entries(&mut response).await.unwrap();
2498 assert_eq!(response.status, zx::sys::ZX_OK);
2499 });
2500
2501 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2503
2504 let mut fut2 = pin!(proxy.get_volume_info());
2505
2506 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2508
2509 let _ = tx.send(());
2512
2513 assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2514 }
2515
2516 #[fuchsia::test]
2517 async fn test_request_flow_control() {
2518 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2519
2520 const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2523 let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2524 let event_clone = event.clone();
2525 futures::join!(
2526 async move {
2527 let block_server = BlockServer::new(
2528 BLOCK_SIZE,
2529 Arc::new(MockInterface {
2530 read_hook: Some(Box::new(move |_, _, _, _| {
2531 let event_clone = event_clone.clone();
2532 Box::pin(async move {
2533 if !event_clone.1.load(Ordering::SeqCst) {
2534 event_clone.0.listen().await;
2535 }
2536 Ok(())
2537 })
2538 })),
2539 ..MockInterface::default()
2540 }),
2541 );
2542 block_server.handle_requests(stream).await.unwrap();
2543 },
2544 async move {
2545 let (session_proxy, server) = fidl::endpoints::create_proxy();
2546
2547 proxy.open_session(server).unwrap();
2548
2549 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2550 let vmo_id = session_proxy
2551 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2552 .await
2553 .unwrap()
2554 .unwrap();
2555
2556 let mut fifo =
2557 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2558 let (mut reader, mut writer) = fifo.async_io();
2559
2560 for i in 0..MAX_REQUESTS {
2561 writer
2562 .write_entries(&BlockFifoRequest {
2563 command: BlockFifoCommand {
2564 opcode: BlockOpcode::Read.into_primitive(),
2565 ..Default::default()
2566 },
2567 reqid: (i + 1) as u32,
2568 dev_offset: i,
2569 vmoid: vmo_id.id,
2570 length: 1,
2571 ..Default::default()
2572 })
2573 .await
2574 .unwrap();
2575 }
2576 assert!(
2577 futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2578 command: BlockFifoCommand {
2579 opcode: BlockOpcode::Read.into_primitive(),
2580 ..Default::default()
2581 },
2582 reqid: u32::MAX,
2583 dev_offset: MAX_REQUESTS,
2584 vmoid: vmo_id.id,
2585 length: 1,
2586 ..Default::default()
2587 })))
2588 .is_pending()
2589 );
2590 event.1.store(true, Ordering::SeqCst);
2592 event.0.notify(usize::MAX);
2593 let mut finished_reqids = vec![];
2595 for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2596 let mut response = BlockFifoResponse::default();
2597 reader.read_entries(&mut response).await.unwrap();
2598 assert_eq!(response.status, zx::sys::ZX_OK);
2599 finished_reqids.push(response.reqid);
2600 writer
2601 .write_entries(&BlockFifoRequest {
2602 command: BlockFifoCommand {
2603 opcode: BlockOpcode::Read.into_primitive(),
2604 ..Default::default()
2605 },
2606 reqid: (i + 1) as u32,
2607 dev_offset: i,
2608 vmoid: vmo_id.id,
2609 length: 1,
2610 ..Default::default()
2611 })
2612 .await
2613 .unwrap();
2614 }
2615 let mut response = BlockFifoResponse::default();
2616 for _ in 0..MAX_REQUESTS {
2617 reader.read_entries(&mut response).await.unwrap();
2618 assert_eq!(response.status, zx::sys::ZX_OK);
2619 finished_reqids.push(response.reqid);
2620 }
2621 finished_reqids.sort();
2624 assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2625 let mut i = 1;
2626 for reqid in finished_reqids {
2627 assert_eq!(reqid, i);
2628 i += 1;
2629 }
2630 }
2631 );
2632 }
2633
2634 #[fuchsia::test]
2635 async fn test_passthrough_io_with_fixed_map() {
2636 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2637
2638 let expected_op = Arc::new(Mutex::new(None));
2639 let expected_op_clone = expected_op.clone();
2640 futures::join!(
2641 async {
2642 let block_server = BlockServer::new(
2643 BLOCK_SIZE,
2644 Arc::new(IoMockInterface {
2645 return_errors: false,
2646 do_checks: true,
2647 expected_op: expected_op_clone,
2648 }),
2649 );
2650 block_server.handle_requests(stream).await.unwrap();
2651 },
2652 async move {
2653 let (session_proxy, server) = fidl::endpoints::create_proxy();
2654
2655 let mapping = fblock::BlockOffsetMapping {
2656 source_block_offset: 0,
2657 target_block_offset: 10,
2658 length: 20,
2659 };
2660 proxy.open_session_with_offset_map(server, &mapping).unwrap();
2661
2662 let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2663 let vmo_id = session_proxy
2664 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2665 .await
2666 .unwrap()
2667 .unwrap();
2668
2669 let mut fifo =
2670 fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2671 let (mut reader, mut writer) = fifo.async_io();
2672
2673 *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2675 writer
2676 .write_entries(&BlockFifoRequest {
2677 command: BlockFifoCommand {
2678 opcode: BlockOpcode::Read.into_primitive(),
2679 ..Default::default()
2680 },
2681 vmoid: vmo_id.id,
2682 dev_offset: 1,
2683 length: 2,
2684 vmo_offset: 3,
2685 ..Default::default()
2686 })
2687 .await
2688 .unwrap();
2689
2690 let mut response = BlockFifoResponse::default();
2691 reader.read_entries(&mut response).await.unwrap();
2692 assert_eq!(response.status, zx::sys::ZX_OK);
2693
2694 *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2696 writer
2697 .write_entries(&BlockFifoRequest {
2698 command: BlockFifoCommand {
2699 opcode: BlockOpcode::Write.into_primitive(),
2700 ..Default::default()
2701 },
2702 vmoid: vmo_id.id,
2703 dev_offset: 4,
2704 length: 5,
2705 vmo_offset: 6,
2706 ..Default::default()
2707 })
2708 .await
2709 .unwrap();
2710
2711 reader.read_entries(&mut response).await.unwrap();
2712 assert_eq!(response.status, zx::sys::ZX_OK);
2713
2714 *expected_op.lock() = Some(ExpectedOp::Flush);
2716 writer
2717 .write_entries(&BlockFifoRequest {
2718 command: BlockFifoCommand {
2719 opcode: BlockOpcode::Flush.into_primitive(),
2720 ..Default::default()
2721 },
2722 ..Default::default()
2723 })
2724 .await
2725 .unwrap();
2726
2727 reader.read_entries(&mut response).await.unwrap();
2728 assert_eq!(response.status, zx::sys::ZX_OK);
2729
2730 *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2732 writer
2733 .write_entries(&BlockFifoRequest {
2734 command: BlockFifoCommand {
2735 opcode: BlockOpcode::Trim.into_primitive(),
2736 ..Default::default()
2737 },
2738 dev_offset: 7,
2739 length: 3,
2740 ..Default::default()
2741 })
2742 .await
2743 .unwrap();
2744
2745 reader.read_entries(&mut response).await.unwrap();
2746 assert_eq!(response.status, zx::sys::ZX_OK);
2747
2748 *expected_op.lock() = None;
2750 writer
2751 .write_entries(&BlockFifoRequest {
2752 command: BlockFifoCommand {
2753 opcode: BlockOpcode::Read.into_primitive(),
2754 ..Default::default()
2755 },
2756 vmoid: vmo_id.id,
2757 dev_offset: 19,
2758 length: 2,
2759 vmo_offset: 3,
2760 ..Default::default()
2761 })
2762 .await
2763 .unwrap();
2764
2765 reader.read_entries(&mut response).await.unwrap();
2766 assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2767
2768 std::mem::drop(proxy);
2769 }
2770 );
2771 }
2772
2773 #[fuchsia::test]
2774 fn operation_map() {
2775 fn expect_map_result(
2776 mut operation: Operation,
2777 mapping: Option<BlockOffsetMapping>,
2778 max_blocks: Option<NonZero<u32>>,
2779 expected_operations: Vec<Operation>,
2780 ) {
2781 let mut ops = vec![];
2782 while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2783 ops.push(operation);
2784 operation = remainder;
2785 }
2786 ops.push(operation);
2787 assert_eq!(ops, expected_operations);
2788 }
2789
2790 expect_map_result(
2792 Operation::Read {
2793 device_block_offset: 10,
2794 block_count: 200,
2795 _unused: 0,
2796 vmo_offset: 0,
2797 },
2798 None,
2799 None,
2800 vec![Operation::Read {
2801 device_block_offset: 10,
2802 block_count: 200,
2803 _unused: 0,
2804 vmo_offset: 0,
2805 }],
2806 );
2807
2808 expect_map_result(
2810 Operation::Read {
2811 device_block_offset: 10,
2812 block_count: 200,
2813 _unused: 0,
2814 vmo_offset: 0,
2815 },
2816 None,
2817 NonZero::new(120),
2818 vec![
2819 Operation::Read {
2820 device_block_offset: 10,
2821 block_count: 120,
2822 _unused: 0,
2823 vmo_offset: 0,
2824 },
2825 Operation::Read {
2826 device_block_offset: 130,
2827 block_count: 80,
2828 _unused: 0,
2829 vmo_offset: 120 * 512,
2830 },
2831 ],
2832 );
2833 expect_map_result(
2834 Operation::Write {
2835 device_block_offset: 10,
2836 block_count: 200,
2837 _unused: 0,
2838 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2839 vmo_offset: 0,
2840 },
2841 None,
2842 NonZero::new(120),
2843 vec![
2844 Operation::Write {
2845 device_block_offset: 10,
2846 block_count: 120,
2847 _unused: 0,
2848 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2849 vmo_offset: 0,
2850 },
2851 Operation::Write {
2852 device_block_offset: 130,
2853 block_count: 80,
2854 _unused: 0,
2855 options: WriteOptions::default(),
2856 vmo_offset: 120 * 512,
2857 },
2858 ],
2859 );
2860 expect_map_result(
2861 Operation::Trim { device_block_offset: 10, block_count: 200 },
2862 None,
2863 NonZero::new(120),
2864 vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2865 );
2866
2867 expect_map_result(
2869 Operation::Read {
2870 device_block_offset: 10,
2871 block_count: 200,
2872 _unused: 0,
2873 vmo_offset: 0,
2874 },
2875 Some(BlockOffsetMapping {
2876 source_block_offset: 10,
2877 target_block_offset: 100,
2878 length: 200,
2879 }),
2880 NonZero::new(120),
2881 vec![
2882 Operation::Read {
2883 device_block_offset: 100,
2884 block_count: 120,
2885 _unused: 0,
2886 vmo_offset: 0,
2887 },
2888 Operation::Read {
2889 device_block_offset: 220,
2890 block_count: 80,
2891 _unused: 0,
2892 vmo_offset: 120 * 512,
2893 },
2894 ],
2895 );
2896 expect_map_result(
2897 Operation::Write {
2898 device_block_offset: 10,
2899 block_count: 200,
2900 _unused: 0,
2901 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2902 vmo_offset: 0,
2903 },
2904 Some(BlockOffsetMapping {
2905 source_block_offset: 10,
2906 target_block_offset: 100,
2907 length: 200,
2908 }),
2909 NonZero::new(120),
2910 vec![
2911 Operation::Write {
2912 device_block_offset: 100,
2913 block_count: 120,
2914 _unused: 0,
2915 options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2916 vmo_offset: 0,
2917 },
2918 Operation::Write {
2919 device_block_offset: 220,
2920 block_count: 80,
2921 _unused: 0,
2922 options: WriteOptions::default(),
2923 vmo_offset: 120 * 512,
2924 },
2925 ],
2926 );
2927 expect_map_result(
2928 Operation::Trim { device_block_offset: 10, block_count: 200 },
2929 Some(BlockOffsetMapping {
2930 source_block_offset: 10,
2931 target_block_offset: 100,
2932 length: 200,
2933 }),
2934 NonZero::new(120),
2935 vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
2936 );
2937 }
2938}