1use async_trait::async_trait;
6use fidl::endpoints::ServerEnd;
7use fidl_fuchsia_hardware_block::{BlockProxy, MAX_TRANSFER_UNBOUNDED};
8use fidl_fuchsia_hardware_block_partition::PartitionProxy;
9use fidl_fuchsia_hardware_block_volume::VolumeProxy;
10use fuchsia_sync::Mutex;
11use futures::channel::oneshot;
12use futures::executor::block_on;
13use lazy_static::lazy_static;
14use std::collections::HashMap;
15use std::future::Future;
16use std::hash::{Hash, Hasher};
17use std::mem::MaybeUninit;
18use std::num::NonZero;
19use std::ops::{DerefMut, Range};
20use std::pin::Pin;
21use std::sync::atomic::{AtomicU16, Ordering};
22use std::sync::Arc;
23use std::task::{Context, Poll, Waker};
24use zx::sys::zx_handle_t;
25use zx::{self as zx, HandleBased as _};
26use {
27 fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
28 fuchsia_async as fasync, storage_trace as trace,
29};
30
31pub use cache::Cache;
32
33pub use block::Flag as BlockFlags;
34
35pub use block_protocol::*;
36
37pub mod cache;
38
/// Size, in bytes, of the scratch VMO used to stage transfers to and from plain memory slices.
const TEMP_VMO_SIZE: usize = 64 * 1024;
40
/// Trace flow id meaning "none supplied"; requests carrying it get a generated id when sent.
pub const NO_TRACE_ID: u64 = 0;
43
44pub use block_driver::{BlockIoFlag, BlockOpcode};
45
46fn fidl_to_status(error: fidl::Error) -> zx::Status {
47 match error {
48 fidl::Error::ClientChannelClosed { status, .. } => status,
49 _ => zx::Status::INTERNAL,
50 }
51}
52
53fn opcode_str(opcode: u8) -> &'static str {
54 match BlockOpcode::from_primitive(opcode) {
55 Some(BlockOpcode::Read) => "read",
56 Some(BlockOpcode::Write) => "write",
57 Some(BlockOpcode::Flush) => "flush",
58 Some(BlockOpcode::Trim) => "trim",
59 Some(BlockOpcode::CloseVmo) => "close_vmo",
60 None => "unknown",
61 }
62}
63
64fn generate_trace_flow_id(request_id: u32) -> u64 {
67 lazy_static! {
68 static ref SELF_HANDLE: zx_handle_t = fuchsia_runtime::process_self().raw_handle();
69 };
70 *SELF_HANDLE as u64 + (request_id as u64) << 32
71}
72
73pub enum BufferSlice<'a> {
74 VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
75 Memory(&'a [u8]),
76}
77
78impl<'a> BufferSlice<'a> {
79 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
80 BufferSlice::VmoId { vmo_id, offset, length }
81 }
82}
83
84impl<'a> From<&'a [u8]> for BufferSlice<'a> {
85 fn from(buf: &'a [u8]) -> Self {
86 BufferSlice::Memory(buf)
87 }
88}
89
90pub enum MutableBufferSlice<'a> {
91 VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
92 Memory(&'a mut [u8]),
93}
94
95impl<'a> MutableBufferSlice<'a> {
96 pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
97 MutableBufferSlice::VmoId { vmo_id, offset, length }
98 }
99}
100
101impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
102 fn from(buf: &'a mut [u8]) -> Self {
103 MutableBufferSlice::Memory(buf)
104 }
105}
106
107#[derive(Default)]
108struct RequestState {
109 result: Option<zx::Status>,
110 waker: Option<Waker>,
111}
112
113#[derive(Default)]
114struct FifoState {
115 fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,
117
118 next_request_id: u32,
120
121 queue: std::collections::VecDeque<BlockFifoRequest>,
123
124 map: HashMap<u32, RequestState>,
126
127 poller_waker: Option<Waker>,
129}
130
131impl FifoState {
132 fn terminate(&mut self) {
133 self.fifo.take();
134 for (_, request_state) in self.map.iter_mut() {
135 request_state.result.get_or_insert(zx::Status::CANCELED);
136 if let Some(waker) = request_state.waker.take() {
137 waker.wake();
138 }
139 }
140 if let Some(waker) = self.poller_waker.take() {
141 waker.wake();
142 }
143 }
144
145 fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
147 let fifo = if let Some(fifo) = self.fifo.as_ref() {
148 fifo
149 } else {
150 return true;
151 };
152
153 loop {
154 let slice = self.queue.as_slices().0;
155 if slice.is_empty() {
156 return false;
157 }
158 match fifo.try_write(context, slice) {
159 Poll::Ready(Ok(sent)) => {
160 self.queue.drain(0..sent);
161 }
162 Poll::Ready(Err(_)) => {
163 self.terminate();
164 return true;
165 }
166 Poll::Pending => {
167 return false;
168 }
169 }
170 }
171 }
172}
173
174type FifoStateRef = Arc<Mutex<FifoState>>;
175
176struct ResponseFuture {
178 request_id: u32,
179 fifo_state: FifoStateRef,
180}
181
182impl ResponseFuture {
183 fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
184 ResponseFuture { request_id, fifo_state }
185 }
186}
187
188impl Future for ResponseFuture {
189 type Output = Result<(), zx::Status>;
190
191 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
192 let mut state = self.fifo_state.lock();
193 let request_state = state.map.get_mut(&self.request_id).unwrap();
194 if let Some(result) = request_state.result {
195 Poll::Ready(result.into())
196 } else {
197 request_state.waker.replace(context.waker().clone());
198 Poll::Pending
199 }
200 }
201}
202
203impl Drop for ResponseFuture {
204 fn drop(&mut self) {
205 self.fifo_state.lock().map.remove(&self.request_id).unwrap();
206 }
207}
208
/// Identifier of a VMO attached to a block session.
///
/// Dropping a `VmoId` that still holds a valid id panics, so the id must be detached or taken
/// out with [`VmoId::into_id`] / [`VmoId::take`] first.
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);
213
214impl VmoId {
215 pub fn new(id: u16) -> Self {
217 Self(AtomicU16::new(id))
218 }
219
220 pub fn take(&self) -> Self {
222 Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
223 }
224
225 pub fn is_valid(&self) -> bool {
226 self.id() != block_driver::BLOCK_VMOID_INVALID
227 }
228
229 #[must_use]
231 pub fn into_id(self) -> u16 {
232 self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
233 }
234
235 pub fn id(&self) -> u16 {
236 self.0.load(Ordering::Relaxed)
237 }
238}
239
240impl PartialEq for VmoId {
241 fn eq(&self, other: &Self) -> bool {
242 self.id() == other.id()
243 }
244}
245
246impl Eq for VmoId {}
247
248impl Drop for VmoId {
249 fn drop(&mut self) {
250 assert_eq!(
251 self.0.load(Ordering::Relaxed),
252 block_driver::BLOCK_VMOID_INVALID,
253 "Did you forget to detach?"
254 );
255 }
256}
257
258impl Hash for VmoId {
259 fn hash<H: Hasher>(&self, state: &mut H) {
260 self.id().hash(state);
261 }
262}
263
264#[async_trait]
268pub trait BlockClient: Send + Sync {
269 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status>;
271
272 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status>;
274
275 async fn read_at(
277 &self,
278 buffer_slice: MutableBufferSlice<'_>,
279 device_offset: u64,
280 ) -> Result<(), zx::Status> {
281 self.read_at_traced(buffer_slice, device_offset, 0).await
282 }
283
284 async fn read_at_traced(
285 &self,
286 buffer_slice: MutableBufferSlice<'_>,
287 device_offset: u64,
288 trace_flow_id: u64,
289 ) -> Result<(), zx::Status>;
290
291 async fn write_at(
293 &self,
294 buffer_slice: BufferSlice<'_>,
295 device_offset: u64,
296 ) -> Result<(), zx::Status> {
297 self.write_at_with_opts_traced(
298 buffer_slice,
299 device_offset,
300 WriteOptions::empty(),
301 NO_TRACE_ID,
302 )
303 .await
304 }
305
306 async fn write_at_with_opts(
307 &self,
308 buffer_slice: BufferSlice<'_>,
309 device_offset: u64,
310 opts: WriteOptions,
311 ) -> Result<(), zx::Status> {
312 self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID).await
313 }
314
315 async fn write_at_with_opts_traced(
316 &self,
317 buffer_slice: BufferSlice<'_>,
318 device_offset: u64,
319 opts: WriteOptions,
320 trace_flow_id: u64,
321 ) -> Result<(), zx::Status>;
322
323 async fn trim(&self, device_range: Range<u64>) -> Result<(), zx::Status> {
325 self.trim_traced(device_range, NO_TRACE_ID).await
326 }
327
328 async fn trim_traced(
329 &self,
330 device_range: Range<u64>,
331 trace_flow_id: u64,
332 ) -> Result<(), zx::Status>;
333
334 async fn flush(&self) -> Result<(), zx::Status> {
335 self.flush_traced(NO_TRACE_ID).await
336 }
337
338 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status>;
340
341 async fn close(&self) -> Result<(), zx::Status>;
343
344 fn block_size(&self) -> u32;
346
347 fn block_count(&self) -> u64;
349
350 fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
352
353 fn block_flags(&self) -> BlockFlags;
355
356 fn is_connected(&self) -> bool;
358}
359
360struct Common {
361 block_size: u32,
362 block_count: u64,
363 max_transfer_blocks: Option<NonZero<u32>>,
364 block_flags: BlockFlags,
365 fifo_state: FifoStateRef,
366 temp_vmo: futures::lock::Mutex<zx::Vmo>,
367 temp_vmo_id: VmoId,
368}
369
370impl Common {
371 fn new(
372 fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
373 info: &block::BlockInfo,
374 temp_vmo: zx::Vmo,
375 temp_vmo_id: VmoId,
376 ) -> Self {
377 let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
378 fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
379 Self {
380 block_size: info.block_size,
381 block_count: info.block_count,
382 max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
383 NonZero::new(info.max_transfer_size / info.block_size)
384 } else {
385 None
386 },
387 block_flags: info.flags,
388 fifo_state,
389 temp_vmo: futures::lock::Mutex::new(temp_vmo),
390 temp_vmo_id,
391 }
392 }
393
394 fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
395 if bytes % self.block_size as u64 != 0 {
396 Err(zx::Status::INVALID_ARGS)
397 } else {
398 Ok(bytes / self.block_size as u64)
399 }
400 }
401
402 async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
404 async move {
405 let (request_id, trace_flow_id) = {
406 let mut state = self.fifo_state.lock();
407 if state.fifo.is_none() {
408 return Err(zx::Status::CANCELED);
410 }
411 trace::duration!(
412 c"storage",
413 c"block_client::send::start",
414 "op" => opcode_str(request.command.opcode)
415 );
416 let request_id = state.next_request_id;
417 state.next_request_id = state.next_request_id.overflowing_add(1).0;
418 assert!(
419 state.map.insert(request_id, RequestState::default()).is_none(),
420 "request id in use!"
421 );
422 request.reqid = request_id;
423 if request.trace_flow_id == NO_TRACE_ID {
424 request.trace_flow_id = generate_trace_flow_id(request_id);
425 }
426 let trace_flow_id = request.trace_flow_id;
427 trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
428 state.queue.push_back(request);
429 if let Some(waker) = state.poller_waker.clone() {
430 state.poll_send_requests(&mut Context::from_waker(&waker));
431 }
432 (request_id, trace_flow_id)
433 };
434 ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
435 trace::duration!(c"storage", c"block_client::send::end");
436 trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
437 Ok(())
438 }
439 .await
440 }
441
442 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
443 self.send(BlockFifoRequest {
444 command: BlockFifoCommand {
445 opcode: BlockOpcode::CloseVmo.into_primitive(),
446 flags: 0,
447 ..Default::default()
448 },
449 vmoid: vmo_id.into_id(),
450 ..Default::default()
451 })
452 .await
453 }
454
455 async fn read_at(
456 &self,
457 buffer_slice: MutableBufferSlice<'_>,
458 device_offset: u64,
459 trace_flow_id: u64,
460 ) -> Result<(), zx::Status> {
461 match buffer_slice {
462 MutableBufferSlice::VmoId { vmo_id, offset, length } => {
463 self.send(BlockFifoRequest {
464 command: BlockFifoCommand {
465 opcode: BlockOpcode::Read.into_primitive(),
466 flags: 0,
467 ..Default::default()
468 },
469 vmoid: vmo_id.id(),
470 length: self
471 .to_blocks(length)?
472 .try_into()
473 .map_err(|_| zx::Status::INVALID_ARGS)?,
474 vmo_offset: self.to_blocks(offset)?,
475 dev_offset: self.to_blocks(device_offset)?,
476 trace_flow_id,
477 ..Default::default()
478 })
479 .await?
480 }
481 MutableBufferSlice::Memory(mut slice) => {
482 let temp_vmo = self.temp_vmo.lock().await;
483 let mut device_block = self.to_blocks(device_offset)?;
484 loop {
485 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
486 let block_count = self.to_blocks(to_do as u64)? as u32;
487 self.send(BlockFifoRequest {
488 command: BlockFifoCommand {
489 opcode: BlockOpcode::Read.into_primitive(),
490 flags: 0,
491 ..Default::default()
492 },
493 vmoid: self.temp_vmo_id.id(),
494 length: block_count,
495 vmo_offset: 0,
496 dev_offset: device_block,
497 trace_flow_id,
498 ..Default::default()
499 })
500 .await?;
501 temp_vmo.read(&mut slice[..to_do], 0)?;
502 if to_do == slice.len() {
503 break;
504 }
505 device_block += block_count as u64;
506 slice = &mut slice[to_do..];
507 }
508 }
509 }
510 Ok(())
511 }
512
513 async fn write_at(
514 &self,
515 buffer_slice: BufferSlice<'_>,
516 device_offset: u64,
517 opts: WriteOptions,
518 trace_flow_id: u64,
519 ) -> Result<(), zx::Status> {
520 let flags = if opts.contains(WriteOptions::FORCE_ACCESS) {
521 BlockIoFlag::FORCE_ACCESS.bits()
522 } else {
523 0
524 };
525 match buffer_slice {
526 BufferSlice::VmoId { vmo_id, offset, length } => {
527 self.send(BlockFifoRequest {
528 command: BlockFifoCommand {
529 opcode: BlockOpcode::Write.into_primitive(),
530 flags,
531 ..Default::default()
532 },
533 vmoid: vmo_id.id(),
534 length: self
535 .to_blocks(length)?
536 .try_into()
537 .map_err(|_| zx::Status::INVALID_ARGS)?,
538 vmo_offset: self.to_blocks(offset)?,
539 dev_offset: self.to_blocks(device_offset)?,
540 trace_flow_id,
541 ..Default::default()
542 })
543 .await?;
544 }
545 BufferSlice::Memory(mut slice) => {
546 let temp_vmo = self.temp_vmo.lock().await;
547 let mut device_block = self.to_blocks(device_offset)?;
548 loop {
549 let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
550 let block_count = self.to_blocks(to_do as u64)? as u32;
551 temp_vmo.write(&slice[..to_do], 0)?;
552 self.send(BlockFifoRequest {
553 command: BlockFifoCommand {
554 opcode: BlockOpcode::Write.into_primitive(),
555 flags,
556 ..Default::default()
557 },
558 vmoid: self.temp_vmo_id.id(),
559 length: block_count,
560 vmo_offset: 0,
561 dev_offset: device_block,
562 trace_flow_id,
563 ..Default::default()
564 })
565 .await?;
566 if to_do == slice.len() {
567 break;
568 }
569 device_block += block_count as u64;
570 slice = &slice[to_do..];
571 }
572 }
573 }
574 Ok(())
575 }
576
577 async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
578 let length = self.to_blocks(device_range.end - device_range.start)? as u32;
579 let dev_offset = self.to_blocks(device_range.start)?;
580 self.send(BlockFifoRequest {
581 command: BlockFifoCommand {
582 opcode: BlockOpcode::Trim.into_primitive(),
583 flags: 0,
584 ..Default::default()
585 },
586 vmoid: block_driver::BLOCK_VMOID_INVALID,
587 length,
588 dev_offset,
589 trace_flow_id,
590 ..Default::default()
591 })
592 .await
593 }
594
595 async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
596 self.send(BlockFifoRequest {
597 command: BlockFifoCommand {
598 opcode: BlockOpcode::Flush.into_primitive(),
599 flags: 0,
600 ..Default::default()
601 },
602 vmoid: block_driver::BLOCK_VMOID_INVALID,
603 trace_flow_id,
604 ..Default::default()
605 })
606 .await
607 }
608
609 fn block_size(&self) -> u32 {
610 self.block_size
611 }
612
613 fn block_count(&self) -> u64 {
614 self.block_count
615 }
616
617 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
618 self.max_transfer_blocks.clone()
619 }
620
621 fn block_flags(&self) -> BlockFlags {
622 self.block_flags
623 }
624
625 fn is_connected(&self) -> bool {
626 self.fifo_state.lock().fifo.is_some()
627 }
628}
629
630impl Drop for Common {
631 fn drop(&mut self) {
632 let _ = self.temp_vmo_id.take().into_id();
635 self.fifo_state.lock().terminate();
636 }
637}
638
639pub struct RemoteBlockClient {
641 session: block::SessionProxy,
642 common: Common,
643}
644
645pub trait AsBlockProxy {
646 fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;
647
648 fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
649}
650
651impl<T: AsBlockProxy> AsBlockProxy for &T {
652 fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
653 AsBlockProxy::get_info(*self)
654 }
655 fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
656 AsBlockProxy::open_session(*self, session)
657 }
658}
659
660macro_rules! impl_as_block_proxy {
661 ($name:ident) => {
662 impl AsBlockProxy for $name {
663 async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
664 $name::get_info(self).await
665 }
666
667 fn open_session(
668 &self,
669 session: ServerEnd<block::SessionMarker>,
670 ) -> Result<(), fidl::Error> {
671 $name::open_session(self, session)
672 }
673 }
674 };
675}
676
677impl_as_block_proxy!(BlockProxy);
678impl_as_block_proxy!(PartitionProxy);
679impl_as_block_proxy!(VolumeProxy);
680
681impl RemoteBlockClient {
682 pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
684 let info =
685 remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
686 let (session, server) = fidl::endpoints::create_proxy();
687 let () = remote.open_session(server).map_err(fidl_to_status)?;
688 Self::from_session(info, session).await
689 }
690
691 pub async fn from_session(
692 info: block::BlockInfo,
693 session: block::SessionProxy,
694 ) -> Result<Self, zx::Status> {
695 let fifo =
696 session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
697 let fifo = fasync::Fifo::from_fifo(fifo);
698 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
699 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
700 let vmo_id =
701 session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
702 let vmo_id = VmoId::new(vmo_id.id);
703 Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
704 }
705}
706
707#[async_trait]
708impl BlockClient for RemoteBlockClient {
709 async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
710 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
711 let vmo_id = self
712 .session
713 .attach_vmo(dup)
714 .await
715 .map_err(fidl_to_status)?
716 .map_err(zx::Status::from_raw)?;
717 Ok(VmoId::new(vmo_id.id))
718 }
719
720 async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
721 self.common.detach_vmo(vmo_id).await
722 }
723
724 async fn read_at_traced(
725 &self,
726 buffer_slice: MutableBufferSlice<'_>,
727 device_offset: u64,
728 trace_flow_id: u64,
729 ) -> Result<(), zx::Status> {
730 self.common.read_at(buffer_slice, device_offset, trace_flow_id).await
731 }
732
733 async fn write_at_with_opts_traced(
734 &self,
735 buffer_slice: BufferSlice<'_>,
736 device_offset: u64,
737 opts: WriteOptions,
738 trace_flow_id: u64,
739 ) -> Result<(), zx::Status> {
740 self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
741 }
742
743 async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
744 self.common.trim(range, trace_flow_id).await
745 }
746
747 async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
748 self.common.flush(trace_flow_id).await
749 }
750
751 async fn close(&self) -> Result<(), zx::Status> {
752 let () =
753 self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
754 Ok(())
755 }
756
757 fn block_size(&self) -> u32 {
758 self.common.block_size()
759 }
760
761 fn block_count(&self) -> u64 {
762 self.common.block_count()
763 }
764
765 fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
766 self.common.max_transfer_blocks()
767 }
768
769 fn block_flags(&self) -> BlockFlags {
770 self.common.block_flags()
771 }
772
773 fn is_connected(&self) -> bool {
774 self.common.is_connected()
775 }
776}
777
778pub struct RemoteBlockClientSync {
779 session: block::SessionSynchronousProxy,
780 common: Common,
781}
782
783impl RemoteBlockClientSync {
784 pub fn new(
788 client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
789 ) -> Result<Self, zx::Status> {
790 let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
791 let info = remote
792 .get_info(zx::MonotonicInstant::INFINITE)
793 .map_err(fidl_to_status)?
794 .map_err(zx::Status::from_raw)?;
795 let (client, server) = fidl::endpoints::create_endpoints();
796 let () = remote.open_session(server).map_err(fidl_to_status)?;
797 let session = block::SessionSynchronousProxy::new(client.into_channel());
798 let fifo = session
799 .get_fifo(zx::MonotonicInstant::INFINITE)
800 .map_err(fidl_to_status)?
801 .map_err(zx::Status::from_raw)?;
802 let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
803 let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
804 let vmo_id = session
805 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
806 .map_err(fidl_to_status)?
807 .map_err(zx::Status::from_raw)?;
808 let vmo_id = VmoId::new(vmo_id.id);
809
810 let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
813 std::thread::spawn(move || {
814 let mut executor = fasync::LocalExecutor::new();
815 let fifo = fasync::Fifo::from_fifo(fifo);
816 let common = Common::new(fifo, &info, temp_vmo, vmo_id);
817 let fifo_state = common.fifo_state.clone();
818 let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
819 executor.run_singlethreaded(FifoPoller { fifo_state });
820 });
821 block_on(receiver).map_err(|_| zx::Status::CANCELED)?
822 }
823
824 pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
825 let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
826 let vmo_id = self
827 .session
828 .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
829 .map_err(fidl_to_status)?
830 .map_err(zx::Status::from_raw)?;
831 Ok(VmoId::new(vmo_id.id))
832 }
833
834 pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
835 block_on(self.common.detach_vmo(vmo_id))
836 }
837
838 pub fn read_at(
839 &self,
840 buffer_slice: MutableBufferSlice<'_>,
841 device_offset: u64,
842 ) -> Result<(), zx::Status> {
843 block_on(self.common.read_at(buffer_slice, device_offset, NO_TRACE_ID))
844 }
845
846 pub fn write_at(
847 &self,
848 buffer_slice: BufferSlice<'_>,
849 device_offset: u64,
850 ) -> Result<(), zx::Status> {
851 block_on(self.common.write_at(
852 buffer_slice,
853 device_offset,
854 WriteOptions::empty(),
855 NO_TRACE_ID,
856 ))
857 }
858
859 pub fn flush(&self) -> Result<(), zx::Status> {
860 block_on(self.common.flush(NO_TRACE_ID))
861 }
862
863 pub fn close(&self) -> Result<(), zx::Status> {
864 let () = self
865 .session
866 .close(zx::MonotonicInstant::INFINITE)
867 .map_err(fidl_to_status)?
868 .map_err(zx::Status::from_raw)?;
869 Ok(())
870 }
871
872 pub fn block_size(&self) -> u32 {
873 self.common.block_size()
874 }
875
876 pub fn block_count(&self) -> u64 {
877 self.common.block_count()
878 }
879
880 pub fn is_connected(&self) -> bool {
881 self.common.is_connected()
882 }
883}
884
885impl Drop for RemoteBlockClientSync {
886 fn drop(&mut self) {
887 let _ = self.close();
889 }
890}
891
892struct FifoPoller {
894 fifo_state: FifoStateRef,
895}
896
897impl Future for FifoPoller {
898 type Output = ();
899
900 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
901 let mut state_lock = self.fifo_state.lock();
902 let state = state_lock.deref_mut(); if state.poll_send_requests(context) {
906 return Poll::Ready(());
907 }
908
909 let fifo = state.fifo.as_ref().unwrap(); loop {
912 let mut response = MaybeUninit::uninit();
913 match fifo.try_read(context, &mut response) {
914 Poll::Pending => {
915 state.poller_waker = Some(context.waker().clone());
916 return Poll::Pending;
917 }
918 Poll::Ready(Ok(_)) => {
919 let response = unsafe { response.assume_init() };
920 let request_id = response.reqid;
921 if let Some(request_state) = state.map.get_mut(&request_id) {
923 request_state.result.replace(zx::Status::from_raw(response.status));
924 if let Some(waker) = request_state.waker.take() {
925 waker.wake();
926 }
927 }
928 }
929 Poll::Ready(Err(_)) => {
930 state.terminate();
931 return Poll::Ready(());
932 }
933 }
934 }
935 }
936}
937
938#[cfg(test)]
939mod tests {
940 use super::{
941 BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
942 RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
943 };
944 use block_server::{BlockServer, DeviceInfo, PartitionInfo};
945 use fidl::endpoints::RequestStream as _;
946 use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
947 use futures::join;
948 use futures::stream::futures_unordered::FuturesUnordered;
949 use futures::stream::StreamExt as _;
950 use ramdevice_client::RamdiskClient;
951 use std::borrow::Cow;
952 use std::num::NonZero;
953 use std::sync::atomic::{AtomicBool, Ordering};
954 use std::sync::Arc;
955 use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};
956
957 const RAMDISK_BLOCK_SIZE: u64 = 1024;
958 const RAMDISK_BLOCK_COUNT: u64 = 1024;
959
960 pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
961 let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
962 .await
963 .expect("RamdiskClient::create failed");
964 let client_end = ramdisk.open().expect("ramdisk.open failed");
965 let proxy = client_end.into_proxy();
966 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
967 assert_eq!(block_client.block_size(), 1024);
968 let client_end = ramdisk.open().expect("ramdisk.open failed");
969 let proxy = client_end.into_proxy();
970 (ramdisk, proxy, block_client)
971 }
972
973 #[fuchsia::test]
974 async fn test_against_ram_disk() {
975 let (_ramdisk, block_proxy, block_client) = make_ramdisk().await;
976
977 let stats_before = block_proxy
978 .get_stats(false)
979 .await
980 .expect("get_stats failed")
981 .map_err(zx::Status::from_raw)
982 .expect("get_stats error");
983
984 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
985 vmo.write(b"hello", 5).expect("vmo.write failed");
986 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
987 block_client
988 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
989 .await
990 .expect("write_at failed");
991 block_client
992 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
993 .await
994 .expect("read_at failed");
995 let mut buf: [u8; 5] = Default::default();
996 vmo.read(&mut buf, 1029).expect("vmo.read failed");
997 assert_eq!(&buf, b"hello");
998 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
999
1000 let stats_after = block_proxy
1002 .get_stats(false)
1003 .await
1004 .expect("get_stats failed")
1005 .map_err(zx::Status::from_raw)
1006 .expect("get_stats error");
1007 assert_eq!(
1009 stats_before.write.success.total_calls + 1,
1010 stats_after.write.success.total_calls
1011 );
1012 assert_eq!(
1013 stats_before.write.success.bytes_transferred + 1024,
1014 stats_after.write.success.bytes_transferred
1015 );
1016 assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
1017 assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
1019 assert_eq!(
1020 stats_before.read.success.bytes_transferred + 2048,
1021 stats_after.read.success.bytes_transferred
1022 );
1023 assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
1024 }
1025
1026 #[fuchsia::test]
1027 async fn test_against_ram_disk_with_flush() {
1028 let (_ramdisk, block_proxy, block_client) = make_ramdisk().await;
1029
1030 let stats_before = block_proxy
1031 .get_stats(false)
1032 .await
1033 .expect("get_stats failed")
1034 .map_err(zx::Status::from_raw)
1035 .expect("get_stats error");
1036
1037 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1038 vmo.write(b"hello", 5).expect("vmo.write failed");
1039 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1040 block_client
1041 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1042 .await
1043 .expect("write_at failed");
1044 block_client.flush().await.expect("flush failed");
1045 block_client
1046 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1047 .await
1048 .expect("read_at failed");
1049 let mut buf: [u8; 5] = Default::default();
1050 vmo.read(&mut buf, 1029).expect("vmo.read failed");
1051 assert_eq!(&buf, b"hello");
1052 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1053
1054 let stats_after = block_proxy
1056 .get_stats(false)
1057 .await
1058 .expect("get_stats failed")
1059 .map_err(zx::Status::from_raw)
1060 .expect("get_stats error");
1061 assert_eq!(
1063 stats_before.write.success.total_calls + 1,
1064 stats_after.write.success.total_calls
1065 );
1066 assert_eq!(
1067 stats_before.write.success.bytes_transferred + 1024,
1068 stats_after.write.success.bytes_transferred
1069 );
1070 assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
1071 assert_eq!(
1073 stats_before.flush.success.total_calls + 1,
1074 stats_after.flush.success.total_calls
1075 );
1076 assert_eq!(stats_before.flush.failure.total_calls, stats_after.flush.failure.total_calls);
1077 assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
1079 assert_eq!(
1080 stats_before.read.success.bytes_transferred + 2048,
1081 stats_after.read.success.bytes_transferred
1082 );
1083 assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
1084 }
1085
1086 #[fuchsia::test]
1087 async fn test_alignment() {
1088 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1089 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1090 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1091 block_client
1092 .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1093 .await
1094 .expect_err("expected failure due to bad alignment");
1095 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1096 }
1097
1098 #[fuchsia::test]
1099 async fn test_parallel_io() {
1100 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1101 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1102 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1103 let mut reads = Vec::new();
1104 for _ in 0..1024 {
1105 reads.push(
1106 block_client
1107 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1108 .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1109 );
1110 }
1111 futures::future::join_all(reads).await;
1112 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1113 }
1114
1115 #[fuchsia::test]
1116 async fn test_closed_device() {
1117 let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1118 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1119 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1120 let mut reads = Vec::new();
1121 for _ in 0..1024 {
1122 reads.push(
1123 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1124 );
1125 }
1126 assert!(block_client.is_connected());
1127 let _ = futures::join!(futures::future::join_all(reads), async {
1128 ramdisk.destroy().await.expect("ramdisk.destroy failed")
1129 });
1130 while block_client
1132 .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1133 .await
1134 .is_ok()
1135 {}
1136
1137 while block_client.is_connected() {
1140 fasync::Timer::new(fasync::MonotonicInstant::after(
1142 zx::MonotonicDuration::from_millis(500),
1143 ))
1144 .await;
1145 }
1146
1147 assert_eq!(block_client.is_connected(), false);
1149 let _ = block_client.detach_vmo(vmo_id).await;
1150 }
1151
1152 #[fuchsia::test]
1153 async fn test_cancelled_reads() {
1154 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1155 let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1156 let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1157 {
1158 let mut reads = FuturesUnordered::new();
1159 for _ in 0..1024 {
1160 reads.push(
1161 block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1162 );
1163 }
1164 for _ in 0..500 {
1166 reads.next().await;
1167 }
1168 }
1169 block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1170 }
1171
1172 #[fuchsia::test]
1173 async fn test_parallel_large_read_and_write_with_memory_succeds() {
1174 let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1175 let block_client_ref = &block_client;
1176 let test_one = |offset, len, fill| async move {
1177 let buf = vec![fill; len];
1178 block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1179 let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1181 block_client_ref
1182 .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1183 .await
1184 .expect("read_at failed");
1185 assert_eq!(
1186 &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1187 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1188 );
1189 assert_eq!(
1190 &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1191 &buf[..]
1192 );
1193 assert_eq!(
1194 &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1195 &[0; RAMDISK_BLOCK_SIZE as usize][..]
1196 );
1197 };
1198 const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1199 join!(
1200 test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1201 test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1202 );
1203 }
1204
1205 struct FakeBlockServer<'a> {
1209 server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
1210 channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
1211 fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
1212 }
1213
1214 impl<'a> FakeBlockServer<'a> {
1215 fn new(
1227 server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1228 channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1229 fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1230 ) -> FakeBlockServer<'a> {
1231 FakeBlockServer {
1232 server_channel: Some(server_channel),
1233 channel_handler: Box::new(channel_handler),
1234 fifo_handler: Box::new(fifo_handler),
1235 }
1236 }
1237
1238 async fn run(&mut self) {
1240 let server = self.server_channel.take().unwrap();
1241
1242 let (server_fifo, client_fifo) =
1244 zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1245 .expect("Fifo::create failed");
1246 let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1247
1248 let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1249 let fifo_future = Abortable::new(
1250 async {
1251 let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1252 let (mut reader, mut writer) = fifo.async_io();
1253 let mut request = BlockFifoRequest::default();
1254 loop {
1255 match reader.read_entries(&mut request).await {
1256 Ok(1) => {}
1257 Err(zx::Status::PEER_CLOSED) => break,
1258 Err(e) => panic!("read_entry failed {:?}", e),
1259 _ => unreachable!(),
1260 };
1261
1262 let response = self.fifo_handler.as_ref()(request);
1263 writer
1264 .write_entries(std::slice::from_ref(&response))
1265 .await
1266 .expect("write_entries failed");
1267 }
1268 },
1269 fifo_future_abort_registration,
1270 );
1271
1272 let channel_future = async {
1273 server
1274 .into_stream()
1275 .for_each_concurrent(None, |request| async {
1276 let request = request.expect("unexpected fidl error");
1277
1278 match request {
1279 block::BlockRequest::GetInfo { responder } => {
1280 responder
1281 .send(Ok(&block::BlockInfo {
1282 block_count: 1024,
1283 block_size: 512,
1284 max_transfer_size: 1024 * 1024,
1285 flags: block::Flag::empty(),
1286 }))
1287 .expect("send failed");
1288 }
1289 block::BlockRequest::OpenSession { session, control_handle: _ } => {
1290 let stream = session.into_stream();
1291 stream
1292 .for_each(|request| async {
1293 let request = request.expect("unexpected fidl error");
1294 if self.channel_handler.as_ref()(&request) {
1297 return;
1298 }
1299 match request {
1300 block::SessionRequest::GetFifo { responder } => {
1301 match maybe_server_fifo.lock().take() {
1302 Some(fifo) => {
1303 responder.send(Ok(fifo.downcast()))
1304 }
1305 None => responder.send(Err(
1306 zx::Status::NO_RESOURCES.into_raw(),
1307 )),
1308 }
1309 .expect("send failed")
1310 }
1311 block::SessionRequest::AttachVmo {
1312 vmo: _,
1313 responder,
1314 } => responder
1315 .send(Ok(&block::VmoId { id: 1 }))
1316 .expect("send failed"),
1317 block::SessionRequest::Close { responder } => {
1318 fifo_future_abort.abort();
1319 responder.send(Ok(())).expect("send failed")
1320 }
1321 }
1322 })
1323 .await
1324 }
1325 _ => panic!("Unexpected message"),
1326 }
1327 })
1328 .await;
1329 };
1330
1331 let _result = join!(fifo_future, channel_future);
1332 }
1334 }
1335
1336 #[fuchsia::test]
1337 async fn test_block_close_is_called() {
1338 let close_called = fuchsia_sync::Mutex::new(false);
1339 let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1340
1341 std::thread::spawn(move || {
1342 let _block_client =
1343 RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1344 });
1346
1347 let channel_handler = |request: &block::SessionRequest| -> bool {
1348 if let block::SessionRequest::Close { .. } = request {
1349 *close_called.lock() = true;
1350 }
1351 false
1352 };
1353 FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1354
1355 assert!(*close_called.lock());
1357 }
1358
1359 #[fuchsia::test]
1360 async fn test_block_flush_is_called() {
1361 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1362
1363 struct Interface {
1364 flush_called: Arc<AtomicBool>,
1365 }
1366 impl block_server::async_interface::Interface for Interface {
1367 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1368 Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1369 device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
1370 max_transfer_blocks: None,
1371 block_range: Some(0..1000),
1372 type_guid: [0; 16],
1373 instance_guid: [0; 16],
1374 name: "foo".to_string(),
1375 flags: 0,
1376 })))
1377 }
1378
1379 async fn read(
1380 &self,
1381 _device_block_offset: u64,
1382 _block_count: u32,
1383 _vmo: &Arc<zx::Vmo>,
1384 _vmo_offset: u64,
1385 _trace_flow_id: Option<NonZero<u64>>,
1386 ) -> Result<(), zx::Status> {
1387 unreachable!();
1388 }
1389
1390 async fn write(
1391 &self,
1392 _device_block_offset: u64,
1393 _block_count: u32,
1394 _vmo: &Arc<zx::Vmo>,
1395 _vmo_offset: u64,
1396 _opts: WriteOptions,
1397 _trace_flow_id: Option<NonZero<u64>>,
1398 ) -> Result<(), zx::Status> {
1399 unreachable!();
1400 }
1401
1402 async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1403 self.flush_called.store(true, Ordering::Relaxed);
1404 Ok(())
1405 }
1406
1407 async fn trim(
1408 &self,
1409 _device_block_offset: u64,
1410 _block_count: u32,
1411 _trace_flow_id: Option<NonZero<u64>>,
1412 ) -> Result<(), zx::Status> {
1413 unreachable!();
1414 }
1415 }
1416
1417 let flush_called = Arc::new(AtomicBool::new(false));
1418
1419 futures::join!(
1420 async {
1421 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1422
1423 block_client.flush().await.expect("flush failed");
1424 },
1425 async {
1426 let block_server = BlockServer::new(
1427 512,
1428 Arc::new(Interface { flush_called: flush_called.clone() }),
1429 );
1430 block_server.handle_requests(stream.cast_stream()).await.unwrap();
1431 }
1432 );
1433
1434 assert!(flush_called.load(Ordering::Relaxed));
1435 }
1436
1437 #[fuchsia::test]
1438 async fn test_trace_flow_ids_set() {
1439 let (proxy, server) = fidl::endpoints::create_proxy();
1440
1441 futures::join!(
1442 async {
1443 let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1444 block_client.flush().await.expect("flush failed");
1445 },
1446 async {
1447 let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1448 let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1449 if request.trace_flow_id > 0 {
1450 *flow_id.lock() = Some(request.trace_flow_id);
1451 }
1452 BlockFifoResponse {
1453 status: zx::Status::OK.into_raw(),
1454 reqid: request.reqid,
1455 ..Default::default()
1456 }
1457 };
1458 FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1459 assert!(flow_id.lock().is_some());
1461 }
1462 );
1463 }
1464}