1use super::{
6 ActiveRequests, DecodedRequest, DeviceInfo, FIFO_MAX_REQUESTS, IntoOrchestrator, OffsetMap,
7 Operation, SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags, WriteOptions};
11use fidl_fuchsia_storage_block::DeviceFlag;
12use futures::future::{Fuse, FusedFuture, join};
13use futures::stream::FuturesUnordered;
14use futures::{FutureExt, StreamExt, select_biased};
15use std::borrow::Cow;
16use std::collections::VecDeque;
17use std::future::{Future, poll_fn};
18use std::mem::MaybeUninit;
19use std::pin::pin;
20use std::sync::{Arc, OnceLock};
21use std::task::{Poll, ready};
22use storage_device::buffer::Buffer;
23use storage_device::buffer_allocator::{BufferAllocator, BufferSource};
24use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
25
/// The storage backend for a block server.  Implementors supply the actual read/write/flush/trim
/// operations; `SessionManager` handles FIFO servicing, request decoding and response delivery.
pub trait Interface: Send + Sync + Unpin + 'static {
    /// Called to serve a new session on `stream`.  The default implementation delegates to
    /// `SessionManager::serve_session`; override to observe or wrap session setup.
    fn open_session(
        &self,
        session_manager: Arc<SessionManager<Self>>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send {
        session_manager.serve_session(stream, offset_map, block_size)
    }

    /// Called when a VMO is attached to a session.  The default accepts the VMO and does
    /// nothing.
    fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Ok(()) }
    }

    /// Called when a VMO previously seen by `on_attach_vmo` is detached.  The default is a
    /// no-op.
    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}

    /// Returns information about the device (e.g. its flags, used to decide whether barriers
    /// and FUA must be simulated — see `Session::map_request`/`process_fifo_request`).
    fn get_info(&self) -> Cow<'_, DeviceInfo>;

    /// Reads `block_count` blocks starting at `device_block_offset` into `vmo` at `vmo_offset`.
    /// NOTE(review): `vmo_offset` appears to be a byte offset (it is set from
    /// `Buffer::range().start` in `Session::map_request`) — confirm against `SessionHelper`.
    fn read(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, opts: ReadOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Writes `block_count` blocks starting at `device_block_offset` from `vmo` at `vmo_offset`
    /// (same offset convention as `read`).
    fn write(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, opts: WriteOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Flushes the device.  Also used internally to simulate barriers/FUA on devices that lack
    /// hardware support for them.
    fn flush(
        &self,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Trims (discards) `block_count` blocks starting at `device_block_offset`.
    fn trim(
        &self,
        device_block_offset: u64,
        block_count: u32,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Returns volume information.  The default reports `NOT_SUPPORTED` for non-volume devices.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Queries virtual slice ranges.  The default reports `NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Extends the volume by `_slice_count` slices at `_start_slice`.  The default reports
    /// `NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Shrinks the volume by `_slice_count` slices at `_start_slice`.  The default reports
    /// `NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }
}
135
136pub struct PassthroughSession(fblock::SessionProxy);
138
139impl PassthroughSession {
140 pub fn new(proxy: fblock::SessionProxy) -> Self {
141 Self(proxy)
142 }
143
144 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
145 match request {
146 fblock::SessionRequest::GetFifo { responder } => {
147 responder.send(self.0.get_fifo().await?)?;
148 }
149 fblock::SessionRequest::AttachVmo { vmo, responder } => {
150 responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
151 }
152 fblock::SessionRequest::Close { responder } => {
153 responder.send(self.0.close().await?)?;
154 }
155 }
156 Ok(())
157 }
158
159 pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
161 while let Some(Ok(request)) = stream.next().await {
162 if let Err(error) = self.handle_request(request).await {
163 log::warn!(error:?; "FIDL error");
164 }
165 }
166 Ok(())
167 }
168}
169
/// Manages all sessions served on behalf of an `Interface`, tracking in-flight requests and the
/// lazily-created buffer allocator used for decompressed reads.
pub struct SessionManager<I: Interface + ?Sized> {
    interface: Arc<I>,
    // In-flight requests across all sessions; keyed by the address of the serving `Session`
    // (see `Session::run_fifo`) so a session can drop just its own requests on teardown.
    active_requests: ActiveRequests<usize>,

    // Initialised on the first decompressed read (see `Session::map_request`); its backing VMO
    // is attached to the interface at creation and detached when this manager is dropped.
    buffer_allocator: OnceLock<BufferAllocator>,
}
178
179impl<I: Interface + ?Sized> Drop for SessionManager<I> {
180 fn drop(&mut self) {
181 if let Some(allocator) = self.buffer_allocator.get() {
182 self.interface.on_detach_vmo(allocator.buffer_source().vmo());
183 }
184 }
185}
186
impl<I: Interface + ?Sized> SessionManager<I> {
    /// Creates a new `SessionManager` backed by `interface`.
    pub fn new(interface: Arc<I>) -> Self {
        Self {
            interface,
            active_requests: ActiveRequests::default(),
            buffer_allocator: OnceLock::new(),
        }
    }

    /// Returns the underlying interface.
    pub fn interface(&self) -> &I {
        self.interface.as_ref()
    }

    /// Serves `stream` for one session.  The request FIFO is serviced on a separate task;
    /// this method returns when the FIDL stream ends, a FIDL error occurs, or the FIFO task
    /// finishes (e.g. the client closed the FIFO).
    pub async fn serve_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
        let session = Session { helper: Arc::new(helper), interface: self.interface.clone() };
        let mut stream = stream.fuse();
        let scope = fasync::Scope::new();
        let helper = session.helper.clone();
        // Service the FIFO concurrently with the FIDL request stream.  PEER_CLOSED is the
        // expected shutdown path and is not logged.
        let mut fifo_task = scope
            .spawn(async move {
                if let Err(status) = session.run_fifo(fifo).await {
                    if status != zx::Status::PEER_CLOSED {
                        log::error!(status:?; "FIFO error");
                    }
                }
            })
            .fuse();

        // However we exit (including via `?` below), let the interface clean up every VMO the
        // client attached to this session.
        scopeguard::defer! {
            for (_, (vmo, _)) in helper.take_vmos() {
                self.interface.on_detach_vmo(&vmo);
            }
        }

        loop {
            futures::select! {
                maybe_req = stream.next() => {
                    if let Some(req) = maybe_req {
                        helper.handle_request(req?).await?;
                    } else {
                        // Client closed the FIDL channel.
                        break;
                    }
                }
                // FIFO task finished; tear the session down.
                _ = fifo_task => break,
            }
        }

        Ok(())
    }
}
245
// Per-session state: the backing interface plus the helper that decodes, maps and tracks FIFO
// requests for this session.
struct Session<I: Interface + ?Sized> {
    interface: Arc<I>,
    helper: Arc<SessionHelper<SessionManager<I>>>,
}
250
impl<I: Interface + ?Sized> Session<I> {
    /// Services the request FIFO for this session: reads batches of requests, maps them one at
    /// a time (offset translation, decompression-buffer setup), runs the mapped operations
    /// concurrently, and writes completed responses back to the FIFO.  Loops until the FIFO is
    /// closed or an I/O error occurs.
    async fn run_fifo(
        &self,
        fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    ) -> Result<(), zx::Status> {
        // However we exit, drop any of *this* session's requests still tracked in the shared
        // set.  The session is identified by the address of `self`.
        scopeguard::defer! {
            self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
        }

        let mut fifo = fasync::Fifo::from_fifo(fifo);
        let (mut reader, mut writer) = fifo.async_io();
        let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
        let active_requests = &self.helper.session_manager().active_requests;
        // Operations currently executing against the interface.
        let mut active_request_futures = FuturesUnordered::new();
        // Completed responses not yet written to the FIFO.
        let mut responses = Vec::new();

        // At most one request is being mapped at a time; any further decoded requests wait in
        // `pending_mappings`.
        let mut map_future = pin!(Fuse::terminated());
        let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();

        loop {
            let new_requests = {
                // Back-pressure: never accept more new requests than FIFO_MAX_REQUESTS minus
                // what is already in flight or awaiting a response write.
                let pending_requests = active_request_futures.len() + responses.len();
                let count = requests.len().saturating_sub(pending_requests);
                let mut receive_requests = pin!(if count == 0 {
                    Fuse::terminated()
                } else {
                    reader.read_entries(&mut requests[..count]).fuse()
                });
                let mut send_responses = pin!(if responses.is_empty() {
                    Fuse::terminated()
                } else {
                    // Write as many queued responses as the FIFO will take, then drop the
                    // written prefix.
                    poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
                        match ready!(writer.try_write(cx, &responses[..])) {
                            Ok(written) => {
                                responses.drain(..written);
                                Poll::Ready(Ok(()))
                            }
                            Err(status) => Poll::Ready(Err(status)),
                        }
                    })
                    .fuse()
                });

                // Biased priority: drain responses, then finish in-flight work, then continue
                // mapping, and only then accept new requests.  Every arm except
                // `receive_requests` yields 0 new requests.
                select_biased!(
                    res = send_responses => {
                        res?;
                        0
                    },
                    response = active_request_futures.select_next_some() => {
                        responses.extend(response);
                        0
                    }
                    result = map_future => {
                        match result {
                            Ok((request, remainder, commit_decompression_buffers)) => {
                                active_request_futures.push(self.process_fifo_request(
                                    request,
                                    commit_decompression_buffers,
                                ));
                                // A request can map to several sub-requests; keep mapping the
                                // remainder before anything queued.
                                if let Some(remainder) = remainder {
                                    map_future.set(
                                        self.map_request_or_get_response(remainder).fuse()
                                    );
                                }
                            }
                            Err(response) => responses.extend(response),
                        }
                        if map_future.is_terminated() {
                            if let Some(request) = pending_mappings.pop_front() {
                                map_future.set(self.map_request_or_get_response(request).fuse());
                            }
                        }
                        0
                    }
                    count = receive_requests => {
                        count?
                    }
                )
            };

