1use super::{
6 ActiveRequests, DecodedRequest, DeviceInfo, FIFO_MAX_REQUESTS, HandleRequestResult,
7 IntoOrchestrator, OffsetMap, Operation, SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags, WriteOptions};
11use fidl_fuchsia_storage_block as fblock;
12use fidl_fuchsia_storage_block::DeviceFlag;
13use fuchsia_async as fasync;
14use fuchsia_sync::Mutex;
15use futures::future::{Fuse, FusedFuture, join};
16use futures::stream::FuturesUnordered;
17use futures::{FutureExt, StreamExt, select_biased};
18use std::borrow::Cow;
19use std::collections::VecDeque;
20use std::future::{Future, poll_fn};
21use std::mem::MaybeUninit;
22use std::pin::pin;
23use std::sync::{Arc, OnceLock};
24use std::task::{Poll, ready};
25use storage_device::buffer::Buffer;
26use storage_device::buffer_allocator::{BufferAllocator, BufferSource};
27
28pub trait Interface: Send + Sync + Unpin + 'static {
29 fn open_session(
44 &self,
45 session_manager: Arc<SessionManager<Self>>,
46 stream: fblock::SessionRequestStream,
47 offset_map: OffsetMap,
48 block_size: u32,
49 ) -> impl Future<Output = Result<(), Error>> + Send {
50 session_manager.serve_session(stream, offset_map, block_size)
52 }
53
54 fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
58 async { Ok(()) }
59 }
60
61 fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}
63
64 fn get_info(&self) -> Cow<'_, DeviceInfo>;
66
67 fn read(
69 &self,
70 device_block_offset: u64,
71 block_count: u32,
72 vmo: &Arc<zx::Vmo>,
73 vmo_offset: u64, opts: ReadOptions,
75 trace_flow_id: TraceFlowId,
76 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
77
78 fn write(
80 &self,
81 device_block_offset: u64,
82 block_count: u32,
83 vmo: &Arc<zx::Vmo>,
84 vmo_offset: u64, opts: WriteOptions,
86 trace_flow_id: TraceFlowId,
87 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
88
89 fn flush(
91 &self,
92 trace_flow_id: TraceFlowId,
93 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
94
95 fn trim(
97 &self,
98 device_block_offset: u64,
99 block_count: u32,
100 trace_flow_id: TraceFlowId,
101 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
102
103 fn get_volume_info(
105 &self,
106 ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
107 {
108 async { Err(zx::Status::NOT_SUPPORTED) }
109 }
110
111 fn query_slices(
113 &self,
114 _start_slices: &[u64],
115 ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
116 async { Err(zx::Status::NOT_SUPPORTED) }
117 }
118
119 fn extend(
121 &self,
122 _start_slice: u64,
123 _slice_count: u64,
124 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
125 async { Err(zx::Status::NOT_SUPPORTED) }
126 }
127
128 fn shrink(
130 &self,
131 _start_slice: u64,
132 _slice_count: u64,
133 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
134 async { Err(zx::Status::NOT_SUPPORTED) }
135 }
136}
137
138pub struct PassthroughSession(fblock::SessionProxy);
140
141impl PassthroughSession {
142 pub fn new(proxy: fblock::SessionProxy) -> Self {
143 Self(proxy)
144 }
145
146 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
147 match request {
148 fblock::SessionRequest::GetFifo { responder } => {
149 responder.send(self.0.get_fifo().await?)?;
150 }
151 fblock::SessionRequest::AttachVmo { vmo, responder } => {
152 responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
153 }
154 fblock::SessionRequest::Close { responder } => {
155 responder.send(self.0.close().await?)?;
156 }
157 }
158 Ok(())
159 }
160
161 pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
163 while let Some(Ok(request)) = stream.next().await {
164 if let Err(error) = self.handle_request(request).await {
165 log::warn!(error:?; "FIDL error");
166 }
167 }
168 Ok(())
169 }
170}
171
172pub struct SessionManager<I: Interface + ?Sized> {
173 interface: Arc<I>,
174 active_requests: ActiveRequests<usize>,
175
176 buffer_allocator: OnceLock<BufferAllocator>,
179}
180
181impl<I: Interface + ?Sized> Drop for SessionManager<I> {
182 fn drop(&mut self) {
183 if let Some(allocator) = self.buffer_allocator.get() {
184 self.interface.on_detach_vmo(allocator.buffer_source().vmo());
185 }
186 }
187}
188
189impl<I: Interface + ?Sized> SessionManager<I> {
190 pub fn new(interface: Arc<I>) -> Self {
191 Self {
192 interface,
193 active_requests: ActiveRequests::default(),
194 buffer_allocator: OnceLock::new(),
195 }
196 }
197
198 pub fn interface(&self) -> &I {
199 self.interface.as_ref()
200 }
201
202 pub async fn serve_session(
204 self: Arc<Self>,
205 stream: fblock::SessionRequestStream,
206 offset_map: OffsetMap,
207 block_size: u32,
208 ) -> Result<(), Error> {
209 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
210 let session = Arc::new(Session {
211 helper: Arc::new(helper),
212 interface: self.interface.clone(),
213 close_callback: Mutex::new(None),
214 });
215
216 let (stop_sender, stop_receiver) = futures::channel::oneshot::channel();
217
218 let mut stream = stream.fuse();
219 let scope = fasync::Scope::new();
220 let session_clone = session.clone();
221 let mut fifo_task = scope
222 .spawn(async move {
223 if let Err(status) = session_clone.run_fifo(fifo, stop_receiver).await {
224 if status != zx::Status::PEER_CLOSED {
225 log::error!(status:?; "FIFO error");
226 }
227 }
228 })
229 .fuse();
230
231 scopeguard::defer! {
233 for (_, (vmo, _)) in session.helper.take_vmos() {
234 self.interface.on_detach_vmo(&vmo);
235 }
236 }
237
238 let mut closing = false;
239 let mut stop_sender = Some(stop_sender);
240
241 loop {
242 futures::select! {
243 maybe_req = if closing {
244 futures::future::pending().left_future()
245 } else {
246 stream.next().right_future()
247 } => {
248 if let Some(req) = maybe_req {
249 match session.helper.handle_request(req?).await? {
250 HandleRequestResult::Ok => {},
251 HandleRequestResult::Closed(callback) => {
252 *session.close_callback.lock() = Some(callback);
253 if let Some(sender) = stop_sender.take() {
255 let _ = sender.send(());
256 }
257 closing = true;
258 }
259 }
260 } else {
261 if let Some(sender) = stop_sender.take() {
263 let _ = sender.send(());
264 }
265 closing = true;
266 }
267 }
268 _ = fifo_task => break,
269 }
270 }
271 Ok(())
272 }
273}
274
275pub struct Session<I: Interface + ?Sized> {
276 interface: Arc<I>,
277 helper: Arc<SessionHelper<SessionManager<I>>>,
278 close_callback: Mutex<Option<Box<dyn FnOnce() + Send + 'static>>>,
279}
280
281impl<I: Interface + ?Sized> Session<I> {
282 async fn run_fifo(
284 &self,
285 fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
286 stop_signal: futures::channel::oneshot::Receiver<()>,
287 ) -> Result<(), zx::Status> {
288 scopeguard::defer! {
289 self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
292 }
293
294 let mut fifo = fasync::Fifo::from_fifo(fifo);
304 let (mut reader, mut writer) = fifo.async_io();
305 let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
306 let active_requests = &self.helper.session_manager().active_requests;
307 let mut active_request_futures = FuturesUnordered::new();
308 let mut responses = Vec::new();
309
310 let mut map_future = pin!(Fuse::terminated());
315 let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();
316
317 let mut stop_signal = pin!(stop_signal.fuse());
320 let mut is_closed = false;
321
322 loop {
323 let new_requests = {
324 let pending_requests = active_request_futures.len() + responses.len();
327
328 if is_closed
329 && pending_requests == 0
330 && map_future.is_terminated()
331 && pending_mappings.is_empty()
332 {
333 return Ok(());
334 }
335
336 let count = requests.len().saturating_sub(pending_requests);
337 let mut receive_requests = pin!(if count == 0 || is_closed {
338 Fuse::terminated()
339 } else {
340 reader.read_entries(&mut requests[..count]).fuse()
341 });
342 let mut send_responses = pin!(if responses.is_empty() {
343 Fuse::terminated()
344 } else {
345 poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
346 match ready!(writer.try_write(cx, &responses[..])) {
347 Ok(written) => {
348 responses.drain(..written);
349 Poll::Ready(Ok(()))
350 }
351 Err(status) => Poll::Ready(Err(status)),
352 }
353 })
354 .fuse()
355 });
356
357 select_biased!(
360 res = send_responses => {
361 res?;
362 0
363 },
364 response = active_request_futures.select_next_some() => {
365 responses.extend(response);
366 0
367 }
368 result = map_future => {
369 match result {
370 Ok((request, remainder, commit_decompression_buffers)) => {
371 active_request_futures.push(self.process_fifo_request(
372 request,
373 commit_decompression_buffers,
374 ));
375 if let Some(remainder) = remainder {
376 map_future.set(
377 self.map_request_or_get_response(remainder).fuse()
378 );
379 }
380 }
381 Err(response) => responses.extend(response),
382 }
383 if map_future.is_terminated() {
384 if let Some(request) = pending_mappings.pop_front() {
385 map_future.set(self.map_request_or_get_response(request).fuse());
386 }
387 }
388 0
389 }
390 _ = stop_signal => {
391 is_closed = true;
392 0
393 }
394 count = receive_requests => {
395 count?
396 }
397 )
398 };
399
400 for request in &mut requests[..new_requests] {
403 match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
404 request.assume_init_mut()
405 }) {
406 Ok(DecodedRequest {
407 operation: Operation::CloseVmo, vmo, request_id, ..
408 }) => {
409 if let Some(vmo) = vmo {
410 self.interface.on_detach_vmo(vmo.as_ref());
411 }
412 responses.extend(
413 active_requests
414 .complete_and_take_response(request_id, zx::Status::OK)
415 .map(|(_, response)| response),
416 );
417 }
418 Ok(request) => {
419 if map_future.is_terminated() {
420 map_future.set(self.map_request_or_get_response(request).fuse());
421 } else {
422 pending_mappings.push_back(request);
423 }
424 }
425 Err(None) => {}
426 Err(Some(response)) => responses.push(response),
427 }
428 }
429 }
430 }
431
432 async fn map_request_or_get_response(
433 &self,
434 request: DecodedRequest,
435 ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), Option<BlockFifoResponse>> {
436 let request_id = request.request_id;
437 self.map_request(request).await.map_err(|status| {
438 self.helper
439 .orchestrator
440 .active_requests
441 .complete_and_take_response(request_id, status)
442 .map(|(_, r)| r)
443 })
444 }
445
446 async fn map_request(
449 &self,
450 mut request: DecodedRequest,
451 ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), zx::Status> {
452 let mut active_requests;
453 let active_request;
454 let mut commit_decompression_buffers = false;
455 let flags = self.interface.get_info().as_ref().device_flags();
456 if !flags.contains(DeviceFlag::BARRIER_SUPPORT)
459 && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
460 {
461 if let Some(id) = request.trace_flow_id {
462 fuchsia_trace::async_instant!(
463 fuchsia_trace::Id::from(id.get()),
464 "storage",
465 "block_server::SimulatedBarrier",
466 "request_id" => request.request_id.0
467 );
468 }
469 self.interface.flush(request.trace_flow_id).await?;
470 }
471
472 match request.operation {
474 Operation::StartDecompressedRead {
475 required_buffer_size,
476 device_block_offset,
477 block_count,
478 options,
479 } => {
480 let allocator = match self.helper.session_manager().buffer_allocator.get() {
481 Some(a) => a,
482 None => {
483 let source = BufferSource::new(fblock::MAX_DECOMPRESSED_BYTES as usize);
486 self.interface.on_attach_vmo(&source.vmo()).await?;
487 let allocator = BufferAllocator::new(
488 std::cmp::max(
489 self.helper.block_size as usize,
490 zx::system_get_page_size() as usize,
491 ),
492 source,
493 );
494 self.helper.session_manager().buffer_allocator.set(allocator).unwrap();
495 self.helper.session_manager().buffer_allocator.get().unwrap()
496 }
497 };
498
499 if required_buffer_size > fblock::MAX_DECOMPRESSED_BYTES as usize {
500 return Err(zx::Status::OUT_OF_RANGE);
501 }
502
503 let buffer = allocator.allocate_buffer(required_buffer_size).await;
504 let vmo_offset = buffer.range().start as u64;
505
506 unsafe fn remove_lifetime(buffer: Buffer<'_>) -> Buffer<'static> {
510 unsafe { std::mem::transmute(buffer) }
511 }
512
513 active_requests = self.helper.session_manager().active_requests.0.lock();
514 active_request = &mut active_requests.requests[request.request_id.0];
515
516 active_request.decompression_info.as_mut().unwrap().buffer =
519 Some(unsafe { remove_lifetime(buffer) });
520
521 request.operation = Operation::Read {
522 device_block_offset,
523 block_count,
524 _unused: 0,
525 vmo_offset,
526 options,
527 };
528 request.vmo = Some(allocator.buffer_source().vmo().clone());
529
530 commit_decompression_buffers = true;
531 }
532 Operation::ContinueDecompressedRead {
533 offset,
534 device_block_offset,
535 block_count,
536 options,
537 } => {
538 active_requests = self.helper.session_manager().active_requests.0.lock();
539 active_request = &mut active_requests.requests[request.request_id.0];
540
541 let buffer =
542 active_request.decompression_info.as_ref().unwrap().buffer.as_ref().unwrap();
543
544 if offset >= buffer.len() as u64
546 || buffer.len() as u64 - offset
547 < block_count as u64 * self.helper.block_size as u64
548 {
549 return Err(zx::Status::OUT_OF_RANGE);
550 }
551
552 request.operation = Operation::Read {
553 device_block_offset,
554 block_count,
555 _unused: 0,
556 vmo_offset: buffer.range().start as u64 + offset,
557 options,
558 };
559
560 let allocator = self.helper.session_manager().buffer_allocator.get().unwrap();
561 request.vmo = Some(allocator.buffer_source().vmo().clone());
562 }
563 _ => {
564 active_requests = self.helper.session_manager().active_requests.0.lock();
565 active_request = &mut active_requests.requests[request.request_id.0];
566 }
567 }
568
569 self.helper
573 .map_request(request, active_request)
574 .map(|(request, remainder)| (request, remainder, commit_decompression_buffers))
575 }
576
577 async fn process_fifo_request(
579 &self,
580 DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
581 commit_decompression_buffers: bool,
582 ) -> Option<BlockFifoResponse> {
583 let mut needs_postflush = false;
584 let result = match operation {
585 Operation::Read { device_block_offset, block_count, _unused, vmo_offset, options } => {
586 join(
587 self.interface.read(
588 device_block_offset,
589 block_count,
590 vmo.as_ref().unwrap(),
591 vmo_offset,
592 options,
593 trace_flow_id,
594 ),
595 async {
596 if commit_decompression_buffers {
597 let (target_slice, buffer_slice, buffer_range) = {
598 let active_request = self
599 .helper
600 .session_manager()
601 .active_requests
602 .request(request_id);
603 let info = active_request.decompression_info.as_ref().unwrap();
604 (
605 info.uncompressed_slice(),
606 self.helper
607 .orchestrator
608 .buffer_allocator
609 .get()
610 .unwrap()
611 .buffer_source()
612 .slice(),
613 info.buffer.as_ref().unwrap().range(),
614 )
615 };
616 let vmar = fuchsia_runtime::vmar_root_self();
617 let addr = target_slice.addr();
619 let unaligned = addr % zx::system_get_page_size() as usize;
620 if let Err(error) = vmar.op_range(
621 zx::VmarOp::COMMIT,
622 addr - unaligned,
623 target_slice.len() + unaligned,
624 ) {
625 log::warn!(error:?; "Unable to commit target range");
626 }
627 if let Err(error) = vmar.op_range(
629 zx::VmarOp::PREFETCH,
630 buffer_slice.addr() + buffer_range.start,
631 buffer_range.len(),
632 ) {
633 log::warn!(
634 error:?,
635 buffer_range:?;
636 "Unable to prefetch source range",
637 );
638 }
639 }
640 },
641 )
642 .await
643 .0
644 }
645 Operation::Write {
646 device_block_offset,
647 block_count,
648 _unused,
649 vmo_offset,
650 mut options,
651 } => {
652 if options.flags.contains(WriteFlags::FORCE_ACCESS) {
655 let flags = self.interface.get_info().as_ref().device_flags();
656 if !flags.contains(DeviceFlag::FUA_SUPPORT) {
657 options.flags.remove(WriteFlags::FORCE_ACCESS);
658 needs_postflush = true;
659 }
660 }
661 self.interface
662 .write(
663 device_block_offset,
664 block_count,
665 vmo.as_ref().unwrap(),
666 vmo_offset,
667 options,
668 trace_flow_id,
669 )
670 .await
671 }
672 Operation::Flush => self.interface.flush(trace_flow_id).await,
673 Operation::Trim { device_block_offset, block_count } => {
674 self.interface.trim(device_block_offset, block_count, trace_flow_id).await
675 }
676 Operation::CloseVmo
677 | Operation::StartDecompressedRead { .. }
678 | Operation::ContinueDecompressedRead { .. } => {
679 unreachable!()
681 }
682 };
683 let response = self
684 .helper
685 .orchestrator
686 .active_requests
687 .complete_and_take_response(request_id, result.into())
688 .map(|(_, r)| r);
689 if let Some(mut response) = response {
690 if zx::Status::from_raw(response.status) == zx::Status::OK && needs_postflush {
692 if let Some(id) = trace_flow_id {
693 fuchsia_trace::async_instant!(
694 fuchsia_trace::Id::from(id.get()),
695 "storage",
696 "block_server::SimulatedFUA",
697 "request_id" => request_id.0
698 );
699 }
700 response.status =
701 zx::Status::from(self.interface.flush(trace_flow_id).await).into_raw();
702 }
703 Some(response)
704 } else {
705 response
706 }
707 }
708}
709
710impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
711 type Orchestrator = Self;
712
713 const SUPPORTS_DECOMPRESSION: bool = true;
714
715 type Session = usize;
717
718 async fn on_attach_vmo(orchestrator: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
719 I::on_attach_vmo(&orchestrator.interface, vmo).await
720 }
721
722 async fn open_session(
723 orchestrator: Arc<Self>,
724 stream: fblock::SessionRequestStream,
725 offset_map: OffsetMap,
726 block_size: u32,
727 ) -> Result<(), Error> {
728 I::open_session(
729 &orchestrator.interface,
730 orchestrator.clone(),
731 stream,
732 offset_map,
733 block_size,
734 )
735 .await
736 }
737
738 fn get_info(&self) -> Cow<'_, DeviceInfo> {
739 self.interface.get_info()
740 }
741
742 async fn get_volume_info(
743 &self,
744 ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
745 self.interface.get_volume_info().await
746 }
747
748 async fn query_slices(
749 &self,
750 start_slices: &[u64],
751 ) -> Result<Vec<fblock::VsliceRange>, zx::Status> {
752 self.interface.query_slices(start_slices).await
753 }
754
755 async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
756 self.interface.extend(start_slice, slice_count).await
757 }
758
759 async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
760 self.interface.shrink(start_slice, slice_count).await
761 }
762
763 fn active_requests(&self) -> &ActiveRequests<Self::Session> {
764 return &self.active_requests;
765 }
766}
767
768impl<I: Interface + ?Sized> Drop for Session<I> {
769 fn drop(&mut self) {
770 let callback = std::mem::take(&mut *self.close_callback.lock());
771 if let Some(callback) = callback {
772 callback();
773 }
774 }
775}
776
777impl<I: Interface> IntoOrchestrator for Arc<I> {
778 type SM = SessionManager<I>;
779
780 fn into_orchestrator(self) -> Arc<Self::SM> {
781 Arc::new(SessionManager {
782 interface: self,
783 active_requests: ActiveRequests::default(),
784 buffer_allocator: OnceLock::new(),
785 })
786 }
787}