_block_server_c_rustc_static/
async_interface.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use super::{DecodedRequest, DeviceInfo, IntoSessionManager, OffsetMap, Operation, SessionHelper};
6use anyhow::Error;
7use block_protocol::{BlockFifoRequest, BlockFifoResponse, WriteOptions};
8use futures::future::Fuse;
9use futures::stream::FuturesUnordered;
10use futures::{select_biased, FutureExt, StreamExt};
11use std::borrow::Cow;
12use std::future::{poll_fn, Future};
13use std::mem::MaybeUninit;
14use std::num::NonZero;
15use std::pin::pin;
16use std::sync::Arc;
17use std::task::{ready, Poll};
18use {
19    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
20    fuchsia_async as fasync,
21};
22
/// Length of the buffer that `run_fifo` reads FIFO requests into.  `run_fifo` also uses it as the
/// ceiling on requests that are in flight (still being processed, or finished but with a response
/// not yet written back) before it stops reading new ones.
pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
24
25pub trait Interface: Send + Sync + Unpin + 'static {
26    /// Runs `stream` to completion.
27    ///
28    /// Implementors can override this method if they want to create a passthrough session instead
29    /// (and can use `[PassthroughSession]` below to do so).  See
30    /// fuchsia.hardware.block.Block/OpenSessionWithOffsetMap.
31    ///
32    /// If the implementor uses a `[PassthroughSession]`, the following Interface methods
33    /// will not be called, and can be stubbed out:
34    ///   - on_attach_vmo
35    ///   - on_detach_vmo
36    ///   - read
37    ///   - write
38    ///   - flush
39    ///   - trim
40    fn open_session(
41        &self,
42        session_manager: Arc<SessionManager<Self>>,
43        stream: fblock::SessionRequestStream,
44        offset_map: Option<OffsetMap>,
45        block_size: u32,
46    ) -> impl Future<Output = Result<(), Error>> + Send {
47        // By default, serve the session rather than forwarding/proxying it.
48        session_manager.serve_session(stream, offset_map, block_size)
49    }
50
51    /// Called whenever a VMO is attached, prior to the VMO's usage in any other methods.  Whilst
52    /// the VMO is attached, `vmo` will keep the same address so it is safe to use the pointer
53    /// value (as, say, a key into a HashMap).
54    fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
55        async { Ok(()) }
56    }
57
58    /// Called whenever a VMO is detached.
59    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}
60
61    /// Called to get block/partition information.
62    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
63
64    /// Called for a request to read bytes.
65    fn read(
66        &self,
67        device_block_offset: u64,
68        block_count: u32,
69        vmo: &Arc<zx::Vmo>,
70        vmo_offset: u64, // *bytes* not blocks
71        trace_flow_id: Option<NonZero<u64>>,
72    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
73
74    /// Called for a request to write bytes.
75    fn write(
76        &self,
77        device_block_offset: u64,
78        block_count: u32,
79        vmo: &Arc<zx::Vmo>,
80        vmo_offset: u64, // *bytes* not blocks
81        opts: WriteOptions,
82        trace_flow_id: Option<NonZero<u64>>,
83    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
84
85    /// Called to flush the device.
86    fn flush(
87        &self,
88        trace_flow_id: Option<NonZero<u64>>,
89    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
90
91    /// Called to trim a region.
92    fn trim(
93        &self,
94        device_block_offset: u64,
95        block_count: u32,
96        trace_flow_id: Option<NonZero<u64>>,
97    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
98
99    /// Called to handle the GetVolumeInfo FIDL call.
100    fn get_volume_info(
101        &self,
102    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
103    {
104        async { Err(zx::Status::NOT_SUPPORTED) }
105    }
106
107    /// Called to handle the QuerySlices FIDL call.
108    fn query_slices(
109        &self,
110        _start_slices: &[u64],
111    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
112        async { Err(zx::Status::NOT_SUPPORTED) }
113    }
114
115    /// Called to handle the Extend FIDL call.
116    fn extend(
117        &self,
118        _start_slice: u64,
119        _slice_count: u64,
120    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
121        async { Err(zx::Status::NOT_SUPPORTED) }
122    }
123
124    /// Called to handle the Shrink FIDL call.
125    fn shrink(
126        &self,
127        _start_slice: u64,
128        _slice_count: u64,
129    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
130        async { Err(zx::Status::NOT_SUPPORTED) }
131    }
132}
133
134/// A helper object to run a passthrough (proxy) session.
135pub struct PassthroughSession(fblock::SessionProxy);
136
137impl PassthroughSession {
138    pub fn new(proxy: fblock::SessionProxy) -> Self {
139        Self(proxy)
140    }
141
142    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
143        match request {
144            fblock::SessionRequest::GetFifo { responder } => {
145                responder.send(self.0.get_fifo().await?)?;
146            }
147            fblock::SessionRequest::AttachVmo { vmo, responder } => {
148                responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
149            }
150            fblock::SessionRequest::Close { responder } => {
151                responder.send(self.0.close().await?)?;
152            }
153        }
154        Ok(())
155    }
156
157    /// Runs `stream` until completion.
158    pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
159        while let Some(Ok(request)) = stream.next().await {
160            if let Err(error) = self.handle_request(request).await {
161                log::warn!(error:?; "FIDL error");
162            }
163        }
164        Ok(())
165    }
166}
167
168pub struct SessionManager<I: ?Sized> {
169    interface: Arc<I>,
170}
171
172impl<I: Interface + ?Sized> SessionManager<I> {
173    pub fn new(interface: Arc<I>) -> Self {
174        Self { interface }
175    }
176
177    /// Runs `stream` until completion.
178    pub async fn serve_session(
179        self: Arc<Self>,
180        stream: fblock::SessionRequestStream,
181        offset_map: Option<OffsetMap>,
182        block_size: u32,
183    ) -> Result<(), Error> {
184        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
185        let helper = Arc::new(helper);
186        let interface = self.interface.clone();
187
188        let mut stream = stream.fuse();
189
190        let scope = fasync::Scope::new();
191        let helper_clone = helper.clone();
192        let mut fifo_task = scope
193            .spawn(async move {
194                if let Err(status) = run_fifo(fifo, interface, helper).await {
195                    if status != zx::Status::PEER_CLOSED {
196                        log::error!(status:?; "FIFO error");
197                    }
198                }
199            })
200            .fuse();
201
202        // Make sure we detach VMOs when we go out of scope.
203        scopeguard::defer! {
204            for (_, vmo) in helper_clone.take_vmos() {
205                self.interface.on_detach_vmo(&vmo);
206            }
207        }
208
209        loop {
210            futures::select! {
211                maybe_req = stream.next() => {
212                    if let Some(req) = maybe_req {
213                        helper_clone.handle_request(req?).await?;
214                    } else {
215                        break;
216                    }
217                }
218                _ = fifo_task => break,
219            }
220        }
221
222        Ok(())
223    }
224}
225
/// A task loop for receiving and responding to FIFO requests.
///
/// Reads requests from `fifo`, decodes them with `helper`, processes them concurrently against
/// `interface`, and writes the resulting responses back to `fifo`.
///
/// The loop has no `break`, so this function only returns when a FIFO read or write fails, in
/// which case that status is returned as the error.
async fn run_fifo<I: Interface + ?Sized>(
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    interface: Arc<I>,
    helper: Arc<SessionHelper<SessionManager<I>>>,
) -> Result<(), zx::Status> {
    // The FIFO has to be processed by a single task due to implementation constraints on
    // fuchsia_async::Fifo.  Thus, we use an event loop to drive the FIFO.  FIFO reads and writes
    // can happen in batch, and request processing is parallel.
    // The general flow is:
    //  - Read messages from the FIFO, write into `requests`.
    //  - Read `requests`, decode them, and spawn a task to process them in `active_requests`, which
    //    will eventually write them into `responses`.
    //  - Read `responses` and write out to the FIFO.
    let mut fifo = fasync::Fifo::from_fifo(fifo);
    let (mut reader, mut writer) = fifo.async_io();
    // Scratch buffer for incoming requests; only the prefix filled by the latest read is valid.
    let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
    // Requests currently being processed.
    let mut active_requests = FuturesUnordered::new();
    // Completed responses that have not yet been written to the FIFO.
    let mut responses = vec![];

    loop {
        // Number of freshly read requests at the front of `requests`; zero whenever this
        // iteration made progress on something other than reading.
        let new_requests = {
            // We provide some flow control by limiting how many in-flight requests we will allow.
            let pending_requests = active_requests.len() + responses.len();
            let count = requests.len() - pending_requests;
            // With no spare capacity, reading is disabled for this iteration by substituting an
            // already-terminated future, which `select_biased!` skips.
            let mut receive_requests = pin!(if count == 0 {
                Fuse::terminated()
            } else {
                reader.read_entries(&mut requests[..count]).fuse()
            });
            // Likewise, writing is only attempted when there is something to write.
            let mut send_responses = pin!(if responses.is_empty() {
                Fuse::terminated()
            } else {
                poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
                    match ready!(writer.try_write(cx, &responses[..])) {
                        Ok(written) => {
                            // Only `written` responses were accepted; keep the remainder queued
                            // for a later iteration.
                            responses.drain(..written);
                            Poll::Ready(Ok(()))
                        }
                        Err(status) => Poll::Ready(Err(status)),
                    }
                })
                .fuse()
            });

            // Order is important here.  We want to prioritize sending results on the FIFO and
            // processing FIFO messages over receiving new ones, to provide flow control.
            select_biased!(
                res = send_responses => {
                    res?;
                    0
                },
                response = active_requests.select_next_some() => {
                    // Queue whatever response (if any) the finished request produced.
                    responses.extend(response);
                    0
                }
                count = receive_requests => {
                    count?
                }
            )
        };
        // NB: It is very important that there are no `await`s for the rest of the loop body, as
        // otherwise active requests might become stalled.
        for request in &requests[..new_requests] {
            // SAFETY: `new_requests` is non-zero only when it is the count returned by
            // `read_entries` in this iteration, so the entries in `requests[..new_requests]` are
            // the ones that read just filled.  This assumes `read_entries` returns the number of
            // leading entries it initialized -- TODO confirm against the fuchsia_async::Fifo docs.
            if let Some(decoded_request) =
                helper.decode_fifo_request(unsafe { request.assume_init_ref() })
            {
                let interface = interface.clone();
                let helper = helper.clone();
                active_requests.push(async move {
                    // Copy the tracking data out first: `decoded_request` is consumed by
                    // `process_fifo_request`, but the tracking data is needed afterwards.
                    let tracking = decoded_request.request_tracking;
                    let status = process_fifo_request(interface, decoded_request).await.into();
                    helper.finish_fifo_request(tracking, status)
                });
            }
        }
    }
}
304
305impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
306    async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
307        self.interface.on_attach_vmo(vmo).await
308    }
309
310    async fn open_session(
311        self: Arc<Self>,
312        stream: fblock::SessionRequestStream,
313        offset_map: Option<OffsetMap>,
314        block_size: u32,
315    ) -> Result<(), Error> {
316        self.interface.clone().open_session(self, stream, offset_map, block_size).await
317    }
318
319    async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
320        self.interface.get_info().await
321    }
322
323    async fn get_volume_info(
324        &self,
325    ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
326        self.interface.get_volume_info().await
327    }
328
329    async fn query_slices(
330        &self,
331        start_slices: &[u64],
332    ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
333        self.interface.query_slices(start_slices).await
334    }
335
336    async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
337        self.interface.extend(start_slice, slice_count).await
338    }
339
340    async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
341        self.interface.shrink(start_slice, slice_count).await
342    }
343}
344
345impl<I: Interface> IntoSessionManager for Arc<I> {
346    type SM = SessionManager<I>;
347
348    fn into_session_manager(self) -> Arc<Self::SM> {
349        Arc::new(SessionManager { interface: self })
350    }
351}
352
353/// Processes a fifo request.
354async fn process_fifo_request<I: Interface + ?Sized>(
355    interface: Arc<I>,
356    r: DecodedRequest,
357) -> Result<(), zx::Status> {
358    let trace_flow_id = r.request_tracking.trace_flow_id;
359    match r.operation? {
360        Operation::Read { device_block_offset, block_count, _unused, vmo_offset } => {
361            interface
362                .read(
363                    device_block_offset,
364                    block_count,
365                    &r.vmo.as_ref().unwrap(),
366                    vmo_offset,
367                    trace_flow_id,
368                )
369                .await
370        }
371        Operation::Write { device_block_offset, block_count, options, vmo_offset } => {
372            interface
373                .write(
374                    device_block_offset,
375                    block_count,
376                    &r.vmo.as_ref().unwrap(),
377                    vmo_offset,
378                    options,
379                    trace_flow_id,
380                )
381                .await
382        }
383        Operation::Flush => interface.flush(trace_flow_id).await,
384        Operation::Trim { device_block_offset, block_count } => {
385            interface.trim(device_block_offset, block_count, trace_flow_id).await
386        }
387        Operation::CloseVmo => {
388            if let Some(vmo) = &r.vmo {
389                interface.on_detach_vmo(vmo);
390            }
391            Ok(())
392        }
393    }
394}