            // SAFETY (assume_init_mut below): `read_entries` initialised the first
            // `new_requests` entries of `requests`.
            for request in &mut requests[..new_requests] {
                match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
                    request.assume_init_mut()
                }) {
                    // CloseVmo needs no mapping: detach the VMO and complete immediately.
                    Ok(DecodedRequest {
                        operation: Operation::CloseVmo, vmo, request_id, ..
                    }) => {
                        if let Some(vmo) = vmo {
                            self.interface.on_detach_vmo(vmo.as_ref());
                        }
                        responses.extend(
                            active_requests
                                .complete_and_take_response(request_id, zx::Status::OK)
                                .map(|(_, response)| response),
                        );
                    }
                    Ok(request) => {
                        if map_future.is_terminated() {
                            map_future.set(self.map_request_or_get_response(request).fuse());
                        } else {
                            pending_mappings.push_back(request);
                        }
                    }
                    // Decoder consumed the request without producing an immediate response.
                    Err(None) => {}
                    Err(Some(response)) => responses.push(response),
                }
            }
        }
    }

    /// Maps `request`; on failure, converts the error status into the completed response (if
    /// any) that should be sent to the client.
    async fn map_request_or_get_response(
        &self,
        request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), Option<BlockFifoResponse>> {
        let request_id = request.request_id;
        self.map_request(request).await.map_err(|status| {
            self.helper
                .orchestrator
                .active_requests
                .complete_and_take_response(request_id, status)
                .map(|(_, r)| r)
        })
    }

    /// Prepares a decoded request for execution:
    ///   * simulates a write barrier with a flush when the device lacks `BARRIER_SUPPORT`,
    ///   * rewrites `StartDecompressedRead`/`ContinueDecompressedRead` into plain reads that
    ///     target an internally-allocated decompression buffer,
    ///   * then delegates to `SessionHelper::map_request` for offset translation.
    /// Returns the mapped request, an optional unmapped remainder, and whether decompression
    /// buffer pages should be committed after the read completes.
    async fn map_request(
        &self,
        mut request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), zx::Status> {
        let mut active_requests;
        let active_request;
        let mut commit_decompression_buffers = false;
        let flags = self.interface.get_info().as_ref().device_flags();
        // No hardware barrier support: consume the PRE_BARRIER flag and emulate it with an
        // explicit flush before the write.
        if !flags.contains(DeviceFlag::BARRIER_SUPPORT)
            && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
        {
            if let Some(id) = request.trace_flow_id {
                fuchsia_trace::async_instant!(
                    fuchsia_trace::Id::from(id.get()),
                    "storage",
                    "block_server::SimulatedBarrier",
                    "request_id" => request.request_id.0
                );
            }
            self.interface.flush(request.trace_flow_id).await?;
        }

        match request.operation {
            Operation::StartDecompressedRead {
                required_buffer_size,
                device_block_offset,
                block_count,
                options,
            } => {
                // Lazily create the shared buffer allocator on the first decompressed read,
                // attaching its backing VMO to the interface.
                // NOTE(review): two sessions could both observe `None` here concurrently; the
                // loser's `set(...).unwrap()` below would then panic — confirm this race
                // cannot occur.
                let allocator = match self.helper.session_manager().buffer_allocator.get() {
                    Some(a) => a,
                    None => {
                        let source = BufferSource::new(fblock::MAX_DECOMPRESSED_BYTES as usize);
                        self.interface.on_attach_vmo(&source.vmo()).await?;
                        let allocator = BufferAllocator::new(
                            std::cmp::max(
                                self.helper.block_size as usize,
                                zx::system_get_page_size() as usize,
                            ),
                            source,
                        );
                        self.helper.session_manager().buffer_allocator.set(allocator).unwrap();
                        self.helper.session_manager().buffer_allocator.get().unwrap()
                    }
                };

                if required_buffer_size > fblock::MAX_DECOMPRESSED_BYTES as usize {
                    return Err(zx::Status::OUT_OF_RANGE);
                }

                let buffer = allocator.allocate_buffer(required_buffer_size).await;
                let vmo_offset = buffer.range().start as u64;

                // SAFETY: the buffer is stashed in the active request's decompression info and
                // presumably dropped when the request completes, while the allocator (owned by
                // the `SessionManager`) is still alive — TODO confirm the drop ordering.
                unsafe fn remove_lifetime(buffer: Buffer<'_>) -> Buffer<'static> {
                    unsafe { std::mem::transmute(buffer) }
                }

                active_requests = self.helper.session_manager().active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];

                active_request.decompression_info.as_mut().unwrap().buffer =
                    Some(unsafe { remove_lifetime(buffer) });

                // Rewrite as a plain read into the decompression buffer's VMO.
                request.operation = Operation::Read {
                    device_block_offset,
                    block_count,
                    _unused: 0,
                    vmo_offset,
                    options,
                };
                request.vmo = Some(allocator.buffer_source().vmo().clone());

                commit_decompression_buffers = true;
            }
            Operation::ContinueDecompressedRead {
                offset,
                device_block_offset,
                block_count,
                options,
            } => {
                active_requests = self.helper.session_manager().active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];

                // The buffer must have been installed by a prior StartDecompressedRead.
                let buffer =
                    active_request.decompression_info.as_ref().unwrap().buffer.as_ref().unwrap();

                // Reject reads that would land outside the allocated buffer.
                if offset >= buffer.len() as u64
                    || buffer.len() as u64 - offset
                        < block_count as u64 * self.helper.block_size as u64
                {
                    return Err(zx::Status::OUT_OF_RANGE);
                }

                request.operation = Operation::Read {
                    device_block_offset,
                    block_count,
                    _unused: 0,
                    vmo_offset: buffer.range().start as u64 + offset,
                    options,
                };

                let allocator = self.helper.session_manager().buffer_allocator.get().unwrap();
                request.vmo = Some(allocator.buffer_source().vmo().clone());
            }
            _ => {
                active_requests = self.helper.session_manager().active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];
            }
        }

        // NB: the active-requests lock is still held across this call.
        self.helper
            .map_request(request, active_request)
            .map(|(request, remainder)| (request, remainder, commit_decompression_buffers))
    }

    /// Executes one mapped request against the interface and returns the response to queue, if
    /// any.  For reads that fill a decompression buffer, concurrently commits the target pages
    /// and prefetches the source range.  Simulates FORCE_ACCESS (FUA) with a post-write flush
    /// when the device lacks `FUA_SUPPORT`.
    async fn process_fifo_request(
        &self,
        DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
        commit_decompression_buffers: bool,
    ) -> Option<BlockFifoResponse> {
        let mut needs_postflush = false;
        let result = match operation {
            Operation::Read { device_block_offset, block_count, _unused, vmo_offset, options } => {
                // Run the read and the page-commit/prefetch work concurrently; only the read's
                // result (`.0`) determines the response status.
                join(
                    self.interface.read(
                        device_block_offset,
                        block_count,
                        vmo.as_ref().unwrap(),
                        vmo_offset,
                        options,
                        trace_flow_id,
                    ),
                    async {
                        if commit_decompression_buffers {
                            let (target_slice, buffer_slice, buffer_range) = {
                                let active_request = self
                                    .helper
                                    .session_manager()
                                    .active_requests
                                    .request(request_id);
                                let info = active_request.decompression_info.as_ref().unwrap();
                                (
                                    info.uncompressed_slice(),
                                    self.helper
                                        .orchestrator
                                        .buffer_allocator
                                        .get()
                                        .unwrap()
                                        .buffer_source()
                                        .slice(),
                                    info.buffer.as_ref().unwrap().range(),
                                )
                            };
                            let vmar = fuchsia_runtime::vmar_root_self();
                            let addr = target_slice.addr();
                            // vmar ops require page alignment; widen the range down to a page
                            // boundary.
                            let unaligned = addr % zx::system_get_page_size() as usize;
                            // Commit/prefetch failures are non-fatal: they only cost
                            // performance, so just log them.
                            if let Err(error) = vmar.op_range(
                                zx::VmarOp::COMMIT,
                                addr - unaligned,
                                target_slice.len() + unaligned,
                            ) {
                                log::warn!(error:?; "Unable to commit target range");
                            }
                            if let Err(error) = vmar.op_range(
                                zx::VmarOp::PREFETCH,
                                buffer_slice.addr() + buffer_range.start,
                                buffer_range.len(),
                            ) {
                                log::warn!(
                                    error:?,
                                    buffer_range:?;
                                    "Unable to prefetch source range",
                                );
                            }
                        }
                    },
                )
                .await
                .0
            }
            Operation::Write {
                device_block_offset,
                block_count,
                _unused,
                vmo_offset,
                mut options,
            } => {
                // No hardware FUA: strip the flag and flush after the write succeeds instead.
                if options.flags.contains(WriteFlags::FORCE_ACCESS) {
                    let flags = self.interface.get_info().as_ref().device_flags();
                    if !flags.contains(DeviceFlag::FUA_SUPPORT) {
                        options.flags.remove(WriteFlags::FORCE_ACCESS);
                        needs_postflush = true;
                    }
                }
                self.interface
                    .write(
                        device_block_offset,
                        block_count,
                        vmo.as_ref().unwrap(),
                        vmo_offset,
                        options,
                        trace_flow_id,
                    )
                    .await
            }
            Operation::Flush => self.interface.flush(trace_flow_id).await,
            Operation::Trim { device_block_offset, block_count } => {
                self.interface.trim(device_block_offset, block_count, trace_flow_id).await
            }
            // CloseVmo is completed inline in `run_fifo`, and the decompressed-read variants
            // are rewritten to plain reads by `map_request` before reaching here.
            Operation::CloseVmo
            | Operation::StartDecompressedRead { .. }
            | Operation::ContinueDecompressedRead { .. } => {
                unreachable!()
            }
        };
        let response = self
            .helper
            .orchestrator
            .active_requests
            .complete_and_take_response(request_id, result.into())
            .map(|(_, r)| r);
        if let Some(mut response) = response {
            // Simulated FUA: only flush if the write itself succeeded; the flush result
            // replaces the response status.
            if zx::Status::from_raw(response.status) == zx::Status::OK && needs_postflush {
                if let Some(id) = trace_flow_id {
                    fuchsia_trace::async_instant!(
                        fuchsia_trace::Id::from(id.get()),
                        "storage",
                        "block_server::SimulatedFUA",
                        "request_id" => request_id.0
                    );
                }
                response.status =
                    zx::Status::from(self.interface.flush(trace_flow_id).await).into_raw();
            }
            Some(response)
        } else {
            response
        }
    }
}
660
661impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
662 type Orchestrator = Self;
663
664 const SUPPORTS_DECOMPRESSION: bool = true;
665
666 type Session = usize;
668
669 async fn on_attach_vmo(orchestrator: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
670 I::on_attach_vmo(&orchestrator.interface, vmo).await
671 }
672
673 async fn open_session(
674 orchestrator: Arc<Self>,
675 stream: fblock::SessionRequestStream,
676 offset_map: OffsetMap,
677 block_size: u32,
678 ) -> Result<(), Error> {
679 I::open_session(
680 &orchestrator.interface,
681 orchestrator.clone(),
682 stream,
683 offset_map,
684 block_size,
685 )
686 .await
687 }
688
689 fn get_info(&self) -> Cow<'_, DeviceInfo> {
690 self.interface.get_info()
691 }
692
693 async fn get_volume_info(
694 &self,
695 ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
696 self.interface.get_volume_info().await
697 }
698
699 async fn query_slices(
700 &self,
701 start_slices: &[u64],
702 ) -> Result<Vec<fblock::VsliceRange>, zx::Status> {
703 self.interface.query_slices(start_slices).await
704 }
705
706 async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
707 self.interface.extend(start_slice, slice_count).await
708 }
709
710 async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
711 self.interface.shrink(start_slice, slice_count).await
712 }
713
714 fn active_requests(&self) -> &ActiveRequests<Self::Session> {
715 return &self.active_requests;
716 }
717}
718
719impl<I: Interface> IntoOrchestrator for Arc<I> {
720 type SM = SessionManager<I>;
721
722 fn into_orchestrator(self) -> Arc<Self::SM> {
723 Arc::new(SessionManager {
724 interface: self,
725 active_requests: ActiveRequests::default(),
726 buffer_allocator: OnceLock::new(),
727 })
728 }
729}