_block_server_c_rustc_static/
async_interface.rs
1use super::{DecodedRequest, DeviceInfo, IntoSessionManager, OffsetMap, Operation, SessionHelper};
6use anyhow::Error;
7use block_protocol::{BlockFifoRequest, BlockFifoResponse, WriteOptions};
8use futures::future::Fuse;
9use futures::stream::FuturesUnordered;
10use futures::{select_biased, FutureExt, StreamExt};
11use std::borrow::Cow;
12use std::future::{poll_fn, Future};
13use std::mem::MaybeUninit;
14use std::num::NonZero;
15use std::pin::pin;
16use std::sync::Arc;
17use std::task::{ready, Poll};
18use {
19 fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
20 fuchsia_async as fasync,
21};
22
23pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
24
25pub trait Interface: Send + Sync + Unpin + 'static {
26 fn open_session(
41 &self,
42 session_manager: Arc<SessionManager<Self>>,
43 stream: fblock::SessionRequestStream,
44 offset_map: Option<OffsetMap>,
45 block_size: u32,
46 ) -> impl Future<Output = Result<(), Error>> + Send {
47 session_manager.serve_session(stream, offset_map, block_size)
49 }
50
51 fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
55 async { Ok(()) }
56 }
57
58 fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}
60
61 fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
63
64 fn read(
66 &self,
67 device_block_offset: u64,
68 block_count: u32,
69 vmo: &Arc<zx::Vmo>,
70 vmo_offset: u64, trace_flow_id: Option<NonZero<u64>>,
72 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
73
74 fn write(
76 &self,
77 device_block_offset: u64,
78 block_count: u32,
79 vmo: &Arc<zx::Vmo>,
80 vmo_offset: u64, opts: WriteOptions,
82 trace_flow_id: Option<NonZero<u64>>,
83 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
84
85 fn flush(
87 &self,
88 trace_flow_id: Option<NonZero<u64>>,
89 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
90
91 fn trim(
93 &self,
94 device_block_offset: u64,
95 block_count: u32,
96 trace_flow_id: Option<NonZero<u64>>,
97 ) -> impl Future<Output = Result<(), zx::Status>> + Send;
98
99 fn get_volume_info(
101 &self,
102 ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
103 {
104 async { Err(zx::Status::NOT_SUPPORTED) }
105 }
106
107 fn query_slices(
109 &self,
110 _start_slices: &[u64],
111 ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
112 async { Err(zx::Status::NOT_SUPPORTED) }
113 }
114
115 fn extend(
117 &self,
118 _start_slice: u64,
119 _slice_count: u64,
120 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
121 async { Err(zx::Status::NOT_SUPPORTED) }
122 }
123
124 fn shrink(
126 &self,
127 _start_slice: u64,
128 _slice_count: u64,
129 ) -> impl Future<Output = Result<(), zx::Status>> + Send {
130 async { Err(zx::Status::NOT_SUPPORTED) }
131 }
132}
133
134pub struct PassthroughSession(fblock::SessionProxy);
136
137impl PassthroughSession {
138 pub fn new(proxy: fblock::SessionProxy) -> Self {
139 Self(proxy)
140 }
141
142 async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
143 match request {
144 fblock::SessionRequest::GetFifo { responder } => {
145 responder.send(self.0.get_fifo().await?)?;
146 }
147 fblock::SessionRequest::AttachVmo { vmo, responder } => {
148 responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
149 }
150 fblock::SessionRequest::Close { responder } => {
151 responder.send(self.0.close().await?)?;
152 }
153 }
154 Ok(())
155 }
156
157 pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
159 while let Some(Ok(request)) = stream.next().await {
160 if let Err(error) = self.handle_request(request).await {
161 log::warn!(error:?; "FIDL error");
162 }
163 }
164 Ok(())
165 }
166}
167
168pub struct SessionManager<I: ?Sized> {
169 interface: Arc<I>,
170}
171
172impl<I: Interface + ?Sized> SessionManager<I> {
173 pub fn new(interface: Arc<I>) -> Self {
174 Self { interface }
175 }
176
177 pub async fn serve_session(
179 self: Arc<Self>,
180 stream: fblock::SessionRequestStream,
181 offset_map: Option<OffsetMap>,
182 block_size: u32,
183 ) -> Result<(), Error> {
184 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
185 let helper = Arc::new(helper);
186 let interface = self.interface.clone();
187
188 let mut stream = stream.fuse();
189
190 let scope = fasync::Scope::new();
191 let helper_clone = helper.clone();
192 let mut fifo_task = scope
193 .spawn(async move {
194 if let Err(status) = run_fifo(fifo, interface, helper).await {
195 if status != zx::Status::PEER_CLOSED {
196 log::error!(status:?; "FIFO error");
197 }
198 }
199 })
200 .fuse();
201
202 scopeguard::defer! {
204 for (_, vmo) in helper_clone.take_vmos() {
205 self.interface.on_detach_vmo(&vmo);
206 }
207 }
208
209 loop {
210 futures::select! {
211 maybe_req = stream.next() => {
212 if let Some(req) = maybe_req {
213 helper_clone.handle_request(req?).await?;
214 } else {
215 break;
216 }
217 }
218 _ = fifo_task => break,
219 }
220 }
221
222 Ok(())
223 }
224}
225
226async fn run_fifo<I: Interface + ?Sized>(
228 fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
229 interface: Arc<I>,
230 helper: Arc<SessionHelper<SessionManager<I>>>,
231) -> Result<(), zx::Status> {
232 let mut fifo = fasync::Fifo::from_fifo(fifo);
241 let (mut reader, mut writer) = fifo.async_io();
242 let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
243 let mut active_requests = FuturesUnordered::new();
244 let mut responses = vec![];
245
246 loop {
247 let new_requests = {
248 let pending_requests = active_requests.len() + responses.len();
250 let count = requests.len() - pending_requests;
251 let mut receive_requests = pin!(if count == 0 {
252 Fuse::terminated()
253 } else {
254 reader.read_entries(&mut requests[..count]).fuse()
255 });
256 let mut send_responses = pin!(if responses.is_empty() {
257 Fuse::terminated()
258 } else {
259 poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
260 match ready!(writer.try_write(cx, &responses[..])) {
261 Ok(written) => {
262 responses.drain(..written);
263 Poll::Ready(Ok(()))
264 }
265 Err(status) => Poll::Ready(Err(status)),
266 }
267 })
268 .fuse()
269 });
270
271 select_biased!(
274 res = send_responses => {
275 res?;
276 0
277 },
278 response = active_requests.select_next_some() => {
279 responses.extend(response);
280 0
281 }
282 count = receive_requests => {
283 count?
284 }
285 )
286 };
287 for request in &requests[..new_requests] {
290 if let Some(decoded_request) =
291 helper.decode_fifo_request(unsafe { request.assume_init_ref() })
292 {
293 let interface = interface.clone();
294 let helper = helper.clone();
295 active_requests.push(async move {
296 let tracking = decoded_request.request_tracking;
297 let status = process_fifo_request(interface, decoded_request).await.into();
298 helper.finish_fifo_request(tracking, status)
299 });
300 }
301 }
302 }
303}
304
305impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
306 async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
307 self.interface.on_attach_vmo(vmo).await
308 }
309
310 async fn open_session(
311 self: Arc<Self>,
312 stream: fblock::SessionRequestStream,
313 offset_map: Option<OffsetMap>,
314 block_size: u32,
315 ) -> Result<(), Error> {
316 self.interface.clone().open_session(self, stream, offset_map, block_size).await
317 }
318
319 async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
320 self.interface.get_info().await
321 }
322
323 async fn get_volume_info(
324 &self,
325 ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
326 self.interface.get_volume_info().await
327 }
328
329 async fn query_slices(
330 &self,
331 start_slices: &[u64],
332 ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
333 self.interface.query_slices(start_slices).await
334 }
335
336 async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
337 self.interface.extend(start_slice, slice_count).await
338 }
339
340 async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
341 self.interface.shrink(start_slice, slice_count).await
342 }
343}
344
345impl<I: Interface> IntoSessionManager for Arc<I> {
346 type SM = SessionManager<I>;
347
348 fn into_session_manager(self) -> Arc<Self::SM> {
349 Arc::new(SessionManager { interface: self })
350 }
351}
352
353async fn process_fifo_request<I: Interface + ?Sized>(
355 interface: Arc<I>,
356 r: DecodedRequest,
357) -> Result<(), zx::Status> {
358 let trace_flow_id = r.request_tracking.trace_flow_id;
359 match r.operation? {
360 Operation::Read { device_block_offset, block_count, _unused, vmo_offset } => {
361 interface
362 .read(
363 device_block_offset,
364 block_count,
365 &r.vmo.as_ref().unwrap(),
366 vmo_offset,
367 trace_flow_id,
368 )
369 .await
370 }
371 Operation::Write { device_block_offset, block_count, options, vmo_offset } => {
372 interface
373 .write(
374 device_block_offset,
375 block_count,
376 &r.vmo.as_ref().unwrap(),
377 vmo_offset,
378 options,
379 trace_flow_id,
380 )
381 .await
382 }
383 Operation::Flush => interface.flush(trace_flow_id).await,
384 Operation::Trim { device_block_offset, block_count } => {
385 interface.trim(device_block_offset, block_count, trace_flow_id).await
386 }
387 Operation::CloseVmo => {
388 if let Some(vmo) = &r.vmo {
389 interface.on_detach_vmo(vmo);
390 }
391 Ok(())
392 }
393 }
394}