Skip to main content

block_server/
async_interface.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{
6    ActiveRequests, DecodedRequest, DeviceInfo, FIFO_MAX_REQUESTS, HandleRequestResult,
7    IntoOrchestrator, OffsetMap, Operation, SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags, WriteOptions};
11use fidl_fuchsia_storage_block as fblock;
12use fidl_fuchsia_storage_block::DeviceFlag;
13use fuchsia_async as fasync;
14use fuchsia_sync::Mutex;
15use futures::future::{Fuse, FusedFuture, join};
16use futures::stream::FuturesUnordered;
17use futures::{FutureExt, StreamExt, select_biased};
18use std::borrow::Cow;
19use std::collections::VecDeque;
20use std::future::{Future, poll_fn};
21use std::mem::MaybeUninit;
22use std::pin::pin;
23use std::sync::{Arc, OnceLock};
24use std::task::{Poll, ready};
25use storage_device::buffer::Buffer;
26use storage_device::buffer_allocator::{BufferAllocator, BufferSource};
27
28pub trait Interface: Send + Sync + Unpin + 'static {
29    /// Runs `stream` to completion.
30    ///
31    /// Implementors can override this method if they want to create a passthrough session instead
32    /// (and can use `[PassthroughSession]` below to do so).  See
33    /// fuchsia.hardware.block.Block/OpenSessionWithOffsetMap.
34    ///
35    /// If the implementor uses a `[PassthroughSession]`, the following Interface methods
36    /// will not be called, and can be stubbed out:
37    ///   - on_attach_vmo
38    ///   - on_detach_vmo
39    ///   - read
40    ///   - write
41    ///   - flush
42    ///   - trim
43    fn open_session(
44        &self,
45        session_manager: Arc<SessionManager<Self>>,
46        stream: fblock::SessionRequestStream,
47        offset_map: OffsetMap,
48        block_size: u32,
49    ) -> impl Future<Output = Result<(), Error>> + Send {
50        // By default, serve the session rather than forwarding/proxying it.
51        session_manager.serve_session(stream, offset_map, block_size)
52    }
53
54    /// Called whenever a VMO is attached, prior to the VMO's usage in any other methods.  Whilst
55    /// the VMO is attached, `vmo` will keep the same address so it is safe to use the pointer
56    /// value (as, say, a key into a HashMap).
57    fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
58        async { Ok(()) }
59    }
60
61    /// Called whenever a VMO is detached.
62    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}
63
64    /// Called to get block/partition information.
65    fn get_info(&self) -> Cow<'_, DeviceInfo>;
66
67    /// Called for a request to read bytes.
68    fn read(
69        &self,
70        device_block_offset: u64,
71        block_count: u32,
72        vmo: &Arc<zx::Vmo>,
73        vmo_offset: u64, // *bytes* not blocks
74        opts: ReadOptions,
75        trace_flow_id: TraceFlowId,
76    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
77
78    /// Called for a request to write bytes.
79    fn write(
80        &self,
81        device_block_offset: u64,
82        block_count: u32,
83        vmo: &Arc<zx::Vmo>,
84        vmo_offset: u64, // *bytes* not blocks
85        opts: WriteOptions,
86        trace_flow_id: TraceFlowId,
87    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
88
89    /// Called to flush the device.
90    fn flush(
91        &self,
92        trace_flow_id: TraceFlowId,
93    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
94
95    /// Called to trim a region.
96    fn trim(
97        &self,
98        device_block_offset: u64,
99        block_count: u32,
100        trace_flow_id: TraceFlowId,
101    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
102
103    /// Called to handle the GetVolumeInfo FIDL call.
104    fn get_volume_info(
105        &self,
106    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
107    {
108        async { Err(zx::Status::NOT_SUPPORTED) }
109    }
110
111    /// Called to handle the QuerySlices FIDL call.
112    fn query_slices(
113        &self,
114        _start_slices: &[u64],
115    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
116        async { Err(zx::Status::NOT_SUPPORTED) }
117    }
118
119    /// Called to handle the Extend FIDL call.
120    fn extend(
121        &self,
122        _start_slice: u64,
123        _slice_count: u64,
124    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
125        async { Err(zx::Status::NOT_SUPPORTED) }
126    }
127
128    /// Called to handle the Shrink FIDL call.
129    fn shrink(
130        &self,
131        _start_slice: u64,
132        _slice_count: u64,
133    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
134        async { Err(zx::Status::NOT_SUPPORTED) }
135    }
136}
137
138/// A helper object to run a passthrough (proxy) session.
139pub struct PassthroughSession(fblock::SessionProxy);
140
141impl PassthroughSession {
142    pub fn new(proxy: fblock::SessionProxy) -> Self {
143        Self(proxy)
144    }
145
146    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
147        match request {
148            fblock::SessionRequest::GetFifo { responder } => {
149                responder.send(self.0.get_fifo().await?)?;
150            }
151            fblock::SessionRequest::AttachVmo { vmo, responder } => {
152                responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
153            }
154            fblock::SessionRequest::Close { responder } => {
155                responder.send(self.0.close().await?)?;
156            }
157        }
158        Ok(())
159    }
160
161    /// Runs `stream` until completion.
162    pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
163        while let Some(Ok(request)) = stream.next().await {
164            if let Err(error) = self.handle_request(request).await {
165                log::warn!(error:?; "FIDL error");
166            }
167        }
168        Ok(())
169    }
170}
171
172pub struct SessionManager<I: Interface + ?Sized> {
173    interface: Arc<I>,
174    active_requests: ActiveRequests<usize>,
175
176    // NOTE: This must be dropped *after* `active_requests` because we store `Buffer<'_>` with an
177    // erased ('static) lifetime in `ActiveRequest`.
178    buffer_allocator: OnceLock<BufferAllocator>,
179}
180
181impl<I: Interface + ?Sized> Drop for SessionManager<I> {
182    fn drop(&mut self) {
183        if let Some(allocator) = self.buffer_allocator.get() {
184            self.interface.on_detach_vmo(allocator.buffer_source().vmo());
185        }
186    }
187}
188
189impl<I: Interface + ?Sized> SessionManager<I> {
190    pub fn new(interface: Arc<I>) -> Self {
191        Self {
192            interface,
193            active_requests: ActiveRequests::default(),
194            buffer_allocator: OnceLock::new(),
195        }
196    }
197
198    pub fn interface(&self) -> &I {
199        self.interface.as_ref()
200    }
201
202    /// Runs `stream` until completion.
203    pub async fn serve_session(
204        self: Arc<Self>,
205        stream: fblock::SessionRequestStream,
206        offset_map: OffsetMap,
207        block_size: u32,
208    ) -> Result<(), Error> {
209        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
210        let session = Arc::new(Session {
211            helper: Arc::new(helper),
212            interface: self.interface.clone(),
213            close_callback: Mutex::new(None),
214        });
215
216        let (stop_sender, stop_receiver) = futures::channel::oneshot::channel();
217
218        let mut stream = stream.fuse();
219        let scope = fasync::Scope::new();
220        let session_clone = session.clone();
221        let mut fifo_task = scope
222            .spawn(async move {
223                if let Err(status) = session_clone.run_fifo(fifo, stop_receiver).await {
224                    if status != zx::Status::PEER_CLOSED {
225                        log::error!(status:?; "FIFO error");
226                    }
227                }
228            })
229            .fuse();
230
231        // Make sure we detach VMOs when we go out of scope.
232        scopeguard::defer! {
233            for (_, (vmo, _)) in session.helper.take_vmos() {
234                self.interface.on_detach_vmo(&vmo);
235            }
236        }
237
238        let mut closing = false;
239        let mut stop_sender = Some(stop_sender);
240
241        loop {
242            futures::select! {
243                maybe_req = if closing {
244                    futures::future::pending().left_future()
245                } else {
246                    stream.next().right_future()
247                } => {
248                    if let Some(req) = maybe_req {
249                        match session.helper.handle_request(req?).await? {
250                            HandleRequestResult::Ok => {},
251                            HandleRequestResult::Closed(callback) => {
252                                *session.close_callback.lock() = Some(callback);
253                                // Client explicitly closed stream, stop processing.
254                                if let Some(sender) = stop_sender.take() {
255                                    let _ = sender.send(());
256                                }
257                                closing = true;
258                            }
259                        }
260                    } else {
261                        // Client end of stream dropped, stop processing.
262                        if let Some(sender) = stop_sender.take() {
263                            let _ = sender.send(());
264                        }
265                        closing = true;
266                    }
267                }
268                _ = fifo_task => break,
269            }
270        }
271        Ok(())
272    }
273}
274
275pub struct Session<I: Interface + ?Sized> {
276    interface: Arc<I>,
277    helper: Arc<SessionHelper<SessionManager<I>>>,
278    close_callback: Mutex<Option<Box<dyn FnOnce() + Send + 'static>>>,
279}
280
281impl<I: Interface + ?Sized> Session<I> {
282    // A task loop for receiving and responding to FIFO requests.
283    async fn run_fifo(
284        &self,
285        fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
286        stop_signal: futures::channel::oneshot::Receiver<()>,
287    ) -> Result<(), zx::Status> {
288        scopeguard::defer! {
289            // Ensure that we always clean up active requests for this session upon FIFO
290            // termination.
291            self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
292        }
293
294        // The FIFO has to be processed by a single task due to implementation constraints on
295        // fuchsia_async::Fifo.  Thus, we use an event loop to drive the FIFO.  FIFO reads and
296        // writes can happen in batch, and request processing is parallel.
297        //
298        // The general flow is:
299        //  - Read messages from the FIFO, write into `requests`.
300        //  - Read `requests`, decode them, and spawn a task to process them in `active_requests`,
301        //    which will eventually write them into `responses`.
302        //  - Read `responses` and write out to the FIFO.
303        let mut fifo = fasync::Fifo::from_fifo(fifo);
304        let (mut reader, mut writer) = fifo.async_io();
305        let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
306        let active_requests = &self.helper.session_manager().active_requests;
307        let mut active_request_futures = FuturesUnordered::new();
308        let mut responses = Vec::new();
309
310        // We map requests using a single future `map_future`.  `pending_mappings` is used to queue
311        // up requests that need to be mapped.  This will serialise how mappings occur which might
312        // make updating mapping caches simpler.  If this proves to be a performance issue, we can
313        // optimise it.
314        let mut map_future = pin!(Fuse::terminated());
315        let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();
316
317        // When `stop_signal` is received, we stop reading from the FIFO and wait for in-flight
318        // tasks to complete.
319        let mut stop_signal = pin!(stop_signal.fuse());
320        let mut is_closed = false;
321
322        loop {
323            let new_requests = {
324                // We provide some flow control by limiting how many in-flight requests we will
325                // allow.
326                let pending_requests = active_request_futures.len() + responses.len();
327
328                if is_closed
329                    && pending_requests == 0
330                    && map_future.is_terminated()
331                    && pending_mappings.is_empty()
332                {
333                    return Ok(());
334                }
335
336                let count = requests.len().saturating_sub(pending_requests);
337                let mut receive_requests = pin!(if count == 0 || is_closed {
338                    Fuse::terminated()
339                } else {
340                    reader.read_entries(&mut requests[..count]).fuse()
341                });
342                let mut send_responses = pin!(if responses.is_empty() {
343                    Fuse::terminated()
344                } else {
345                    poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
346                        match ready!(writer.try_write(cx, &responses[..])) {
347                            Ok(written) => {
348                                responses.drain(..written);
349                                Poll::Ready(Ok(()))
350                            }
351                            Err(status) => Poll::Ready(Err(status)),
352                        }
353                    })
354                    .fuse()
355                });
356
357                // Order is important here.  We want to prioritize sending results on the FIFO and
358                // processing FIFO messages over receiving new ones, to provide flow control.
359                select_biased!(
360                    res = send_responses => {
361                        res?;
362                        0
363                    },
364                    response = active_request_futures.select_next_some() => {
365                        responses.extend(response);
366                        0
367                    }
368                    result = map_future => {
369                        match result {
370                            Ok((request, remainder, commit_decompression_buffers)) => {
371                                active_request_futures.push(self.process_fifo_request(
372                                    request,
373                                    commit_decompression_buffers,
374                                ));
375                                if let Some(remainder) = remainder {
376                                    map_future.set(
377                                        self.map_request_or_get_response(remainder).fuse()
378                                    );
379                                }
380                            }
381                            Err(response) => responses.extend(response),
382                        }
383                        if map_future.is_terminated() {
384                            if let Some(request) = pending_mappings.pop_front() {
385                                map_future.set(self.map_request_or_get_response(request).fuse());
386                            }
387                        }
388                        0
389                    }
390                    _ = stop_signal => {
391                        is_closed = true;
392                        0
393                    }
394                    count = receive_requests => {
395                        count?
396                    }
397                )
398            };
399
400            // NB: It is very important that there are no `await`s for the rest of the loop body, as
401            // otherwise active requests might become stalled.
402            for request in &mut requests[..new_requests] {
403                match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
404                    request.assume_init_mut()
405                }) {
406                    Ok(DecodedRequest {
407                        operation: Operation::CloseVmo, vmo, request_id, ..
408                    }) => {
409                        if let Some(vmo) = vmo {
410                            self.interface.on_detach_vmo(vmo.as_ref());
411                        }
412                        responses.extend(
413                            active_requests
414                                .complete_and_take_response(request_id, zx::Status::OK)
415                                .map(|(_, response)| response),
416                        );
417                    }
418                    Ok(request) => {
419                        if map_future.is_terminated() {
420                            map_future.set(self.map_request_or_get_response(request).fuse());
421                        } else {
422                            pending_mappings.push_back(request);
423                        }
424                    }
425                    Err(None) => {}
426                    Err(Some(response)) => responses.push(response),
427                }
428            }
429        }
430    }
431
432    async fn map_request_or_get_response(
433        &self,
434        request: DecodedRequest,
435    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), Option<BlockFifoResponse>> {
436        let request_id = request.request_id;
437        self.map_request(request).await.map_err(|status| {
438            self.helper
439                .orchestrator
440                .active_requests
441                .complete_and_take_response(request_id, status)
442                .map(|(_, r)| r)
443        })
444    }
445
446    // NOTE: The implementation of this currently assumes that we are only processing a single map
447    // request at a time.
448    async fn map_request(
449        &self,
450        mut request: DecodedRequest,
451    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), zx::Status> {
452        let mut active_requests;
453        let active_request;
454        let mut commit_decompression_buffers = false;
455        let flags = self.interface.get_info().as_ref().device_flags();
456        // Strip the PRE_BARRIER flag if we don't support it, and simulate the barrier with a
457        // pre-flush.
458        if !flags.contains(DeviceFlag::BARRIER_SUPPORT)
459            && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
460        {
461            if let Some(id) = request.trace_flow_id {
462                fuchsia_trace::async_instant!(
463                    fuchsia_trace::Id::from(id.get()),
464                    "storage",
465                    "block_server::SimulatedBarrier",
466                    "request_id" => request.request_id.0
467                );
468            }
469            self.interface.flush(request.trace_flow_id).await?;
470        }
471
472        // Handle decompressed read operations by turning them into regular read operations.
473        match request.operation {
474            Operation::StartDecompressedRead {
475                required_buffer_size,
476                device_block_offset,
477                block_count,
478                options,
479            } => {
480                let allocator = match self.helper.session_manager().buffer_allocator.get() {
481                    Some(a) => a,
482                    None => {
483                        // This isn't racy because there should only be one `map_request` future
484                        // running at any one time.
485                        let source = BufferSource::new(fblock::MAX_DECOMPRESSED_BYTES as usize);
486                        self.interface.on_attach_vmo(&source.vmo()).await?;
487                        let allocator = BufferAllocator::new(
488                            std::cmp::max(
489                                self.helper.block_size as usize,
490                                zx::system_get_page_size() as usize,
491                            ),
492                            source,
493                        );
494                        self.helper.session_manager().buffer_allocator.set(allocator).unwrap();
495                        self.helper.session_manager().buffer_allocator.get().unwrap()
496                    }
497                };
498
499                if required_buffer_size > fblock::MAX_DECOMPRESSED_BYTES as usize {
500                    return Err(zx::Status::OUT_OF_RANGE);
501                }
502
503                let buffer = allocator.allocate_buffer(required_buffer_size).await;
504                let vmo_offset = buffer.range().start as u64;
505
506                // # Safety
507                //
508                // See below.
509                unsafe fn remove_lifetime(buffer: Buffer<'_>) -> Buffer<'static> {
510                    unsafe { std::mem::transmute(buffer) }
511                }
512
513                active_requests = self.helper.session_manager().active_requests.0.lock();
514                active_request = &mut active_requests.requests[request.request_id.0];
515
516                // SAFETY: We guarantee that `buffer_allocator` is dropped after `active_requests`,
517                // so this should be safe.
518                active_request.decompression_info.as_mut().unwrap().buffer =
519                    Some(unsafe { remove_lifetime(buffer) });
520
521                request.operation = Operation::Read {
522                    device_block_offset,
523                    block_count,
524                    _unused: 0,
525                    vmo_offset,
526                    options,
527                };
528                request.vmo = Some(allocator.buffer_source().vmo().clone());
529
530                commit_decompression_buffers = true;
531            }
532            Operation::ContinueDecompressedRead {
533                offset,
534                device_block_offset,
535                block_count,
536                options,
537            } => {
538                active_requests = self.helper.session_manager().active_requests.0.lock();
539                active_request = &mut active_requests.requests[request.request_id.0];
540
541                let buffer =
542                    active_request.decompression_info.as_ref().unwrap().buffer.as_ref().unwrap();
543
544                // Make sure this read won't overflow our buffer.
545                if offset >= buffer.len() as u64
546                    || buffer.len() as u64 - offset
547                        < block_count as u64 * self.helper.block_size as u64
548                {
549                    return Err(zx::Status::OUT_OF_RANGE);
550                }
551
552                request.operation = Operation::Read {
553                    device_block_offset,
554                    block_count,
555                    _unused: 0,
556                    vmo_offset: buffer.range().start as u64 + offset,
557                    options,
558                };
559
560                let allocator = self.helper.session_manager().buffer_allocator.get().unwrap();
561                request.vmo = Some(allocator.buffer_source().vmo().clone());
562            }
563            _ => {
564                active_requests = self.helper.session_manager().active_requests.0.lock();
565                active_request = &mut active_requests.requests[request.request_id.0];
566            }
567        }
568
569        // NB: We propagate the FORCE_ACCESS flag to *both* request and remainder, even if we're
570        // using simulated FUA.  However, in `process_fifo_request`, we'll only do the post-flush
571        // once the last request completes.
572        self.helper
573            .map_request(request, active_request)
574            .map(|(request, remainder)| (request, remainder, commit_decompression_buffers))
575    }
576
577    /// Processes a fifo request.
578    async fn process_fifo_request(
579        &self,
580        DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
581        commit_decompression_buffers: bool,
582    ) -> Option<BlockFifoResponse> {
583        let mut needs_postflush = false;
584        let result = match operation {
585            Operation::Read { device_block_offset, block_count, _unused, vmo_offset, options } => {
586                join(
587                    self.interface.read(
588                        device_block_offset,
589                        block_count,
590                        vmo.as_ref().unwrap(),
591                        vmo_offset,
592                        options,
593                        trace_flow_id,
594                    ),
595                    async {
596                        if commit_decompression_buffers {
597                            let (target_slice, buffer_slice, buffer_range) = {
598                                let active_request = self
599                                    .helper
600                                    .session_manager()
601                                    .active_requests
602                                    .request(request_id);
603                                let info = active_request.decompression_info.as_ref().unwrap();
604                                (
605                                    info.uncompressed_slice(),
606                                    self.helper
607                                        .orchestrator
608                                        .buffer_allocator
609                                        .get()
610                                        .unwrap()
611                                        .buffer_source()
612                                        .slice(),
613                                    info.buffer.as_ref().unwrap().range(),
614                                )
615                            };
616                            let vmar = fuchsia_runtime::vmar_root_self();
617                            // The target slice might not be page aligned.
618                            let addr = target_slice.addr();
619                            let unaligned = addr % zx::system_get_page_size() as usize;
620                            if let Err(error) = vmar.op_range(
621                                zx::VmarOp::COMMIT,
622                                addr - unaligned,
623                                target_slice.len() + unaligned,
624                            ) {
625                                log::warn!(error:?; "Unable to commit target range");
626                            }
627                            // But the buffer range should be.
628                            if let Err(error) = vmar.op_range(
629                                zx::VmarOp::PREFETCH,
630                                buffer_slice.addr() + buffer_range.start,
631                                buffer_range.len(),
632                            ) {
633                                log::warn!(
634                                    error:?,
635                                    buffer_range:?;
636                                    "Unable to prefetch source range",
637                                );
638                            }
639                        }
640                    },
641                )
642                .await
643                .0
644            }
645            Operation::Write {
646                device_block_offset,
647                block_count,
648                _unused,
649                vmo_offset,
650                mut options,
651            } => {
652                // Strip the FORCE_ACCESS flag if we don't support it, and simulate the FUA with a
653                // post-flush.
654                if options.flags.contains(WriteFlags::FORCE_ACCESS) {
655                    let flags = self.interface.get_info().as_ref().device_flags();
656                    if !flags.contains(DeviceFlag::FUA_SUPPORT) {
657                        options.flags.remove(WriteFlags::FORCE_ACCESS);
658                        needs_postflush = true;
659                    }
660                }
661                self.interface
662                    .write(
663                        device_block_offset,
664                        block_count,
665                        vmo.as_ref().unwrap(),
666                        vmo_offset,
667                        options,
668                        trace_flow_id,
669                    )
670                    .await
671            }
672            Operation::Flush => self.interface.flush(trace_flow_id).await,
673            Operation::Trim { device_block_offset, block_count } => {
674                self.interface.trim(device_block_offset, block_count, trace_flow_id).await
675            }
676            Operation::CloseVmo
677            | Operation::StartDecompressedRead { .. }
678            | Operation::ContinueDecompressedRead { .. } => {
679                // Handled in main request loop
680                unreachable!()
681            }
682        };
683        let response = self
684            .helper
685            .orchestrator
686            .active_requests
687            .complete_and_take_response(request_id, result.into())
688            .map(|(_, r)| r);
689        if let Some(mut response) = response {
690            // Only do the post-flush on the very last request, and only if successful.
691            if zx::Status::from_raw(response.status) == zx::Status::OK && needs_postflush {
692                if let Some(id) = trace_flow_id {
693                    fuchsia_trace::async_instant!(
694                        fuchsia_trace::Id::from(id.get()),
695                        "storage",
696                        "block_server::SimulatedFUA",
697                        "request_id" => request_id.0
698                    );
699                }
700                response.status =
701                    zx::Status::from(self.interface.flush(trace_flow_id).await).into_raw();
702            }
703            Some(response)
704        } else {
705            response
706        }
707    }
708}
709
710impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
711    type Orchestrator = Self;
712
713    const SUPPORTS_DECOMPRESSION: bool = true;
714
715    // We don't need the session, we just need something unique to identify the session.
716    type Session = usize;
717
718    async fn on_attach_vmo(orchestrator: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
719        I::on_attach_vmo(&orchestrator.interface, vmo).await
720    }
721
722    async fn open_session(
723        orchestrator: Arc<Self>,
724        stream: fblock::SessionRequestStream,
725        offset_map: OffsetMap,
726        block_size: u32,
727    ) -> Result<(), Error> {
728        I::open_session(
729            &orchestrator.interface,
730            orchestrator.clone(),
731            stream,
732            offset_map,
733            block_size,
734        )
735        .await
736    }
737
738    fn get_info(&self) -> Cow<'_, DeviceInfo> {
739        self.interface.get_info()
740    }
741
742    async fn get_volume_info(
743        &self,
744    ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
745        self.interface.get_volume_info().await
746    }
747
748    async fn query_slices(
749        &self,
750        start_slices: &[u64],
751    ) -> Result<Vec<fblock::VsliceRange>, zx::Status> {
752        self.interface.query_slices(start_slices).await
753    }
754
755    async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
756        self.interface.extend(start_slice, slice_count).await
757    }
758
759    async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
760        self.interface.shrink(start_slice, slice_count).await
761    }
762
763    fn active_requests(&self) -> &ActiveRequests<Self::Session> {
764        return &self.active_requests;
765    }
766}
767
768impl<I: Interface + ?Sized> Drop for Session<I> {
769    fn drop(&mut self) {
770        let callback = std::mem::take(&mut *self.close_callback.lock());
771        if let Some(callback) = callback {
772            callback();
773        }
774    }
775}
776
777impl<I: Interface> IntoOrchestrator for Arc<I> {
778    type SM = SessionManager<I>;
779
780    fn into_orchestrator(self) -> Arc<Self::SM> {
781        Arc::new(SessionManager {
782            interface: self,
783            active_requests: ActiveRequests::default(),
784            buffer_allocator: OnceLock::new(),
785        })
786    }
787}