block_server/
async_interface.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{DecodedRequest, DeviceInfo, IntoSessionManager, OffsetMap, Operation, SessionHelper};
6use anyhow::Error;
7use block_protocol::{BlockFifoRequest, BlockFifoResponse, WriteOptions};
8use fuchsia_async::{self as fasync, FifoReadable as _};
9use futures::future::Fuse;
10use futures::stream::FuturesUnordered;
11use futures::{select_biased, FutureExt, StreamExt};
12use std::borrow::Cow;
13use std::future::{poll_fn, Future};
14use std::mem::MaybeUninit;
15use std::num::NonZero;
16use std::pin::pin;
17use std::sync::Arc;
18use std::task::{ready, Poll};
19use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
20
21pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
22
23pub trait Interface: Send + Sync + Unpin + 'static {
24    /// Runs `stream` to completion.
25    ///
26    /// Implementors can override this method if they want to create a passthrough session instead
27    /// (and can use `[PassthroughSession]` below to do so).  See
28    /// fuchsia.hardware.block.Block/OpenSessionWithOffsetMap.
29    ///
30    /// If the implementor uses a `[PassthroughSession]`, the following Interface methods
31    /// will not be called, and can be stubbed out:
32    ///   - on_attach_vmo
33    ///   - on_detach_vmo
34    ///   - read
35    ///   - write
36    ///   - flush
37    ///   - trim
38    fn open_session(
39        &self,
40        session_manager: Arc<SessionManager<Self>>,
41        stream: fblock::SessionRequestStream,
42        offset_map: Option<OffsetMap>,
43        block_size: u32,
44    ) -> impl Future<Output = Result<(), Error>> + Send {
45        // By default, serve the session rather than forwarding/proxying it.
46        session_manager.serve_session(stream, offset_map, block_size)
47    }
48
49    /// Called whenever a VMO is attached, prior to the VMO's usage in any other methods.  Whilst
50    /// the VMO is attached, `vmo` will keep the same address so it is safe to use the pointer
51    /// value (as, say, a key into a HashMap).
52    fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
53        async { Ok(()) }
54    }
55
56    /// Called whenever a VMO is detached.
57    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}
58
59    /// Called to get block/partition information.
60    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
61
62    /// Called for a request to read bytes.
63    fn read(
64        &self,
65        device_block_offset: u64,
66        block_count: u32,
67        vmo: &Arc<zx::Vmo>,
68        vmo_offset: u64, // *bytes* not blocks
69        trace_flow_id: Option<NonZero<u64>>,
70    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
71
72    /// Called for a request to write bytes.
73    fn write(
74        &self,
75        device_block_offset: u64,
76        block_count: u32,
77        vmo: &Arc<zx::Vmo>,
78        vmo_offset: u64, // *bytes* not blocks
79        opts: WriteOptions,
80        trace_flow_id: Option<NonZero<u64>>,
81    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
82
83    /// Called to flush the device.
84    fn flush(
85        &self,
86        trace_flow_id: Option<NonZero<u64>>,
87    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
88
89    /// Called to trim a region.
90    fn trim(
91        &self,
92        device_block_offset: u64,
93        block_count: u32,
94        trace_flow_id: Option<NonZero<u64>>,
95    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
96
97    /// Called to handle the GetVolumeInfo FIDL call.
98    fn get_volume_info(
99        &self,
100    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
101    {
102        async { Err(zx::Status::NOT_SUPPORTED) }
103    }
104
105    /// Called to handle the QuerySlices FIDL call.
106    fn query_slices(
107        &self,
108        _start_slices: &[u64],
109    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
110        async { Err(zx::Status::NOT_SUPPORTED) }
111    }
112
113    /// Called to handle the Extend FIDL call.
114    fn extend(
115        &self,
116        _start_slice: u64,
117        _slice_count: u64,
118    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
119        async { Err(zx::Status::NOT_SUPPORTED) }
120    }
121
122    /// Called to handle the Shrink FIDL call.
123    fn shrink(
124        &self,
125        _start_slice: u64,
126        _slice_count: u64,
127    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
128        async { Err(zx::Status::NOT_SUPPORTED) }
129    }
130}
131
132/// A helper object to run a passthrough (proxy) session.
133pub struct PassthroughSession(fblock::SessionProxy);
134
135impl PassthroughSession {
136    pub fn new(proxy: fblock::SessionProxy) -> Self {
137        Self(proxy)
138    }
139
140    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
141        match request {
142            fblock::SessionRequest::GetFifo { responder } => {
143                responder.send(self.0.get_fifo().await?)?;
144            }
145            fblock::SessionRequest::AttachVmo { vmo, responder } => {
146                responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
147            }
148            fblock::SessionRequest::Close { responder } => {
149                responder.send(self.0.close().await?)?;
150            }
151        }
152        Ok(())
153    }
154
155    /// Runs `stream` until completion.
156    pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
157        while let Some(Ok(request)) = stream.next().await {
158            if let Err(error) = self.handle_request(request).await {
159                log::warn!(error:?; "FIDL error");
160            }
161        }
162        Ok(())
163    }
164}
165
166pub struct SessionManager<I: ?Sized> {
167    interface: Arc<I>,
168}
169
170impl<I: Interface + ?Sized> SessionManager<I> {
171    pub fn new(interface: Arc<I>) -> Self {
172        Self { interface }
173    }
174
175    /// Runs `stream` until completion.
176    pub async fn serve_session(
177        self: Arc<Self>,
178        stream: fblock::SessionRequestStream,
179        offset_map: Option<OffsetMap>,
180        block_size: u32,
181    ) -> Result<(), Error> {
182        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
183        let helper = Arc::new(helper);
184        let interface = self.interface.clone();
185
186        let mut stream = stream.fuse();
187
188        let scope = fasync::Scope::new();
189        let helper_clone = helper.clone();
190        let mut fifo_task = scope
191            .spawn(async move {
192                if let Err(status) = run_fifo(fifo, interface, helper).await {
193                    if status != zx::Status::PEER_CLOSED {
194                        log::error!(status:?; "FIFO error");
195                    }
196                }
197            })
198            .fuse();
199
200        // Make sure we detach VMOs when we go out of scope.
201        scopeguard::defer! {
202            for (_, vmo) in helper_clone.take_vmos() {
203                self.interface.on_detach_vmo(&vmo);
204            }
205        }
206
207        loop {
208            futures::select! {
209                maybe_req = stream.next() => {
210                    if let Some(req) = maybe_req {
211                        helper_clone.handle_request(req?).await?;
212                    } else {
213                        break;
214                    }
215                }
216                _ = fifo_task => break,
217            }
218        }
219
220        Ok(())
221    }
222}
223
224// A task loop for receiving and responding to FIFO requests.
225async fn run_fifo<I: Interface + ?Sized>(
226    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
227    interface: Arc<I>,
228    helper: Arc<SessionHelper<SessionManager<I>>>,
229) -> Result<(), zx::Status> {
230    // The FIFO has to be processed by a single task due to implementation constraints on
231    // fuchsia_async::Fifo.  Thus, we use an event loop to drive the FIFO.  FIFO reads and writes
232    // can happen in batch, and request processing is parallel.
233    // The general flow is:
234    //  - Read messages from the FIFO, write into `requests`.
235    //  - Read `requests`, decode them, and spawn a task to process them in `active_requests`, which
236    //    will eventually write them into `responses`.
237    //  - Read `responses` and write out to the FIFO.
238    let fifo = fasync::Fifo::from_fifo(fifo);
239    let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
240    let mut active_requests = FuturesUnordered::new();
241    let mut responses = vec![];
242
243    loop {
244        let new_requests = {
245            // We provide some flow control by limiting how many in-flight requests we will allow.
246            let pending_requests = active_requests.len() + responses.len();
247            let count = requests.len() - pending_requests;
248            let mut receive_requests = pin!(if count == 0 {
249                Fuse::terminated()
250            } else {
251                fifo.read_entries(&mut requests[..count]).fuse()
252            });
253            let mut send_responses = pin!(if responses.is_empty() {
254                Fuse::terminated()
255            } else {
256                poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
257                    match ready!(fifo.try_write(cx, &responses[..])) {
258                        Ok(written) => {
259                            responses.drain(..written);
260                            Poll::Ready(Ok(()))
261                        }
262                        Err(status) => Poll::Ready(Err(status)),
263                    }
264                })
265                .fuse()
266            });
267
268            // Order is important here.  We want to prioritize sending results on the FIFO and
269            // processing FIFO messages over receiving new ones, to provide flow control.
270            select_biased!(
271                res = send_responses => {
272                    res?;
273                    0
274                },
275                response = active_requests.select_next_some() => {
276                    responses.extend(response);
277                    0
278                }
279                count = receive_requests => {
280                    count?
281                }
282            )
283        };
284        // NB: It is very important that there are no `await`s for the rest of the loop body, as
285        // otherwise active requests might become stalled.
286        for request in &requests[..new_requests] {
287            if let Some(decoded_request) =
288                helper.decode_fifo_request(unsafe { request.assume_init_ref() })
289            {
290                let interface = interface.clone();
291                let helper = helper.clone();
292                active_requests.push(async move {
293                    let tracking = decoded_request.request_tracking;
294                    let status = process_fifo_request(interface, decoded_request).await.into();
295                    helper.finish_fifo_request(tracking, status)
296                });
297            }
298        }
299    }
300}
301
302impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
303    async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
304        self.interface.on_attach_vmo(vmo).await
305    }
306
307    async fn open_session(
308        self: Arc<Self>,
309        stream: fblock::SessionRequestStream,
310        offset_map: Option<OffsetMap>,
311        block_size: u32,
312    ) -> Result<(), Error> {
313        self.interface.clone().open_session(self, stream, offset_map, block_size).await
314    }
315
316    async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
317        self.interface.get_info().await
318    }
319
320    async fn get_volume_info(
321        &self,
322    ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
323        self.interface.get_volume_info().await
324    }
325
326    async fn query_slices(
327        &self,
328        start_slices: &[u64],
329    ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
330        self.interface.query_slices(start_slices).await
331    }
332
333    async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
334        self.interface.extend(start_slice, slice_count).await
335    }
336
337    async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
338        self.interface.shrink(start_slice, slice_count).await
339    }
340}
341
342impl<I: Interface> IntoSessionManager for Arc<I> {
343    type SM = SessionManager<I>;
344
345    fn into_session_manager(self) -> Arc<Self::SM> {
346        Arc::new(SessionManager { interface: self })
347    }
348}
349
350/// Processes a fifo request.
351async fn process_fifo_request<I: Interface + ?Sized>(
352    interface: Arc<I>,
353    r: DecodedRequest,
354) -> Result<(), zx::Status> {
355    let trace_flow_id = r.request_tracking.trace_flow_id;
356    match r.operation? {
357        Operation::Read { device_block_offset, block_count, _unused, vmo_offset } => {
358            interface
359                .read(
360                    device_block_offset,
361                    block_count,
362                    &r.vmo.as_ref().unwrap(),
363                    vmo_offset,
364                    trace_flow_id,
365                )
366                .await
367        }
368        Operation::Write { device_block_offset, block_count, options, vmo_offset } => {
369            interface
370                .write(
371                    device_block_offset,
372                    block_count,
373                    &r.vmo.as_ref().unwrap(),
374                    vmo_offset,
375                    options,
376                    trace_flow_id,
377                )
378                .await
379        }
380        Operation::Flush => interface.flush(trace_flow_id).await,
381        Operation::Trim { device_block_offset, block_count } => {
382            interface.trim(device_block_offset, block_count, trace_flow_id).await
383        }
384        Operation::CloseVmo => {
385            if let Some(vmo) = &r.vmo {
386                interface.on_detach_vmo(vmo);
387            }
388            Ok(())
389        }
390    }
391}