1use super::{
6 ActiveRequests, DecodedRequest, DeviceInfo, FIFO_MAX_REQUESTS, IntoSessionManager, OffsetMap,
7 Operation, SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags, WriteOptions};
11use futures::future::{Fuse, FusedFuture};
12use futures::stream::FuturesUnordered;
13use futures::{FutureExt, StreamExt, select_biased};
14use std::borrow::Cow;
15use std::collections::VecDeque;
16use std::future::{Future, poll_fn};
17use std::mem::MaybeUninit;
18use std::pin::pin;
19use std::sync::Arc;
20use std::task::{Poll, ready};
21use {
22 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
23 fuchsia_async as fasync,
24};
25
26pub trait Interface: Send + Sync + Unpin + 'static {
27 fn open_session(
42 &self,
43 session_manager: Arc<SessionManager<Self>>,
44 stream: fblock::SessionRequestStream,
45 offset_map: OffsetMap,
46 block_size: u32,
47 ) -> impl Future<Output = Result<(), Error>> + Send {
48 session_manager.serve_session(stream, offset_map, block_size)
50 }
51
52 fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
56 async { Ok(()) }
57 }
58
59 fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}
61
62 fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
64
65 fn read(
67 &self,
68 device_block_offset: u64,
69 block_count: u32,
70 vmo: &Arc<zx::Vmo>,
71 vmo_offset: u64, opts: ReadOptions,
73 trace_flow_id: TraceFlowId,
74 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
75
76 fn write(
79 &self,
80 device_block_offset: u64,
81 block_count: u32,
82 vmo: &Arc<zx::Vmo>,
83 vmo_offset: u64, opts: WriteOptions,
85 trace_flow_id: TraceFlowId,
86 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
87
88 fn barrier(&self) -> Result<(), zx::Status>;
95
96 fn flush(
98 &self,
99 trace_flow_id: TraceFlowId,
100 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
101
102 fn trim(
104 &self,
105 device_block_offset: u64,
106 block_count: u32,
107 trace_flow_id: TraceFlowId,
108 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
109
110 fn get_volume_info(
112 &self,
113 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
114 {
115 async { Err(zx::Status::NOT_SUPPORTED) }
116 }
117
118 fn query_slices(
120 &self,
121 _start_slices: &[u64],
122 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
123 async { Err(zx::Status::NOT_SUPPORTED) }
124 }
125
126 fn extend(
128 &self,
129 _start_slice: u64,
130 _slice_count: u64,
131 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
132 async { Err(zx::Status::NOT_SUPPORTED) }
133 }
134
135 fn shrink(
137 &self,
138 _start_slice: u64,
139 _slice_count: u64,
140 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
141 async { Err(zx::Status::NOT_SUPPORTED) }
142 }
143}
144
145pub struct PassthroughSession(fblock::SessionProxy);
147
148impl PassthroughSession {
149 pub fn new(proxy: fblock::SessionProxy) -> Self {
150 Self(proxy)
151 }
152
153 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
154 match request {
155 fblock::SessionRequest::GetFifo { responder } => {
156 responder.send(self.0.get_fifo().await?)?;
157 }
158 fblock::SessionRequest::AttachVmo { vmo, responder } => {
159 responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
160 }
161 fblock::SessionRequest::Close { responder } => {
162 responder.send(self.0.close().await?)?;
163 }
164 }
165 Ok(())
166 }
167
168 pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
170 while let Some(Ok(request)) = stream.next().await {
171 if let Err(error) = self.handle_request(request).await {
172 log::warn!(error:?; "FIDL error");
173 }
174 }
175 Ok(())
176 }
177}
178
179pub struct SessionManager<I: Interface + ?Sized> {
180 interface: Arc<I>,
181 active_requests: ActiveRequests<usize>,
182}
183
184impl<I: Interface + ?Sized> SessionManager<I> {
185 pub fn new(interface: Arc<I>) -> Self {
186 Self { interface, active_requests: ActiveRequests::default() }
187 }
188
189 pub fn interface(&self) -> &I {
190 self.interface.as_ref()
191 }
192
193 pub async fn serve_session(
195 self: Arc<Self>,
196 stream: fblock::SessionRequestStream,
197 offset_map: OffsetMap,
198 block_size: u32,
199 ) -> Result<(), Error> {
200 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
201 let session = Session { helper: Arc::new(helper), interface: self.interface.clone() };
202 let mut stream = stream.fuse();
203 let scope = fasync::Scope::new();
204 let helper = session.helper.clone();
205 let mut fifo_task = scope
206 .spawn(async move {
207 if let Err(status) = session.run_fifo(fifo).await {
208 if status != zx::Status::PEER_CLOSED {
209 log::error!(status:?; "FIFO error");
210 }
211 }
212 })
213 .fuse();
214
215 scopeguard::defer! {
217 for (_, vmo) in helper.take_vmos() {
218 self.interface.on_detach_vmo(&vmo);
219 }
220 }
221
222 loop {
223 futures::select! {
224 maybe_req = stream.next() => {
225 if let Some(req) = maybe_req {
226 helper.handle_request(req?).await?;
227 } else {
228 break;
229 }
230 }
231 _ = fifo_task => break,
232 }
233 }
234
235 Ok(())
236 }
237}
238
239struct Session<I: Interface + ?Sized> {
240 interface: Arc<I>,
241 helper: Arc<SessionHelper<SessionManager<I>>>,
242}
243
244impl<I: Interface + ?Sized> Session<I> {
245 async fn run_fifo(
247 &self,
248 fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
249 ) -> Result<(), zx::Status> {
250 scopeguard::defer! {
251 self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
252 }
253
254 let mut fifo = fasync::Fifo::from_fifo(fifo);
264 let (mut reader, mut writer) = fifo.async_io();
265 let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
266 let active_requests = &self.helper.session_manager.active_requests;
267 let mut active_request_futures = FuturesUnordered::new();
268 let mut responses = Vec::new();
269
270 let mut map_future = pin!(Fuse::terminated());
275 let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();
276
277 loop {
278 let new_requests = {
279 let pending_requests = active_request_futures.len() + responses.len();
282 let count = requests.len().saturating_sub(pending_requests);
283 let mut receive_requests = pin!(if count == 0 {
284 Fuse::terminated()
285 } else {
286 reader.read_entries(&mut requests[..count]).fuse()
287 });
288 let mut send_responses = pin!(if responses.is_empty() {
289 Fuse::terminated()
290 } else {
291 poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
292 match ready!(writer.try_write(cx, &responses[..])) {
293 Ok(written) => {
294 responses.drain(..written);
295 Poll::Ready(Ok(()))
296 }
297 Err(status) => Poll::Ready(Err(status)),
298 }
299 })
300 .fuse()
301 });
302
303 select_biased!(
306 res = send_responses => {
307 res?;
308 0
309 },
310 response = active_request_futures.select_next_some() => {
311 responses.extend(response);
312 0
313 }
314 result = map_future => {
315 match result {
316 Ok((request, remainder)) => {
317 active_request_futures.push(self.process_fifo_request(request));
318 if let Some(remainder) = remainder {
319 map_future.set(self.map_request(remainder).fuse());
320 }
321 }
322 Err(response) => responses.extend(response),
323 }
324 if map_future.is_terminated() {
325 if let Some(request) = pending_mappings.pop_front() {
326 map_future.set(self.map_request(request).fuse());
327 }
328 }
329 0
330 }
331 count = receive_requests => {
332 count?
333 }
334 )
335 };
336
337 for request in &mut requests[..new_requests] {
340 match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
341 request.assume_init_mut()
342 }) {
343 Ok(DecodedRequest {
344 operation: Operation::CloseVmo, vmo, request_id, ..
345 }) => {
346 if let Some(vmo) = vmo {
347 self.interface.on_detach_vmo(vmo.as_ref());
348 }
349 responses.extend(
350 active_requests
351 .complete_and_take_response(request_id, zx::Status::OK)
352 .map(|(_, response)| response),
353 );
354 }
355 Ok(mut request) => {
356 if let Err(status) = self.maybe_issue_barrier(&mut request) {
360 let response = self
361 .helper
362 .session_manager
363 .active_requests
364 .complete_and_take_response(request.request_id, status)
365 .map(|(_, r)| r);
366 responses.extend(response);
367 } else if map_future.is_terminated() {
368 map_future.set(self.map_request(request).fuse());
369 } else {
370 pending_mappings.push_back(request);
371 }
372 }
373 Err(None) => {}
374 Err(Some(response)) => responses.push(response),
375 }
376 }
377 }
378 }
379
380 fn maybe_issue_barrier(&self, request: &mut DecodedRequest) -> Result<(), zx::Status> {
381 if let Operation::Write {
382 device_block_offset: _,
383 block_count: _,
384 _unused,
385 options,
386 vmo_offset: _,
387 ..
388 } = &mut request.operation
389 {
390 if options.flags.contains(WriteFlags::PRE_BARRIER) {
391 self.interface.barrier()?;
392 options.flags &= !WriteFlags::PRE_BARRIER;
393 }
394 }
395 Ok(())
396 }
397
398 async fn map_request(
400 &self,
401 request: DecodedRequest,
402 ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
403 self.helper.map_request(request)
404 }
405
406 async fn process_fifo_request(
408 &self,
409 DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
410 ) -> Option<BlockFifoResponse> {
411 let result = match operation {
412 Operation::Read { device_block_offset, block_count, _unused, vmo_offset, options } => {
413 self.interface
414 .read(
415 device_block_offset,
416 block_count,
417 vmo.as_ref().unwrap(),
418 vmo_offset,
419 options,
420 trace_flow_id,
421 )
422 .await
423 }
424 Operation::Write { device_block_offset, block_count, _unused, vmo_offset, options } => {
425 self.interface
426 .write(
427 device_block_offset,
428 block_count,
429 vmo.as_ref().unwrap(),
430 vmo_offset,
431 options,
432 trace_flow_id,
433 )
434 .await
435 }
436 Operation::Flush => self.interface.flush(trace_flow_id).await,
437 Operation::Trim { device_block_offset, block_count } => {
438 self.interface.trim(device_block_offset, block_count, trace_flow_id).await
439 }
440 Operation::CloseVmo => {
441 unreachable!()
443 }
444 };
445 self.helper
446 .session_manager
447 .active_requests
448 .complete_and_take_response(request_id, result.into())
449 .map(|(_, r)| r)
450 }
451}
452
453impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
454 type Session = usize;
456
457 async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
458 self.interface.on_attach_vmo(vmo).await
459 }
460
461 async fn open_session(
462 self: Arc<Self>,
463 stream: fblock::SessionRequestStream,
464 offset_map: OffsetMap,
465 block_size: u32,
466 ) -> Result<(), Error> {
467 self.interface.clone().open_session(self, stream, offset_map, block_size).await
468 }
469
470 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
471 self.interface.get_info().await
472 }
473
474 async fn get_volume_info(
475 &self,
476 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
477 self.interface.get_volume_info().await
478 }
479
480 async fn query_slices(
481 &self,
482 start_slices: &[u64],
483 ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
484 self.interface.query_slices(start_slices).await
485 }
486
487 async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
488 self.interface.extend(start_slice, slice_count).await
489 }
490
491 async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
492 self.interface.shrink(start_slice, slice_count).await
493 }
494
495 fn active_requests(&self) -> &ActiveRequests<Self::Session> {
496 return &self.active_requests;
497 }
498}
499
500impl<I: Interface> IntoSessionManager for Arc<I> {
501 type SM = SessionManager<I>;
502
503 fn into_session_manager(self) -> Arc<Self::SM> {
504 Arc::new(SessionManager { interface: self, active_requests: ActiveRequests::default() })
505 }
506}