block_server/async_interface.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use super::{
    ActiveRequests, DecodedRequest, DeviceInfo, FIFO_MAX_REQUESTS, IntoSessionManager, OffsetMap,
    Operation, SessionHelper, TraceFlowId,
};
use anyhow::Error;
use block_protocol::{BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags, WriteOptions};
use futures::future::{Fuse, FusedFuture};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt, select_biased};
use std::borrow::Cow;
use std::collections::VecDeque;
use std::future::{Future, poll_fn};
use std::mem::MaybeUninit;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Poll, ready};
use {
    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
    fuchsia_async as fasync,
};

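/// The interface that a block device implementation provides; the session code in this module
/// dispatches decoded FIFO requests and volume-related FIDL calls to these methods.
///
/// # Example
///
/// A minimal sketch of an implementor, assuming a hypothetical in-memory device (`RamDisk` and
/// its fields are illustrative and not part of this crate; range and overflow checks are elided):
///
/// ```ignore
/// use std::borrow::Cow;
/// use std::sync::{Arc, Mutex};
///
/// struct RamDisk {
///     data: Mutex<Vec<u8>>,
///     info: DeviceInfo,
///     block_size: u32,
/// }
///
/// impl Interface for RamDisk {
///     async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
///         Ok(Cow::Borrowed(&self.info))
///     }
///
///     async fn read(
///         &self,
///         device_block_offset: u64,
///         block_count: u32,
///         vmo: &Arc<zx::Vmo>,
///         vmo_offset: u64,
///         _opts: ReadOptions,
///         _trace_flow_id: TraceFlowId,
///     ) -> Result<(), zx::Status> {
///         let start = device_block_offset as usize * self.block_size as usize;
///         let end = start + block_count as usize * self.block_size as usize;
///         // Copy from the in-memory image into the client's VMO.
///         vmo.write(&self.data.lock().unwrap()[start..end], vmo_offset)
///     }
///
///     async fn write(
///         &self,
///         device_block_offset: u64,
///         block_count: u32,
///         vmo: &Arc<zx::Vmo>,
///         vmo_offset: u64,
///         _opts: WriteOptions,
///         _trace_flow_id: TraceFlowId,
///     ) -> Result<(), zx::Status> {
///         let start = device_block_offset as usize * self.block_size as usize;
///         let end = start + block_count as usize * self.block_size as usize;
///         // Copy from the client's VMO into the in-memory image.
///         vmo.read(&mut self.data.lock().unwrap()[start..end], vmo_offset)
///     }
///
///     fn barrier(&self) -> Result<(), zx::Status> {
///         // Writes land in memory immediately, so there is nothing to order.
///         Ok(())
///     }
///
///     async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
///         Ok(())
///     }
///
///     async fn trim(
///         &self,
///         _device_block_offset: u64,
///         _block_count: u32,
///         _trace_flow_id: TraceFlowId,
///     ) -> Result<(), zx::Status> {
///         Ok(())
///     }
/// }
/// ```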
pub trait Interface: Send + Sync + Unpin + 'static {
    /// Runs `stream` to completion.
    ///
    /// Implementors can override this method if they want to create a passthrough session instead
    /// (and can use [`PassthroughSession`] below to do so, as sketched at the end of this
    /// comment).  See fuchsia.hardware.block.Block/OpenSessionWithOffsetMap.
    ///
    /// If the implementor uses a [`PassthroughSession`], the following `Interface` methods will
    /// not be called, and can be stubbed out:
    ///   - on_attach_vmo
    ///   - on_detach_vmo
    ///   - read
    ///   - write
    ///   - flush
    ///   - trim
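    ///
    /// A minimal sketch of such an override, assuming the implementor holds a hypothetical
    /// `remote: fblock::SessionProxy` for a session that has already been opened on the
    /// underlying device:
    ///
    /// ```ignore
    /// fn open_session(
    ///     &self,
    ///     _session_manager: Arc<SessionManager<Self>>,
    ///     stream: fblock::SessionRequestStream,
    ///     _offset_map: OffsetMap,
    ///     _block_size: u32,
    /// ) -> impl Future<Output = Result<(), Error>> + Send {
    ///     // Forward all session FIDL requests to the underlying device.
    ///     let proxy = self.remote.clone();
    ///     async move { PassthroughSession::new(proxy).serve(stream).await }
    /// }
    /// ```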
    fn open_session(
        &self,
        session_manager: Arc<SessionManager<Self>>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send {
        // By default, serve the session rather than forwarding/proxying it.
        session_manager.serve_session(stream, offset_map, block_size)
    }

    /// Called whenever a VMO is attached, prior to the VMO's usage in any other methods.  Whilst
    /// the VMO is attached, `vmo` will keep the same address so it is safe to use the pointer
    /// value (as, say, a key into a HashMap).
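    ///
    /// For example, an implementor might key per-VMO state on that address (a sketch; the
    /// `buffers` map and `BufferState` type are hypothetical):
    ///
    /// ```ignore
    /// async fn on_attach_vmo(&self, vmo: &zx::Vmo) -> Result<(), zx::Status> {
    ///     // The `&zx::Vmo` reference remains at the same address until `on_detach_vmo`, so the
    ///     // pointer value works as a stable map key.
    ///     self.buffers.lock().unwrap().insert(vmo as *const zx::Vmo as usize, BufferState::new());
    ///     Ok(())
    /// }
    /// ```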
    fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Ok(()) }
    }

    /// Called whenever a VMO is detached.
    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}

    /// Called to get block/partition information.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Called for a request to read bytes.
    fn read(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, // *bytes* not blocks
        opts: ReadOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called for a request to write bytes.  `WriteOptions::PRE_BARRIER` will never be set in
    /// `opts` for this call; see `barrier()`.
    fn write(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, // *bytes* not blocks
        opts: WriteOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Indicates that all previously completed write operations should be made persistent before
    /// any write operations issued after this call.  It is not specified how the barrier affects
    /// write operations that are currently in flight.  This corresponds to the `PRE_BARRIER` flag
    /// that can be set on a write request: requests carrying that flag are split into a separate
    /// barrier call and write call, so `write` will never see `WriteOptions::PRE_BARRIER` in
    /// `opts`.
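    ///
    /// In other words, a write request arriving with `PRE_BARRIER` set is handled roughly as
    /// (identifiers are illustrative):
    ///
    /// ```ignore
    /// interface.barrier()?;
    /// interface
    ///     .write(device_block_offset, block_count, &vmo, vmo_offset, options_without_pre_barrier,
    ///         trace_flow_id)
    ///     .await
    /// ```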
    fn barrier(&self) -> Result<(), zx::Status>;

    /// Called to flush the device.
    fn flush(
        &self,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to trim a region.
    fn trim(
        &self,
        device_block_offset: u64,
        block_count: u32,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to handle the GetVolumeInfo FIDL call.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the QuerySlices FIDL call.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Extend FIDL call.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Shrink FIDL call.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }
}

/// A helper object to run a passthrough (proxy) session.
pub struct PassthroughSession(fblock::SessionProxy);

impl PassthroughSession {
    pub fn new(proxy: fblock::SessionProxy) -> Self {
        Self(proxy)
    }

    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
        match request {
            fblock::SessionRequest::GetFifo { responder } => {
                responder.send(self.0.get_fifo().await?)?;
            }
            fblock::SessionRequest::AttachVmo { vmo, responder } => {
                responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
            }
            fblock::SessionRequest::Close { responder } => {
                responder.send(self.0.close().await?)?;
            }
        }
        Ok(())
    }

    /// Runs `stream` until completion.
    pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
        while let Some(Ok(request)) = stream.next().await {
            if let Err(error) = self.handle_request(request).await {
                log::warn!(error:?; "FIDL error");
            }
        }
        Ok(())
    }
}

pub struct SessionManager<I: Interface + ?Sized> {
    interface: Arc<I>,
    active_requests: ActiveRequests<usize>,
}

impl<I: Interface + ?Sized> SessionManager<I> {
    pub fn new(interface: Arc<I>) -> Self {
        Self { interface, active_requests: ActiveRequests::default() }
    }

    pub fn interface(&self) -> &I {
        self.interface.as_ref()
    }

    /// Runs `stream` until completion.
    pub async fn serve_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
        let session = Session { helper: Arc::new(helper), interface: self.interface.clone() };
        let mut stream = stream.fuse();
        let scope = fasync::Scope::new();
        let helper = session.helper.clone();
        let mut fifo_task = scope
            .spawn(async move {
                if let Err(status) = session.run_fifo(fifo).await {
                    if status != zx::Status::PEER_CLOSED {
                        log::error!(status:?; "FIFO error");
                    }
                }
            })
            .fuse();

        // Make sure we detach VMOs when we go out of scope.
        scopeguard::defer! {
            for (_, vmo) in helper.take_vmos() {
                self.interface.on_detach_vmo(&vmo);
            }
        }

        loop {
            futures::select! {
                maybe_req = stream.next() => {
                    if let Some(req) = maybe_req {
                        helper.handle_request(req?).await?;
                    } else {
                        break;
                    }
                }
                _ = fifo_task => break,
            }
        }

        Ok(())
    }
}

struct Session<I: Interface + ?Sized> {
    interface: Arc<I>,
    helper: Arc<SessionHelper<SessionManager<I>>>,
}

impl<I: Interface + ?Sized> Session<I> {
    // A task loop for receiving and responding to FIFO requests.
    async fn run_fifo(
        &self,
        fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    ) -> Result<(), zx::Status> {
        scopeguard::defer! {
            self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
        }

        // The FIFO has to be processed by a single task due to implementation constraints on
        // fuchsia_async::Fifo.  Thus, we use an event loop to drive the FIFO.  FIFO reads and
        // writes happen in batches, and request processing runs in parallel.
        //
        // The general flow is:
        //  - Read messages from the FIFO, write into `requests`.
        //  - Read `requests`, decode them, and push a future that processes each one onto
        //    `active_request_futures`, which will eventually write into `responses`.
        //  - Read `responses` and write out to the FIFO.
        let mut fifo = fasync::Fifo::from_fifo(fifo);
        let (mut reader, mut writer) = fifo.async_io();
        let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
        let active_requests = &self.helper.session_manager.active_requests;
        let mut active_request_futures = FuturesUnordered::new();
        let mut responses = Vec::new();

        // We map requests using a single future, `map_future`.  `pending_mappings` is used to
        // queue up requests that need to be mapped.  This serialises how mappings occur, which
        // might make updating mapping caches simpler.  If this proves to be a performance issue,
        // we can optimise it.
        let mut map_future = pin!(Fuse::terminated());
        let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();

        loop {
            let new_requests = {
                // We provide some flow control by limiting how many in-flight requests we will
                // allow.
                let pending_requests = active_request_futures.len() + responses.len();
                let count = requests.len().saturating_sub(pending_requests);
                let mut receive_requests = pin!(if count == 0 {
                    Fuse::terminated()
                } else {
                    reader.read_entries(&mut requests[..count]).fuse()
                });
                let mut send_responses = pin!(if responses.is_empty() {
                    Fuse::terminated()
                } else {
                    poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
                        match ready!(writer.try_write(cx, &responses[..])) {
                            Ok(written) => {
                                responses.drain(..written);
                                Poll::Ready(Ok(()))
                            }
                            Err(status) => Poll::Ready(Err(status)),
                        }
                    })
                    .fuse()
                });

                // Order is important here.  We want to prioritize sending results on the FIFO and
                // processing FIFO messages over receiving new ones, to provide flow control.
                select_biased!(
                    res = send_responses => {
                        res?;
                        0
                    },
                    response = active_request_futures.select_next_some() => {
                        responses.extend(response);
                        0
                    }
                    result = map_future => {
                        match result {
                            Ok((request, remainder)) => {
                                active_request_futures.push(self.process_fifo_request(request));
                                if let Some(remainder) = remainder {
                                    map_future.set(self.map_request(remainder).fuse());
                                }
                            }
                            Err(response) => responses.extend(response),
                        }
                        if map_future.is_terminated() {
                            if let Some(request) = pending_mappings.pop_front() {
                                map_future.set(self.map_request(request).fuse());
                            }
                        }
                        0
                    }
                    count = receive_requests => {
                        count?
                    }
                )
            };

            // NB: It is very important that there are no `await`s for the rest of the loop body, as
            // otherwise active requests might become stalled.
            for request in &mut requests[..new_requests] {
                match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
                    request.assume_init_mut()
                }) {
                    Ok(DecodedRequest {
                        operation: Operation::CloseVmo, vmo, request_id, ..
                    }) => {
                        if let Some(vmo) = vmo {
                            self.interface.on_detach_vmo(vmo.as_ref());
                        }
                        responses.extend(
                            active_requests
                                .complete_and_take_response(request_id, zx::Status::OK)
                                .map(|(_, response)| response),
                        );
                    }
                    Ok(mut request) => {
                        // If `request` contains WriteOptions::PRE_BARRIER, issue the barrier
                        // prior to mapping the request. If the barrier fails, we can
                        // immediately respond to the request without splitting it up.
                        if let Err(status) = self.maybe_issue_barrier(&mut request) {
                            let response = self
                                .helper
                                .session_manager
                                .active_requests
                                .complete_and_take_response(request.request_id, status)
                                .map(|(_, r)| r);
                            responses.extend(response);
                        } else if map_future.is_terminated() {
                            map_future.set(self.map_request(request).fuse());
                        } else {
                            pending_mappings.push_back(request);
                        }
                    }
                    Err(None) => {}
                    Err(Some(response)) => responses.push(response),
                }
            }
        }
    }

    fn maybe_issue_barrier(&self, request: &mut DecodedRequest) -> Result<(), zx::Status> {
        if let Operation::Write {
            device_block_offset: _,
            block_count: _,
            _unused,
            options,
            vmo_offset: _,
            ..
        } = &mut request.operation
        {
            if options.flags.contains(WriteFlags::PRE_BARRIER) {
                self.interface.barrier()?;
                options.flags &= !WriteFlags::PRE_BARRIER;
            }
        }
        Ok(())
    }

    // This is currently async, even though it doesn't need to be, to allow for upcoming changes.
    async fn map_request(
        &self,
        request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
        self.helper.map_request(request)
    }

    /// Processes a FIFO request.
    async fn process_fifo_request(
        &self,
        DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
    ) -> Option<BlockFifoResponse> {
        let result = match operation {
            Operation::Read { device_block_offset, block_count, _unused, vmo_offset, options } => {
                self.interface
                    .read(
                        device_block_offset,
                        block_count,
                        vmo.as_ref().unwrap(),
                        vmo_offset,
                        options,
                        trace_flow_id,
                    )
                    .await
            }
            Operation::Write { device_block_offset, block_count, _unused, vmo_offset, options } => {
                self.interface
                    .write(
                        device_block_offset,
                        block_count,
                        vmo.as_ref().unwrap(),
                        vmo_offset,
                        options,
                        trace_flow_id,
                    )
                    .await
            }
            Operation::Flush => self.interface.flush(trace_flow_id).await,
            Operation::Trim { device_block_offset, block_count } => {
                self.interface.trim(device_block_offset, block_count, trace_flow_id).await
            }
            Operation::CloseVmo => {
                // Handled in main request loop
                unreachable!()
            }
        };
        self.helper
            .session_manager
            .active_requests
            .complete_and_take_response(request_id, result.into())
            .map(|(_, r)| r)
    }
}

impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
    // We don't need the session itself; we just need something unique to identify it.
    type Session = usize;

    async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
        self.interface.on_attach_vmo(vmo).await
    }

    async fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        self.interface.clone().open_session(self, stream, offset_map, block_size).await
    }

    async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
        self.interface.get_info().await
    }

    async fn get_volume_info(
        &self,
    ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
        self.interface.get_volume_info().await
    }

    async fn query_slices(
        &self,
        start_slices: &[u64],
    ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
        self.interface.query_slices(start_slices).await
    }

    async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
        self.interface.extend(start_slice, slice_count).await
    }

    async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
        self.interface.shrink(start_slice, slice_count).await
    }

    fn active_requests(&self) -> &ActiveRequests<Self::Session> {
        &self.active_requests
    }
}

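// Any `Arc<I>` where `I: Interface` can be converted directly into a session manager.  A minimal
// usage sketch (`RamDisk` and its constructor are hypothetical; see the `Interface` docs above):
//
//     let session_manager = Arc::new(RamDisk::new(block_size, block_count)).into_session_manager();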
impl<I: Interface> IntoSessionManager for Arc<I> {
    type SM = SessionManager<I>;

    fn into_session_manager(self) -> Arc<Self::SM> {
        Arc::new(SessionManager { interface: self, active_requests: ActiveRequests::default() })
    }
}