1use super::{DecodedRequest, DeviceInfo, IntoSessionManager, OffsetMap, Operation, SessionHelper};
6use anyhow::Error;
7use block_protocol::{BlockFifoRequest, BlockFifoResponse, WriteOptions};
8use fuchsia_async::{self as fasync, FifoReadable as _};
9use futures::future::Fuse;
10use futures::stream::FuturesUnordered;
11use futures::{select_biased, FutureExt, StreamExt};
12use std::borrow::Cow;
13use std::future::{poll_fn, Future};
14use std::mem::MaybeUninit;
15use std::num::NonZero;
16use std::pin::pin;
17use std::sync::Arc;
18use std::task::{ready, Poll};
19use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
20
/// Number of entries in the request buffer that `run_fifo` reads FIFO requests into.  `run_fifo`
/// also uses it as the cap on requests that are either still being processed or waiting for
/// their response to be written.
pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
22
23pub trait Interface: Send + Sync + Unpin + 'static {
24 fn open_session(
39 &self,
40 session_manager: Arc<SessionManager<Self>>,
41 stream: fblock::SessionRequestStream,
42 offset_map: Option<OffsetMap>,
43 block_size: u32,
44 ) -> impl Future<Output = Result<(), Error>> + Send {
45 session_manager.serve_session(stream, offset_map, block_size)
47 }
48
49 fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
53 async { Ok(()) }
54 }
55
56 fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}
58
59 fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
61
62 fn read(
64 &self,
65 device_block_offset: u64,
66 block_count: u32,
67 vmo: &Arc<zx::Vmo>,
68 vmo_offset: u64, trace_flow_id: Option<NonZero<u64>>,
70 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
71
72 fn write(
74 &self,
75 device_block_offset: u64,
76 block_count: u32,
77 vmo: &Arc<zx::Vmo>,
78 vmo_offset: u64, opts: WriteOptions,
80 trace_flow_id: Option<NonZero<u64>>,
81 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
82
83 fn flush(
85 &self,
86 trace_flow_id: Option<NonZero<u64>>,
87 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
88
89 fn trim(
91 &self,
92 device_block_offset: u64,
93 block_count: u32,
94 trace_flow_id: Option<NonZero<u64>>,
95 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
96
97 fn get_volume_info(
99 &self,
100 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
101 {
102 async { Err(zx::Status::NOT_SUPPORTED) }
103 }
104
105 fn query_slices(
107 &self,
108 _start_slices: &[u64],
109 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
110 async { Err(zx::Status::NOT_SUPPORTED) }
111 }
112
113 fn extend(
115 &self,
116 _start_slice: u64,
117 _slice_count: u64,
118 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
119 async { Err(zx::Status::NOT_SUPPORTED) }
120 }
121
122 fn shrink(
124 &self,
125 _start_slice: u64,
126 _slice_count: u64,
127 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
128 async { Err(zx::Status::NOT_SUPPORTED) }
129 }
130}
131
132pub struct PassthroughSession(fblock::SessionProxy);
134
135impl PassthroughSession {
136 pub fn new(proxy: fblock::SessionProxy) -> Self {
137 Self(proxy)
138 }
139
140 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
141 match request {
142 fblock::SessionRequest::GetFifo { responder } => {
143 responder.send(self.0.get_fifo().await?)?;
144 }
145 fblock::SessionRequest::AttachVmo { vmo, responder } => {
146 responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
147 }
148 fblock::SessionRequest::Close { responder } => {
149 responder.send(self.0.close().await?)?;
150 }
151 }
152 Ok(())
153 }
154
155 pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
157 while let Some(Ok(request)) = stream.next().await {
158 if let Err(error) = self.handle_request(request).await {
159 log::warn!(error:?; "FIDL error");
160 }
161 }
162 Ok(())
163 }
164}
165
166pub struct SessionManager<I: ?Sized> {
167 interface: Arc<I>,
168}
169
170impl<I: Interface + ?Sized> SessionManager<I> {
171 pub fn new(interface: Arc<I>) -> Self {
172 Self { interface }
173 }
174
175 pub async fn serve_session(
177 self: Arc<Self>,
178 stream: fblock::SessionRequestStream,
179 offset_map: Option<OffsetMap>,
180 block_size: u32,
181 ) -> Result<(), Error> {
182 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
183 let helper = Arc::new(helper);
184 let interface = self.interface.clone();
185
186 let mut stream = stream.fuse();
187
188 let scope = fasync::Scope::new();
189 let helper_clone = helper.clone();
190 let mut fifo_task = scope
191 .spawn(async move {
192 if let Err(status) = run_fifo(fifo, interface, helper).await {
193 if status != zx::Status::PEER_CLOSED {
194 log::error!(status:?; "FIFO error");
195 }
196 }
197 })
198 .fuse();
199
200 scopeguard::defer! {
202 for (_, vmo) in helper_clone.take_vmos() {
203 self.interface.on_detach_vmo(&vmo);
204 }
205 }
206
207 loop {
208 futures::select! {
209 maybe_req = stream.next() => {
210 if let Some(req) = maybe_req {
211 helper_clone.handle_request(req?).await?;
212 } else {
213 break;
214 }
215 }
216 _ = fifo_task => break,
217 }
218 }
219
220 Ok(())
221 }
222}
223
224async fn run_fifo<I: Interface + ?Sized>(
226 fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
227 interface: Arc<I>,
228 helper: Arc<SessionHelper<SessionManager<I>>>,
229) -> Result<(), zx::Status> {
230 let fifo = fasync::Fifo::from_fifo(fifo);
239 let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
240 let mut active_requests = FuturesUnordered::new();
241 let mut responses = vec![];
242
243 loop {
244 let new_requests = {
245 let pending_requests = active_requests.len() + responses.len();
247 let count = requests.len() - pending_requests;
248 let mut receive_requests = pin!(if count == 0 {
249 Fuse::terminated()
250 } else {
251 fifo.read_entries(&mut requests[..count]).fuse()
252 });
253 let mut send_responses = pin!(if responses.is_empty() {
254 Fuse::terminated()
255 } else {
256 poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
257 match ready!(fifo.try_write(cx, &responses[..])) {
258 Ok(written) => {
259 responses.drain(..written);
260 Poll::Ready(Ok(()))
261 }
262 Err(status) => Poll::Ready(Err(status)),
263 }
264 })
265 .fuse()
266 });
267
268 select_biased!(
271 res = send_responses => {
272 res?;
273 0
274 },
275 response = active_requests.select_next_some() => {
276 responses.extend(response);
277 0
278 }
279 count = receive_requests => {
280 count?
281 }
282 )
283 };
284 for request in &requests[..new_requests] {
287 if let Some(decoded_request) =
288 helper.decode_fifo_request(unsafe { request.assume_init_ref() })
289 {
290 let interface = interface.clone();
291 let helper = helper.clone();
292 active_requests.push(async move {
293 let tracking = decoded_request.request_tracking;
294 let status = process_fifo_request(interface, decoded_request).await.into();
295 helper.finish_fifo_request(tracking, status)
296 });
297 }
298 }
299 }
300}
301
302impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
303 async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
304 self.interface.on_attach_vmo(vmo).await
305 }
306
307 async fn open_session(
308 self: Arc<Self>,
309 stream: fblock::SessionRequestStream,
310 offset_map: Option<OffsetMap>,
311 block_size: u32,
312 ) -> Result<(), Error> {
313 self.interface.clone().open_session(self, stream, offset_map, block_size).await
314 }
315
316 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
317 self.interface.get_info().await
318 }
319
320 async fn get_volume_info(
321 &self,
322 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
323 self.interface.get_volume_info().await
324 }
325
326 async fn query_slices(
327 &self,
328 start_slices: &[u64],
329 ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
330 self.interface.query_slices(start_slices).await
331 }
332
333 async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
334 self.interface.extend(start_slice, slice_count).await
335 }
336
337 async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
338 self.interface.shrink(start_slice, slice_count).await
339 }
340}
341
342impl<I: Interface> IntoSessionManager for Arc<I> {
343 type SM = SessionManager<I>;
344
345 fn into_session_manager(self) -> Arc<Self::SM> {
346 Arc::new(SessionManager { interface: self })
347 }
348}
349
350async fn process_fifo_request<I: Interface + ?Sized>(
352 interface: Arc<I>,
353 r: DecodedRequest,
354) -> Result<(), zx::Status> {
355 let trace_flow_id = r.request_tracking.trace_flow_id;
356 match r.operation? {
357 Operation::Read { device_block_offset, block_count, _unused, vmo_offset } => {
358 interface
359 .read(
360 device_block_offset,
361 block_count,
362 &r.vmo.as_ref().unwrap(),
363 vmo_offset,
364 trace_flow_id,
365 )
366 .await
367 }
368 Operation::Write { device_block_offset, block_count, options, vmo_offset } => {
369 interface
370 .write(
371 device_block_offset,
372 block_count,
373 &r.vmo.as_ref().unwrap(),
374 vmo_offset,
375 options,
376 trace_flow_id,
377 )
378 .await
379 }
380 Operation::Flush => interface.flush(trace_flow_id).await,
381 Operation::Trim { device_block_offset, block_count } => {
382 interface.trim(device_block_offset, block_count, trace_flow_id).await
383 }
384 Operation::CloseVmo => {
385 if let Some(vmo) = &r.vmo {
386 interface.on_detach_vmo(vmo);
387 }
388 Ok(())
389 }
390 }
391}