1use super::{
6 ActiveRequests, DecodedRequest, DeviceInfo, FIFO_MAX_REQUESTS, IntoSessionManager, OffsetMap,
7 Operation, SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags, WriteOptions};
11use fidl_fuchsia_hardware_block::Flag;
12use futures::future::{Fuse, FusedFuture, join};
13use futures::stream::FuturesUnordered;
14use futures::{FutureExt, StreamExt, select_biased};
15use std::borrow::Cow;
16use std::collections::VecDeque;
17use std::future::{Future, poll_fn};
18use std::mem::MaybeUninit;
19use std::pin::pin;
20use std::sync::{Arc, OnceLock};
21use std::task::{Poll, ready};
22use storage_device::buffer::Buffer;
23use storage_device::buffer_allocator::{BufferAllocator, BufferSource};
24use {
25 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
26 fuchsia_async as fasync,
27};
28
29pub trait Interface: Send + Sync + Unpin + 'static {
30 fn open_session(
45 &self,
46 session_manager: Arc<SessionManager<Self>>,
47 stream: fblock::SessionRequestStream,
48 offset_map: OffsetMap,
49 block_size: u32,
50 ) -> impl Future<Output = Result<(), Error>> + Send {
51 session_manager.serve_session(stream, offset_map, block_size)
53 }
54
55 fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
59 async { Ok(()) }
60 }
61
62 fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}
64
65 fn get_info(&self) -> Cow<'_, DeviceInfo>;
67
68 fn read(
70 &self,
71 device_block_offset: u64,
72 block_count: u32,
73 vmo: &Arc<zx::Vmo>,
74 vmo_offset: u64, opts: ReadOptions,
76 trace_flow_id: TraceFlowId,
77 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
78
79 fn write(
81 &self,
82 device_block_offset: u64,
83 block_count: u32,
84 vmo: &Arc<zx::Vmo>,
85 vmo_offset: u64, opts: WriteOptions,
87 trace_flow_id: TraceFlowId,
88 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
89
90 fn flush(
92 &self,
93 trace_flow_id: TraceFlowId,
94 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
95
96 fn trim(
98 &self,
99 device_block_offset: u64,
100 block_count: u32,
101 trace_flow_id: TraceFlowId,
102 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
103
104 fn get_volume_info(
106 &self,
107 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
108 {
109 async { Err(zx::Status::NOT_SUPPORTED) }
110 }
111
112 fn query_slices(
114 &self,
115 _start_slices: &[u64],
116 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
117 async { Err(zx::Status::NOT_SUPPORTED) }
118 }
119
120 fn extend(
122 &self,
123 _start_slice: u64,
124 _slice_count: u64,
125 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
126 async { Err(zx::Status::NOT_SUPPORTED) }
127 }
128
129 fn shrink(
131 &self,
132 _start_slice: u64,
133 _slice_count: u64,
134 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
135 async { Err(zx::Status::NOT_SUPPORTED) }
136 }
137}
138
139pub struct PassthroughSession(fblock::SessionProxy);
141
142impl PassthroughSession {
143 pub fn new(proxy: fblock::SessionProxy) -> Self {
144 Self(proxy)
145 }
146
147 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
148 match request {
149 fblock::SessionRequest::GetFifo { responder } => {
150 responder.send(self.0.get_fifo().await?)?;
151 }
152 fblock::SessionRequest::AttachVmo { vmo, responder } => {
153 responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
154 }
155 fblock::SessionRequest::Close { responder } => {
156 responder.send(self.0.close().await?)?;
157 }
158 }
159 Ok(())
160 }
161
162 pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
164 while let Some(Ok(request)) = stream.next().await {
165 if let Err(error) = self.handle_request(request).await {
166 log::warn!(error:?; "FIDL error");
167 }
168 }
169 Ok(())
170 }
171}
172
173pub struct SessionManager<I: Interface + ?Sized> {
174 interface: Arc<I>,
175 active_requests: ActiveRequests<usize>,
176
177 buffer_allocator: OnceLock<BufferAllocator>,
180}
181
182impl<I: Interface + ?Sized> Drop for SessionManager<I> {
183 fn drop(&mut self) {
184 if let Some(allocator) = self.buffer_allocator.get() {
185 self.interface.on_detach_vmo(allocator.buffer_source().vmo());
186 }
187 }
188}
189
190impl<I: Interface + ?Sized> SessionManager<I> {
191 pub fn new(interface: Arc<I>) -> Self {
192 Self {
193 interface,
194 active_requests: ActiveRequests::default(),
195 buffer_allocator: OnceLock::new(),
196 }
197 }
198
199 pub fn interface(&self) -> &I {
200 self.interface.as_ref()
201 }
202
203 pub async fn serve_session(
205 self: Arc<Self>,
206 stream: fblock::SessionRequestStream,
207 offset_map: OffsetMap,
208 block_size: u32,
209 ) -> Result<(), Error> {
210 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
211 let session = Session { helper: Arc::new(helper), interface: self.interface.clone() };
212 let mut stream = stream.fuse();
213 let scope = fasync::Scope::new();
214 let helper = session.helper.clone();
215 let mut fifo_task = scope
216 .spawn(async move {
217 if let Err(status) = session.run_fifo(fifo).await {
218 if status != zx::Status::PEER_CLOSED {
219 log::error!(status:?; "FIFO error");
220 }
221 }
222 })
223 .fuse();
224
225 scopeguard::defer! {
227 for (_, (vmo, _)) in helper.take_vmos() {
228 self.interface.on_detach_vmo(&vmo);
229 }
230 }
231
232 loop {
233 futures::select! {
234 maybe_req = stream.next() => {
235 if let Some(req) = maybe_req {
236 helper.handle_request(req?).await?;
237 } else {
238 break;
239 }
240 }
241 _ = fifo_task => break,
242 }
243 }
244
245 Ok(())
246 }
247}
248
249struct Session<I: Interface + ?Sized> {
250 interface: Arc<I>,
251 helper: Arc<SessionHelper<SessionManager<I>>>,
252}
253
254impl<I: Interface + ?Sized> Session<I> {
255 async fn run_fifo(
257 &self,
258 fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
259 ) -> Result<(), zx::Status> {
260 scopeguard::defer! {
261 self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
262 }
263
264 let mut fifo = fasync::Fifo::from_fifo(fifo);
274 let (mut reader, mut writer) = fifo.async_io();
275 let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
276 let active_requests = &self.helper.session_manager.active_requests;
277 let mut active_request_futures = FuturesUnordered::new();
278 let mut responses = Vec::new();
279
280 let mut map_future = pin!(Fuse::terminated());
285 let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();
286
287 loop {
288 let new_requests = {
289 let pending_requests = active_request_futures.len() + responses.len();
292 let count = requests.len().saturating_sub(pending_requests);
293 let mut receive_requests = pin!(if count == 0 {
294 Fuse::terminated()
295 } else {
296 reader.read_entries(&mut requests[..count]).fuse()
297 });
298 let mut send_responses = pin!(if responses.is_empty() {
299 Fuse::terminated()
300 } else {
301 poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
302 match ready!(writer.try_write(cx, &responses[..])) {
303 Ok(written) => {
304 responses.drain(..written);
305 Poll::Ready(Ok(()))
306 }
307 Err(status) => Poll::Ready(Err(status)),
308 }
309 })
310 .fuse()
311 });
312
313 select_biased!(
316 res = send_responses => {
317 res?;
318 0
319 },
320 response = active_request_futures.select_next_some() => {
321 responses.extend(response);
322 0
323 }
324 result = map_future => {
325 match result {
326 Ok((request, remainder, commit_decompression_buffers)) => {
327 active_request_futures.push(self.process_fifo_request(
328 request,
329 commit_decompression_buffers,
330 ));
331 if let Some(remainder) = remainder {
332 map_future.set(
333 self.map_request_or_get_response(remainder).fuse()
334 );
335 }
336 }
337 Err(response) => responses.extend(response),
338 }
339 if map_future.is_terminated() {
340 if let Some(request) = pending_mappings.pop_front() {
341 map_future.set(self.map_request_or_get_response(request).fuse());
342 }
343 }
344 0
345 }
346 count = receive_requests => {
347 count?
348 }
349 )
350 };
351
352 for request in &mut requests[..new_requests] {
355 match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
356 request.assume_init_mut()
357 }) {
358 Ok(DecodedRequest {
359 operation: Operation::CloseVmo, vmo, request_id, ..
360 }) => {
361 if let Some(vmo) = vmo {
362 self.interface.on_detach_vmo(vmo.as_ref());
363 }
364 responses.extend(
365 active_requests
366 .complete_and_take_response(request_id, zx::Status::OK)
367 .map(|(_, response)| response),
368 );
369 }
370 Ok(request) => {
371 if map_future.is_terminated() {
372 map_future.set(self.map_request_or_get_response(request).fuse());
373 } else {
374 pending_mappings.push_back(request);
375 }
376 }
377 Err(None) => {}
378 Err(Some(response)) => responses.push(response),
379 }
380 }
381 }
382 }
383
384 async fn map_request_or_get_response(
385 &self,
386 request: DecodedRequest,
387 ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), Option<BlockFifoResponse>> {
388 let request_id = request.request_id;
389 self.map_request(request).await.map_err(|status| {
390 self.helper
391 .session_manager
392 .active_requests
393 .complete_and_take_response(request_id, status)
394 .map(|(_, r)| r)
395 })
396 }
397
398 async fn map_request(
401 &self,
402 mut request: DecodedRequest,
403 ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), zx::Status> {
404 let mut active_requests;
405 let active_request;
406 let mut commit_decompression_buffers = false;
407 let flags = self.interface.get_info().as_ref().device_flags();
408 if !flags.contains(Flag::BARRIER_SUPPORT)
411 && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
412 {
413 if let Some(id) = request.trace_flow_id {
414 fuchsia_trace::async_instant!(
415 fuchsia_trace::Id::from(id.get()),
416 c"storage",
417 c"block_server::SimulatedBarrier",
418 "request_id" => request.request_id.0
419 );
420 }
421 self.interface.flush(request.trace_flow_id).await?;
422 }
423
424 match request.operation {
426 Operation::StartDecompressedRead {
427 required_buffer_size,
428 device_block_offset,
429 block_count,
430 options,
431 } => {
432 let allocator = match self.helper.session_manager.buffer_allocator.get() {
433 Some(a) => a,
434 None => {
435 let source = BufferSource::new(fblock::MAX_DECOMPRESSED_BYTES as usize);
438 self.interface.on_attach_vmo(&source.vmo()).await?;
439 let allocator = BufferAllocator::new(
440 std::cmp::max(
441 self.helper.block_size as usize,
442 zx::system_get_page_size() as usize,
443 ),
444 source,
445 );
446 self.helper.session_manager.buffer_allocator.set(allocator).unwrap();
447 self.helper.session_manager.buffer_allocator.get().unwrap()
448 }
449 };
450
451 if required_buffer_size > fblock::MAX_DECOMPRESSED_BYTES as usize {
452 return Err(zx::Status::OUT_OF_RANGE);
453 }
454
455 let buffer = allocator.allocate_buffer(required_buffer_size).await;
456 let vmo_offset = buffer.range().start as u64;
457
458 unsafe fn remove_lifetime(buffer: Buffer<'_>) -> Buffer<'static> {
462 unsafe { std::mem::transmute(buffer) }
463 }
464
465 active_requests = self.helper.session_manager.active_requests.0.lock();
466 active_request = &mut active_requests.requests[request.request_id.0];
467
468 active_request.decompression_info.as_mut().unwrap().buffer =
471 Some(unsafe { remove_lifetime(buffer) });
472
473 request.operation = Operation::Read {
474 device_block_offset,
475 block_count,
476 _unused: 0,
477 vmo_offset,
478 options,
479 };
480 request.vmo = Some(allocator.buffer_source().vmo().clone());
481
482 commit_decompression_buffers = true;
483 }
484 Operation::ContinueDecompressedRead {
485 offset,
486 device_block_offset,
487 block_count,
488 options,
489 } => {
490 active_requests = self.helper.session_manager.active_requests.0.lock();
491 active_request = &mut active_requests.requests[request.request_id.0];
492
493 let buffer =
494 active_request.decompression_info.as_ref().unwrap().buffer.as_ref().unwrap();
495
496 if offset >= buffer.len() as u64
498 || buffer.len() as u64 - offset
499 < block_count as u64 * self.helper.block_size as u64
500 {
501 return Err(zx::Status::OUT_OF_RANGE);
502 }
503
504 request.operation = Operation::Read {
505 device_block_offset,
506 block_count,
507 _unused: 0,
508 vmo_offset: buffer.range().start as u64 + offset,
509 options,
510 };
511
512 let allocator = self.helper.session_manager.buffer_allocator.get().unwrap();
513 request.vmo = Some(allocator.buffer_source().vmo().clone());
514 }
515 _ => {
516 active_requests = self.helper.session_manager.active_requests.0.lock();
517 active_request = &mut active_requests.requests[request.request_id.0];
518 }
519 }
520
521 self.helper
525 .map_request(request, active_request)
526 .map(|(request, remainder)| (request, remainder, commit_decompression_buffers))
527 }
528
529 async fn process_fifo_request(
531 &self,
532 DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
533 commit_decompression_buffers: bool,
534 ) -> Option<BlockFifoResponse> {
535 let mut needs_postflush = false;
536 let result = match operation {
537 Operation::Read { device_block_offset, block_count, _unused, vmo_offset, options } => {
538 join(
539 self.interface.read(
540 device_block_offset,
541 block_count,
542 vmo.as_ref().unwrap(),
543 vmo_offset,
544 options,
545 trace_flow_id,
546 ),
547 async {
548 if commit_decompression_buffers {
549 let (target_slice, buffer_slice, buffer_range) = {
550 let active_request =
551 self.helper.session_manager.active_requests.request(request_id);
552 let info = active_request.decompression_info.as_ref().unwrap();
553 (
554 info.uncompressed_slice(),
555 self.helper
556 .session_manager
557 .buffer_allocator
558 .get()
559 .unwrap()
560 .buffer_source()
561 .slice(),
562 info.buffer.as_ref().unwrap().range(),
563 )
564 };
565 let vmar = fuchsia_runtime::vmar_root_self();
566 let addr = target_slice.addr();
568 let unaligned = addr % zx::system_get_page_size() as usize;
569 if let Err(error) = vmar.op_range(
570 zx::VmarOp::COMMIT,
571 addr - unaligned,
572 target_slice.len() + unaligned,
573 ) {
574 log::warn!(error:?; "Unable to commit target range");
575 }
576 if let Err(error) = vmar.op_range(
578 zx::VmarOp::PREFETCH,
579 buffer_slice.addr() + buffer_range.start,
580 buffer_range.len(),
581 ) {
582 log::warn!(
583 error:?,
584 buffer_range:?;
585 "Unable to prefetch source range",
586 );
587 }
588 }
589 },
590 )
591 .await
592 .0
593 }
594 Operation::Write {
595 device_block_offset,
596 block_count,
597 _unused,
598 vmo_offset,
599 mut options,
600 } => {
601 if options.flags.contains(WriteFlags::FORCE_ACCESS) {
604 let flags = self.interface.get_info().as_ref().device_flags();
605 if !flags.contains(Flag::FUA_SUPPORT) {
606 options.flags.remove(WriteFlags::FORCE_ACCESS);
607 needs_postflush = true;
608 }
609 }
610 self.interface
611 .write(
612 device_block_offset,
613 block_count,
614 vmo.as_ref().unwrap(),
615 vmo_offset,
616 options,
617 trace_flow_id,
618 )
619 .await
620 }
621 Operation::Flush => self.interface.flush(trace_flow_id).await,
622 Operation::Trim { device_block_offset, block_count } => {
623 self.interface.trim(device_block_offset, block_count, trace_flow_id).await
624 }
625 Operation::CloseVmo
626 | Operation::StartDecompressedRead { .. }
627 | Operation::ContinueDecompressedRead { .. } => {
628 unreachable!()
630 }
631 };
632 let response = self
633 .helper
634 .session_manager
635 .active_requests
636 .complete_and_take_response(request_id, result.into())
637 .map(|(_, r)| r);
638 if let Some(mut response) = response {
639 if zx::Status::from_raw(response.status) == zx::Status::OK && needs_postflush {
641 if let Some(id) = trace_flow_id {
642 fuchsia_trace::async_instant!(
643 fuchsia_trace::Id::from(id.get()),
644 c"storage",
645 c"block_server::SimulatedFUA",
646 "request_id" => request_id.0
647 );
648 }
649 response.status =
650 zx::Status::from(self.interface.flush(trace_flow_id).await).into_raw();
651 }
652 Some(response)
653 } else {
654 response
655 }
656 }
657}
658
659impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
660 const SUPPORTS_DECOMPRESSION: bool = true;
661
662 type Session = usize;
664
665 async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
666 self.interface.on_attach_vmo(vmo).await
667 }
668
669 async fn open_session(
670 self: Arc<Self>,
671 stream: fblock::SessionRequestStream,
672 offset_map: OffsetMap,
673 block_size: u32,
674 ) -> Result<(), Error> {
675 self.interface.clone().open_session(self, stream, offset_map, block_size).await
676 }
677
678 fn get_info(&self) -> Cow<'_, DeviceInfo> {
679 self.interface.get_info()
680 }
681
682 async fn get_volume_info(
683 &self,
684 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
685 self.interface.get_volume_info().await
686 }
687
688 async fn query_slices(
689 &self,
690 start_slices: &[u64],
691 ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
692 self.interface.query_slices(start_slices).await
693 }
694
695 async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
696 self.interface.extend(start_slice, slice_count).await
697 }
698
699 async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
700 self.interface.shrink(start_slice, slice_count).await
701 }
702
703 fn active_requests(&self) -> &ActiveRequests<Self::Session> {
704 return &self.active_requests;
705 }
706}
707
708impl<I: Interface> IntoSessionManager for Arc<I> {
709 type SM = SessionManager<I>;
710
711 fn into_session_manager(self) -> Arc<Self::SM> {
712 Arc::new(SessionManager {
713 interface: self,
714 active_requests: ActiveRequests::default(),
715 buffer_allocator: OnceLock::new(),
716 })
717 }
718}