block_server/async_interface.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use super::{
    ActiveRequests, DecodedRequest, DeviceInfo, FIFO_MAX_REQUESTS, IntoSessionManager, OffsetMap,
    Operation, SessionHelper, TraceFlowId,
};
use anyhow::Error;
use block_protocol::{BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags, WriteOptions};
use fidl_fuchsia_hardware_block::Flag;
use futures::future::{Fuse, FusedFuture, join};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt, select_biased};
use std::borrow::Cow;
use std::collections::VecDeque;
use std::future::{Future, poll_fn};
use std::mem::MaybeUninit;
use std::pin::pin;
use std::sync::{Arc, OnceLock};
use std::task::{Poll, ready};
use storage_device::buffer::Buffer;
use storage_device::buffer_allocator::{BufferAllocator, BufferSource};
use {
    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
    fuchsia_async as fasync,
};

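/// The interface that backends implement to service block requests.
///
/// A minimal sketch of a read-only implementor (the `ZeroDisk` type, its fixed 512-byte
/// block size, and its zero-filled reads are hypothetical, for illustration only):
///
/// ```ignore
/// struct ZeroDisk {
///     info: DeviceInfo,
/// }
///
/// impl Interface for ZeroDisk {
///     fn get_info(&self) -> Cow<'_, DeviceInfo> {
///         Cow::Borrowed(&self.info)
///     }
///
///     async fn read(
///         &self,
///         _device_block_offset: u64,
///         block_count: u32,
///         vmo: &Arc<zx::Vmo>,
///         vmo_offset: u64,
///         _opts: ReadOptions,
///         _trace_flow_id: TraceFlowId,
///     ) -> Result<(), zx::Status> {
///         // Fill the destination range with zeroes.
///         vmo.write(&vec![0u8; block_count as usize * 512], vmo_offset)
///     }
///
///     // `write`, `flush` and `trim` elided...
/// }
/// ```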
pub trait Interface: Send + Sync + Unpin + 'static {
    /// Runs `stream` to completion.
    ///
    /// Implementors can override this method if they want to create a passthrough session
    /// instead (and can use [`PassthroughSession`] below to do so).  See
    /// fuchsia.hardware.block.Block/OpenSessionWithOffsetMap.
    ///
    /// If the implementor uses a [`PassthroughSession`], the following `Interface` methods
    /// will not be called, and can be stubbed out:
    ///   - `on_attach_vmo`
    ///   - `on_detach_vmo`
    ///   - `read`
    ///   - `write`
    ///   - `flush`
    ///   - `trim`
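    ///
    /// A sketch of such an override (the `proxy` field holding a `fblock::SessionProxy` to
    /// the backing device is hypothetical, and forwarding of the offset map is elided):
    ///
    /// ```ignore
    /// fn open_session(
    ///     &self,
    ///     _session_manager: Arc<SessionManager<Self>>,
    ///     stream: fblock::SessionRequestStream,
    ///     _offset_map: OffsetMap,
    ///     _block_size: u32,
    /// ) -> impl Future<Output = Result<(), Error>> + Send {
    ///     // Proxy all session requests to the backing device instead of serving them here.
    ///     async move { PassthroughSession::new(self.proxy.clone()).serve(stream).await }
    /// }
    /// ```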
    fn open_session(
        &self,
        session_manager: Arc<SessionManager<Self>>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send {
        // By default, serve the session rather than forwarding/proxying it.
        session_manager.serve_session(stream, offset_map, block_size)
    }

    /// Called whenever a VMO is attached, prior to the VMO's usage in any other methods.  Whilst
    /// the VMO is attached, `vmo` will keep the same address, so it is safe to use the pointer
    /// value (as, say, a key into a HashMap).
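    ///
    /// A sketch of that pattern (the `vmos` mutex-guarded map and the `VmoState` type are
    /// hypothetical):
    ///
    /// ```ignore
    /// fn on_attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
    ///     // The attached VMO's address is stable, so it can serve as a map key.
    ///     self.vmos.lock().insert(vmo as *const zx::Vmo as usize, VmoState::default());
    ///     async { Ok(()) }
    /// }
    /// ```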
    fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Ok(()) }
    }

    /// Called whenever a VMO is detached.
    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}

    /// Called to get block/partition information.
    fn get_info(&self) -> Cow<'_, DeviceInfo>;

    /// Called for a request to read bytes.
    fn read(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, // *bytes* not blocks
        opts: ReadOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called for a request to write bytes.
    fn write(
        &self,
        device_block_offset: u64,
        block_count: u32,
        vmo: &Arc<zx::Vmo>,
        vmo_offset: u64, // *bytes* not blocks
        opts: WriteOptions,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to flush the device.
    fn flush(
        &self,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to trim a region.
    fn trim(
        &self,
        device_block_offset: u64,
        block_count: u32,
        trace_flow_id: TraceFlowId,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Called to handle the GetVolumeInfo FIDL call.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the QuerySlices FIDL call.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Extend FIDL call.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Shrink FIDL call.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }
}

/// A helper object to run a passthrough (proxy) session.
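///
/// Usage sketch (the `backing` proxy binding is hypothetical; this is typically done from
/// an [`Interface::open_session`] override, as shown on that method):
///
/// ```ignore
/// let session = PassthroughSession::new(backing);
/// session.serve(stream).await?;
/// ```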
pub struct PassthroughSession(fblock::SessionProxy);

impl PassthroughSession {
    pub fn new(proxy: fblock::SessionProxy) -> Self {
        Self(proxy)
    }

    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
        match request {
            fblock::SessionRequest::GetFifo { responder } => {
                responder.send(self.0.get_fifo().await?)?;
            }
            fblock::SessionRequest::AttachVmo { vmo, responder } => {
                responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
            }
            fblock::SessionRequest::Close { responder } => {
                responder.send(self.0.close().await?)?;
            }
        }
        Ok(())
    }

    /// Runs `stream` until completion.
    pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
        while let Some(Ok(request)) = stream.next().await {
            if let Err(error) = self.handle_request(request).await {
                log::warn!(error:?; "FIDL error");
            }
        }
        Ok(())
    }
}

pub struct SessionManager<I: Interface + ?Sized> {
    interface: Arc<I>,
    active_requests: ActiveRequests<usize>,

    // NOTE: This must be dropped *after* `active_requests` because we store `Buffer<'_>` with an
    // erased ('static) lifetime in `ActiveRequest`.
    buffer_allocator: OnceLock<BufferAllocator>,
}

impl<I: Interface + ?Sized> Drop for SessionManager<I> {
    fn drop(&mut self) {
        if let Some(allocator) = self.buffer_allocator.get() {
            self.interface.on_detach_vmo(allocator.buffer_source().vmo());
        }
    }
}

impl<I: Interface + ?Sized> SessionManager<I> {
    pub fn new(interface: Arc<I>) -> Self {
        Self {
            interface,
            active_requests: ActiveRequests::default(),
            buffer_allocator: OnceLock::new(),
        }
    }

    pub fn interface(&self) -> &I {
        self.interface.as_ref()
    }

    /// Runs `stream` until completion.
    pub async fn serve_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
        let session = Session { helper: Arc::new(helper), interface: self.interface.clone() };
        let mut stream = stream.fuse();
        let scope = fasync::Scope::new();
        let helper = session.helper.clone();
        let mut fifo_task = scope
            .spawn(async move {
                if let Err(status) = session.run_fifo(fifo).await {
                    if status != zx::Status::PEER_CLOSED {
                        log::error!(status:?; "FIFO error");
                    }
                }
            })
            .fuse();

        // Make sure we detach VMOs when we go out of scope.
        scopeguard::defer! {
            for (_, (vmo, _)) in helper.take_vmos() {
                self.interface.on_detach_vmo(&vmo);
            }
        }

        loop {
            futures::select! {
                maybe_req = stream.next() => {
                    if let Some(req) = maybe_req {
                        helper.handle_request(req?).await?;
                    } else {
                        break;
                    }
                }
                _ = fifo_task => break,
            }
        }

        Ok(())
    }
}

struct Session<I: Interface + ?Sized> {
    interface: Arc<I>,
    helper: Arc<SessionHelper<SessionManager<I>>>,
}

impl<I: Interface + ?Sized> Session<I> {
    // A task loop for receiving and responding to FIFO requests.
    async fn run_fifo(
        &self,
        fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    ) -> Result<(), zx::Status> {
        scopeguard::defer! {
            self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
        }

        // The FIFO has to be processed by a single task due to implementation constraints on
        // fuchsia_async::Fifo.  Thus, we use an event loop to drive the FIFO.  FIFO reads and
        // writes happen in batches, and request processing is parallel.
        //
        // The general flow is:
        //  - Read messages from the FIFO, and write them into `requests`.
        //  - Read `requests`, decode them, and spawn a task to process them in `active_requests`,
        //    which will eventually write them into `responses`.
        //  - Read `responses` and write out to the FIFO.
        let mut fifo = fasync::Fifo::from_fifo(fifo);
        let (mut reader, mut writer) = fifo.async_io();
        let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
        let active_requests = &self.helper.session_manager.active_requests;
        let mut active_request_futures = FuturesUnordered::new();
        let mut responses = Vec::new();

        // We map requests using a single future, `map_future`.  `pending_mappings` is used to
        // queue up requests that need to be mapped.  This serialises how mappings occur, which
        // might make updating mapping caches simpler.  If this proves to be a performance issue,
        // we can optimise it.
        let mut map_future = pin!(Fuse::terminated());
        let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();

        loop {
            let new_requests = {
                // We provide some flow control by limiting how many in-flight requests we will
                // allow.
                let pending_requests = active_request_futures.len() + responses.len();
                let count = requests.len().saturating_sub(pending_requests);
                let mut receive_requests = pin!(if count == 0 {
                    Fuse::terminated()
                } else {
                    reader.read_entries(&mut requests[..count]).fuse()
                });
                let mut send_responses = pin!(if responses.is_empty() {
                    Fuse::terminated()
                } else {
                    poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
                        match ready!(writer.try_write(cx, &responses[..])) {
                            Ok(written) => {
                                responses.drain(..written);
                                Poll::Ready(Ok(()))
                            }
                            Err(status) => Poll::Ready(Err(status)),
                        }
                    })
                    .fuse()
                });

                // Order is important here.  We want to prioritise sending responses on the FIFO
                // and progressing in-flight requests over receiving new ones, to provide flow
                // control.
                select_biased!(
                    res = send_responses => {
                        res?;
                        0
                    },
                    response = active_request_futures.select_next_some() => {
                        responses.extend(response);
                        0
                    }
                    result = map_future => {
                        match result {
                            Ok((request, remainder, commit_decompression_buffers)) => {
                                active_request_futures.push(self.process_fifo_request(
                                    request,
                                    commit_decompression_buffers,
                                ));
                                if let Some(remainder) = remainder {
                                    map_future.set(
                                        self.map_request_or_get_response(remainder).fuse()
                                    );
                                }
                            }
                            Err(response) => responses.extend(response),
                        }
                        if map_future.is_terminated() {
                            if let Some(request) = pending_mappings.pop_front() {
                                map_future.set(self.map_request_or_get_response(request).fuse());
                            }
                        }
                        0
                    }
                    count = receive_requests => {
                        count?
                    }
                )
            };

            // NB: It is very important that there are no `await`s for the rest of the loop body,
            // as otherwise active requests might become stalled.
            for request in &mut requests[..new_requests] {
                match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
                    request.assume_init_mut()
                }) {
                    Ok(DecodedRequest {
                        operation: Operation::CloseVmo, vmo, request_id, ..
                    }) => {
                        if let Some(vmo) = vmo {
                            self.interface.on_detach_vmo(vmo.as_ref());
                        }
                        responses.extend(
                            active_requests
                                .complete_and_take_response(request_id, zx::Status::OK)
                                .map(|(_, response)| response),
                        );
                    }
                    Ok(request) => {
                        if map_future.is_terminated() {
                            map_future.set(self.map_request_or_get_response(request).fuse());
                        } else {
                            pending_mappings.push_back(request);
                        }
                    }
                    Err(None) => {}
                    Err(Some(response)) => responses.push(response),
                }
            }
        }
    }

    async fn map_request_or_get_response(
        &self,
        request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), Option<BlockFifoResponse>> {
        let request_id = request.request_id;
        self.map_request(request).await.map_err(|status| {
            self.helper
                .session_manager
                .active_requests
                .complete_and_take_response(request_id, status)
                .map(|(_, r)| r)
        })
    }

    // NOTE: The implementation of this currently assumes that we are only processing a single map
    // request at a time.
    async fn map_request(
        &self,
        mut request: DecodedRequest,
    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), zx::Status> {
        let mut active_requests;
        let active_request;
        let mut commit_decompression_buffers = false;
        let flags = self.interface.get_info().as_ref().device_flags();
        // Strip the PRE_BARRIER flag if we don't support it, and simulate the barrier with a
        // pre-flush.
        if !flags.contains(Flag::BARRIER_SUPPORT)
            && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
        {
            if let Some(id) = request.trace_flow_id {
                fuchsia_trace::async_instant!(
                    fuchsia_trace::Id::from(id.get()),
                    c"storage",
                    c"block_server::SimulatedBarrier",
                    "request_id" => request.request_id.0
                );
            }
            self.interface.flush(request.trace_flow_id).await?;
        }

        // Handle decompressed read operations by turning them into regular read operations.
        match request.operation {
            Operation::StartDecompressedRead {
                required_buffer_size,
                device_block_offset,
                block_count,
                options,
            } => {
                let allocator = match self.helper.session_manager.buffer_allocator.get() {
                    Some(a) => a,
                    None => {
                        // This isn't racy because there should only be one `map_request` future
                        // running at any one time.
                        let source = BufferSource::new(fblock::MAX_DECOMPRESSED_BYTES as usize);
                        self.interface.on_attach_vmo(&source.vmo()).await?;
                        let allocator = BufferAllocator::new(
                            std::cmp::max(
                                self.helper.block_size as usize,
                                zx::system_get_page_size() as usize,
                            ),
                            source,
                        );
                        self.helper.session_manager.buffer_allocator.set(allocator).unwrap();
                        self.helper.session_manager.buffer_allocator.get().unwrap()
                    }
                };

                if required_buffer_size > fblock::MAX_DECOMPRESSED_BYTES as usize {
                    return Err(zx::Status::OUT_OF_RANGE);
                }

                let buffer = allocator.allocate_buffer(required_buffer_size).await;
                let vmo_offset = buffer.range().start as u64;

                // # Safety
                //
                // See below.
                unsafe fn remove_lifetime(buffer: Buffer<'_>) -> Buffer<'static> {
                    unsafe { std::mem::transmute(buffer) }
                }

                active_requests = self.helper.session_manager.active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];

                // SAFETY: We guarantee that `buffer_allocator` is dropped after `active_requests`,
                // so this should be safe.
                active_request.decompression_info.as_mut().unwrap().buffer =
                    Some(unsafe { remove_lifetime(buffer) });

                request.operation = Operation::Read {
                    device_block_offset,
                    block_count,
                    _unused: 0,
                    vmo_offset,
                    options,
                };
                request.vmo = Some(allocator.buffer_source().vmo().clone());

                commit_decompression_buffers = true;
            }
            Operation::ContinueDecompressedRead {
                offset,
                device_block_offset,
                block_count,
                options,
            } => {
                active_requests = self.helper.session_manager.active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];

                let buffer =
                    active_request.decompression_info.as_ref().unwrap().buffer.as_ref().unwrap();

                // Make sure this read won't overflow our buffer.
                if offset >= buffer.len() as u64
                    || buffer.len() as u64 - offset
                        < block_count as u64 * self.helper.block_size as u64
                {
                    return Err(zx::Status::OUT_OF_RANGE);
                }

                request.operation = Operation::Read {
                    device_block_offset,
                    block_count,
                    _unused: 0,
                    vmo_offset: buffer.range().start as u64 + offset,
                    options,
                };

                let allocator = self.helper.session_manager.buffer_allocator.get().unwrap();
                request.vmo = Some(allocator.buffer_source().vmo().clone());
            }
            _ => {
                active_requests = self.helper.session_manager.active_requests.0.lock();
                active_request = &mut active_requests.requests[request.request_id.0];
            }
        }

        // NB: We propagate the FORCE_ACCESS flag to *both* request and remainder, even if we're
        // using simulated FUA.  However, in `process_fifo_request`, we'll only do the post-flush
        // once the last request completes.
        self.helper
            .map_request(request, active_request)
            .map(|(request, remainder)| (request, remainder, commit_decompression_buffers))
    }

    /// Processes a FIFO request.
    async fn process_fifo_request(
        &self,
        DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
        commit_decompression_buffers: bool,
    ) -> Option<BlockFifoResponse> {
        let mut needs_postflush = false;
        let result = match operation {
            Operation::Read { device_block_offset, block_count, _unused, vmo_offset, options } => {
                join(
                    self.interface.read(
                        device_block_offset,
                        block_count,
                        vmo.as_ref().unwrap(),
                        vmo_offset,
                        options,
                        trace_flow_id,
                    ),
                    async {
                        if commit_decompression_buffers {
                            let (target_slice, buffer_slice, buffer_range) = {
                                let active_request =
                                    self.helper.session_manager.active_requests.request(request_id);
                                let info = active_request.decompression_info.as_ref().unwrap();
                                (
                                    info.uncompressed_slice(),
                                    self.helper
                                        .session_manager
                                        .buffer_allocator
                                        .get()
                                        .unwrap()
                                        .buffer_source()
                                        .slice(),
                                    info.buffer.as_ref().unwrap().range(),
                                )
                            };
                            let vmar = fuchsia_runtime::vmar_root_self();
                            // The target slice might not be page aligned.
                            let addr = target_slice.addr();
                            let unaligned = addr % zx::system_get_page_size() as usize;
                            if let Err(error) = vmar.op_range(
                                zx::VmarOp::COMMIT,
                                addr - unaligned,
                                target_slice.len() + unaligned,
                            ) {
                                log::warn!(error:?; "Unable to commit target range");
                            }
                            // But the buffer range should be page aligned.
                            if let Err(error) = vmar.op_range(
                                zx::VmarOp::PREFETCH,
                                buffer_slice.addr() + buffer_range.start,
                                buffer_range.len(),
                            ) {
                                log::warn!(
                                    error:?,
                                    buffer_range:?;
                                    "Unable to prefetch source range",
                                );
                            }
                        }
                    },
                )
                .await
                .0
            }
            Operation::Write {
                device_block_offset,
                block_count,
                _unused,
                vmo_offset,
                mut options,
            } => {
                // Strip the FORCE_ACCESS flag if we don't support it, and simulate the FUA with a
                // post-flush.
                if options.flags.contains(WriteFlags::FORCE_ACCESS) {
                    let flags = self.interface.get_info().as_ref().device_flags();
                    if !flags.contains(Flag::FUA_SUPPORT) {
                        options.flags.remove(WriteFlags::FORCE_ACCESS);
                        needs_postflush = true;
                    }
                }
                self.interface
                    .write(
                        device_block_offset,
                        block_count,
                        vmo.as_ref().unwrap(),
                        vmo_offset,
                        options,
                        trace_flow_id,
                    )
                    .await
            }
            Operation::Flush => self.interface.flush(trace_flow_id).await,
            Operation::Trim { device_block_offset, block_count } => {
                self.interface.trim(device_block_offset, block_count, trace_flow_id).await
            }
            Operation::CloseVmo
            | Operation::StartDecompressedRead { .. }
            | Operation::ContinueDecompressedRead { .. } => {
                // These are handled in the main request loop.
                unreachable!()
            }
        };
        let response = self
            .helper
            .session_manager
            .active_requests
            .complete_and_take_response(request_id, result.into())
            .map(|(_, r)| r);
        if let Some(mut response) = response {
            // Only do the post-flush on the very last request, and only if it was successful.
            if zx::Status::from_raw(response.status) == zx::Status::OK && needs_postflush {
                if let Some(id) = trace_flow_id {
                    fuchsia_trace::async_instant!(
                        fuchsia_trace::Id::from(id.get()),
                        c"storage",
                        c"block_server::SimulatedFUA",
                        "request_id" => request_id.0
                    );
                }
                response.status =
                    zx::Status::from(self.interface.flush(trace_flow_id).await).into_raw();
            }
            Some(response)
        } else {
            None
        }
    }
}

impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
    const SUPPORTS_DECOMPRESSION: bool = true;

    // We don't need the session itself; we just need something unique to identify it.
    type Session = usize;

    async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
        self.interface.on_attach_vmo(vmo).await
    }

    async fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        self.interface.clone().open_session(self, stream, offset_map, block_size).await
    }

    fn get_info(&self) -> Cow<'_, DeviceInfo> {
        self.interface.get_info()
    }

    async fn get_volume_info(
        &self,
    ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
        self.interface.get_volume_info().await
    }

    async fn query_slices(
        &self,
        start_slices: &[u64],
    ) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
        self.interface.query_slices(start_slices).await
    }

    async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
        self.interface.extend(start_slice, slice_count).await
    }

    async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
        self.interface.shrink(start_slice, slice_count).await
    }

    fn active_requests(&self) -> &ActiveRequests<Self::Session> {
        &self.active_requests
    }
}

impl<I: Interface> IntoSessionManager for Arc<I> {
    type SM = SessionManager<I>;

    fn into_session_manager(self) -> Arc<Self::SM> {
        Arc::new(SessionManager {
            interface: self,
            active_requests: ActiveRequests::default(),
            buffer_allocator: OnceLock::new(),
        })
    }
}