Skip to main content

block_server/
async_interface.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{
6    ActiveRequests, DecodedRequest, DeviceInfo, FIFO_MAX_REQUESTS, IntoOrchestrator, OffsetMap,
7    Operation, SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags, WriteOptions};
11use fidl_fuchsia_storage_block::DeviceFlag;
12use futures::future::{Fuse, FusedFuture, join};
13use futures::stream::FuturesUnordered;
14use futures::{FutureExt, StreamExt, select_biased};
15use std::borrow::Cow;
16use std::collections::VecDeque;
17use std::future::{Future, poll_fn};
18use std::mem::MaybeUninit;
19use std::pin::pin;
20use std::sync::{Arc, OnceLock};
21use std::task::{Poll, ready};
22use storage_device::buffer::Buffer;
23use storage_device::buffer_allocator::{BufferAllocator, BufferSource};
24use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
25
26pub trait Interface: Send + Sync + Unpin + 'static {
27    /// Runs `stream` to completion.
28    ///
29    /// Implementors can override this method if they want to create a passthrough session instead
30    /// (and can use `[PassthroughSession]` below to do so).  See
31    /// fuchsia.hardware.block.Block/OpenSessionWithOffsetMap.
32    ///
33    /// If the implementor uses a `[PassthroughSession]`, the following Interface methods
34    /// will not be called, and can be stubbed out:
35    ///   - on_attach_vmo
36    ///   - on_detach_vmo
37    ///   - read
38    ///   - write
39    ///   - flush
40    ///   - trim
41    fn open_session(
42        &self,
43        session_manager: Arc<SessionManager<Self>>,
44        stream: fblock::SessionRequestStream,
45        offset_map: OffsetMap,
46        block_size: u32,
47    ) -> impl Future<Output = Result<(), Error>> + Send {
48        // By default, serve the session rather than forwarding/proxying it.
49        session_manager.serve_session(stream, offset_map, block_size)
50    }
51
52    /// Called whenever a VMO is attached, prior to the VMO's usage in any other methods.  Whilst
53    /// the VMO is attached, `vmo` will keep the same address so it is safe to use the pointer
54    /// value (as, say, a key into a HashMap).
55    fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
56        async { Ok(()) }
57    }
58
59    /// Called whenever a VMO is detached.
60    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}
61
62    /// Called to get block/partition information.
63    fn get_info(&self) -> Cow<'_, DeviceInfo>;
64
65    /// Called for a request to read bytes.
66    fn read(
67        &self,
68        device_block_offset: u64,
69        block_count: u32,
70        vmo: &Arc<zx::Vmo>,
71        vmo_offset: u64, // *bytes* not blocks
72        opts: ReadOptions,
73        trace_flow_id: TraceFlowId,
74    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
75
76    /// Called for a request to write bytes.
77    fn write(
78        &self,
79        device_block_offset: u64,
80        block_count: u32,
81        vmo: &Arc<zx::Vmo>,
82        vmo_offset: u64, // *bytes* not blocks
83        opts: WriteOptions,
84        trace_flow_id: TraceFlowId,
85    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
86
87    /// Called to flush the device.
88    fn flush(
89        &self,
90        trace_flow_id: TraceFlowId,
91    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
92
93    /// Called to trim a region.
94    fn trim(
95        &self,
96        device_block_offset: u64,
97        block_count: u32,
98        trace_flow_id: TraceFlowId,
99    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
100
101    /// Called to handle the GetVolumeInfo FIDL call.
102    fn get_volume_info(
103        &self,
104    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
105    {
106        async { Err(zx::Status::NOT_SUPPORTED) }
107    }
108
109    /// Called to handle the QuerySlices FIDL call.
110    fn query_slices(
111        &self,
112        _start_slices: &[u64],
113    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
114        async { Err(zx::Status::NOT_SUPPORTED) }
115    }
116
117    /// Called to handle the Extend FIDL call.
118    fn extend(
119        &self,
120        _start_slice: u64,
121        _slice_count: u64,
122    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
123        async { Err(zx::Status::NOT_SUPPORTED) }
124    }
125
126    /// Called to handle the Shrink FIDL call.
127    fn shrink(
128        &self,
129        _start_slice: u64,
130        _slice_count: u64,
131    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
132        async { Err(zx::Status::NOT_SUPPORTED) }
133    }
134}
135
136/// A helper object to run a passthrough (proxy) session.
137pub struct PassthroughSession(fblock::SessionProxy);
138
139impl PassthroughSession {
140    pub fn new(proxy: fblock::SessionProxy) -> Self {
141        Self(proxy)
142    }
143
144    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
145        match request {
146            fblock::SessionRequest::GetFifo { responder } => {
147                responder.send(self.0.get_fifo().await?)?;
148            }
149            fblock::SessionRequest::AttachVmo { vmo, responder } => {
150                responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
151            }
152            fblock::SessionRequest::Close { responder } => {
153                responder.send(self.0.close().await?)?;
154            }
155        }
156        Ok(())
157    }
158
159    /// Runs `stream` until completion.
160    pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
161        while let Some(Ok(request)) = stream.next().await {
162            if let Err(error) = self.handle_request(request).await {
163                log::warn!(error:?; "FIDL error");
164            }
165        }
166        Ok(())
167    }
168}
169
/// State held on behalf of one `Interface` implementation: the interface itself, the table of
/// in-flight requests and the lazily created allocator used for decompressed reads.
pub struct SessionManager<I: Interface + ?Sized> {
    // The implementation that requests are dispatched to.
    interface: Arc<I>,
    // In-flight requests.  The `usize` is the value used to identify the session that owns a
    // request (see `Session::run_fifo`).
    active_requests: ActiveRequests<usize>,

    // NOTE: This must be dropped *after* `active_requests` because we store `Buffer<'_>` with an
    // erased ('static) lifetime in `ActiveRequest`.
    //
    // Struct fields are dropped in declaration order, so this field has to stay below
    // `active_requests`.  It is initialised on the first `StartDecompressedRead` request (see
    // `Session::map_request`).
    buffer_allocator: OnceLock<BufferAllocator>,
}
178
179impl<I: Interface + ?Sized> Drop for SessionManager<I> {
180    fn drop(&mut self) {
181        if let Some(allocator) = self.buffer_allocator.get() {
182            self.interface.on_detach_vmo(allocator.buffer_source().vmo());
183        }
184    }
185}
186
187impl<I: Interface + ?Sized> SessionManager<I> {
188    pub fn new(interface: Arc<I>) -> Self {
189        Self {
190            interface,
191            active_requests: ActiveRequests::default(),
192            buffer_allocator: OnceLock::new(),
193        }
194    }
195
196    pub fn interface(&self) -> &I {
197        self.interface.as_ref()
198    }
199
200    /// Runs `stream` until completion.
201    pub async fn serve_session(
202        self: Arc<Self>,
203        stream: fblock::SessionRequestStream,
204        offset_map: OffsetMap,
205        block_size: u32,
206    ) -> Result<(), Error> {
207        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
208        let session = Session { helper: Arc::new(helper), interface: self.interface.clone() };
209        let mut stream = stream.fuse();
210        let scope = fasync::Scope::new();
211        let helper = session.helper.clone();
212        let mut fifo_task = scope
213            .spawn(async move {
214                if let Err(status) = session.run_fifo(fifo).await {
215                    if status != zx::Status::PEER_CLOSED {
216                        log::error!(status:?; "FIFO error");
217                    }
218                }
219            })
220            .fuse();
221
222        // Make sure we detach VMOs when we go out of scope.
223        scopeguard::defer! {
224            for (_, (vmo, _)) in helper.take_vmos() {
225                self.interface.on_detach_vmo(&vmo);
226            }
227        }
228
229        loop {
230            futures::select! {
231                maybe_req = stream.next() => {
232                    if let Some(req) = maybe_req {
233                        helper.handle_request(req?).await?;
234                    } else {
235                        break;
236                    }
237                }
238                _ = fifo_task => break,
239            }
240        }
241
242        Ok(())
243    }
244}
245
/// The state moved into the task that services one session's FIFO (see
/// `SessionManager::serve_session`).
struct Session<I: Interface + ?Sized> {
    // The implementation that FIFO requests are dispatched to.
    interface: Arc<I>,
    // Per-session helper; `serve_session` keeps another reference to the same helper for
    // handling FIDL requests.
    helper: Arc<SessionHelper<SessionManager<I>>>,
}
250
251impl<I: Interface + ?Sized> Session<I> {
    /// A task loop for receiving and responding to FIFO requests.
    ///
    /// The loop has no `break`, so this only returns (with an error) when writing responses to
    /// or reading requests from the FIFO fails.  On exit, the active requests recorded against
    /// this session are dropped.
    async fn run_fifo(
        &self,
        fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    ) -> Result<(), zx::Status> {
        // The address of this `Session` is the value used to identify the session's requests.
        scopeguard::defer! {
            self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
        }

        // The FIFO has to be processed by a single task due to implementation constraints on
        // fuchsia_async::Fifo.  Thus, we use an event loop to drive the FIFO.  FIFO reads and
        // writes can happen in batch, and request processing is parallel.
        //
        // The general flow is:
        //  - Read messages from the FIFO, write into `requests`.
        //  - Read `requests`, decode them, and spawn a task to process them in `active_requests`,
        //    which will eventually write them into `responses`.
        //  - Read `responses` and write out to the FIFO.
        let mut fifo = fasync::Fifo::from_fifo(fifo);
        let (mut reader, mut writer) = fifo.async_io();
        let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
        let active_requests = &self.helper.session_manager().active_requests;
        let mut active_request_futures = FuturesUnordered::new();
        let mut responses = Vec::new();

        // We map requests using a single future `map_future`.  `pending_mappings` is used to queue
        // up requests that need to be mapped.  This will serialise how mappings occur which might
        // make updating mapping caches simpler.  If this proves to be a performance issue, we can
        // optimise it.
        let mut map_future = pin!(Fuse::terminated());
        let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();

        loop {
            // `new_requests` is the number of entries at the front of `requests` that were read
            // from the FIFO in this iteration; it is zero unless the receive branch fired.
            let new_requests = {
                // We provide some flow control by limiting how many in-flight requests we will
                // allow.
                let pending_requests = active_request_futures.len() + responses.len();
                let count = requests.len().saturating_sub(pending_requests);
                // A terminated future is never selected, which disables the branch.
                let mut receive_requests = pin!(if count == 0 {
                    Fuse::terminated()
                } else {
                    reader.read_entries(&mut requests[..count]).fuse()
                });
                let mut send_responses = pin!(if responses.is_empty() {
                    Fuse::terminated()
                } else {
                    poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
                        match ready!(writer.try_write(cx, &responses[..])) {
                            Ok(written) => {
                                // Only the responses that were written are removed; any rest is
                                // retried on a later iteration.
                                responses.drain(..written);
                                Poll::Ready(Ok(()))
                            }
                            Err(status) => Poll::Ready(Err(status)),
                        }
                    })
                    .fuse()
                });

                // Order is important here.  We want to prioritize sending results on the FIFO and
                // processing FIFO messages over receiving new ones, to provide flow control.
                select_biased!(
                    res = send_responses => {
                        res?;
                        0
                    },
                    response = active_request_futures.select_next_some() => {
                        responses.extend(response);
                        0
                    }
                    result = map_future => {
                        match result {
                            Ok((request, remainder, commit_decompression_buffers)) => {
                                active_request_futures.push(self.process_fifo_request(
                                    request,
                                    commit_decompression_buffers,
                                ));
                                // A remainder is mapped next, ahead of anything queued in
                                // `pending_mappings`.
                                if let Some(remainder) = remainder {
                                    map_future.set(
                                        self.map_request_or_get_response(remainder).fuse()
                                    );
                                }
                            }
                            Err(response) => responses.extend(response),
                        }
                        // `map_future` is idle unless a remainder was just installed; if idle,
                        // start on the next queued request.
                        if map_future.is_terminated() {
                            if let Some(request) = pending_mappings.pop_front() {
                                map_future.set(self.map_request_or_get_response(request).fuse());
                            }
                        }
                        0
                    }
                    count = receive_requests => {
                        count?
                    }
                )
            };

            // NB: It is very important that there are no `await`s for the rest of the loop body, as
            // otherwise active requests might become stalled.
            //
            // NOTE(review): the `assume_init_mut` below relies on `read_entries` having
            // initialised the first `new_requests` entries of `requests` — confirm against the
            // fuchsia_async FIFO documentation.
            for request in &mut requests[..new_requests] {
                match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
                    request.assume_init_mut()
                }) {
                    // CloseVmo is completed right here rather than being mapped and processed.
                    Ok(DecodedRequest {
                        operation: Operation::CloseVmo, vmo, request_id, ..
                    }) => {
                        if let Some(vmo) = vmo {
                            self.interface.on_detach_vmo(vmo.as_ref());
                        }
                        responses.extend(
                            active_requests
                                .complete_and_take_response(request_id, zx::Status::OK)
                                .map(|(_, response)| response),
                        );
                    }
                    // Everything else is mapped first: immediately if the mapper is idle,
                    // otherwise after the requests already queued.
                    Ok(request) => {
                        if map_future.is_terminated() {
                            map_future.set(self.map_request_or_get_response(request).fuse());
                        } else {
                            pending_mappings.push_back(request);
                        }
                    }
                    Err(None) => {}
                    Err(Some(response)) => responses.push(response),
                }
            }
        }
    }
380
381    async fn map_request_or_get_response(
382        &self,
383        request: DecodedRequest,
384    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), Option<BlockFifoResponse>> {
385        let request_id = request.request_id;
386        self.map_request(request).await.map_err(|status| {
387            self.helper
388                .orchestrator
389                .active_requests
390                .complete_and_take_response(request_id, status)
391                .map(|(_, r)| r)
392        })
393    }
394
395    // NOTE: The implementation of this currently assumes that we are only processing a single map
396    // request at a time.
397    async fn map_request(
398        &self,
399        mut request: DecodedRequest,
400    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), zx::Status> {
401        let mut active_requests;
402        let active_request;
403        let mut commit_decompression_buffers = false;
404        let flags = self.interface.get_info().as_ref().device_flags();
405        // Strip the PRE_BARRIER flag if we don't support it, and simulate the barrier with a
406        // pre-flush.
407        if !flags.contains(DeviceFlag::BARRIER_SUPPORT)
408            && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
409        {
410            if let Some(id) = request.trace_flow_id {
411                fuchsia_trace::async_instant!(
412                    fuchsia_trace::Id::from(id.get()),
413                    "storage",
414                    "block_server::SimulatedBarrier",
415                    "request_id" => request.request_id.0
416                );
417            }
418            self.interface.flush(request.trace_flow_id).await?;
419        }
420
421        // Handle decompressed read operations by turning them into regular read operations.
422        match request.operation {
423            Operation::StartDecompressedRead {
424                required_buffer_size,
425                device_block_offset,
426                block_count,
427                options,
428            } => {
429                let allocator = match self.helper.session_manager().buffer_allocator.get() {
430                    Some(a) => a,
431                    None => {
432                        // This isn't racy because there should only be one `map_request` future
433                        // running at any one time.
434                        let source = BufferSource::new(fblock::MAX_DECOMPRESSED_BYTES as usize);
435                        self.interface.on_attach_vmo(&source.vmo()).await?;
436                        let allocator = BufferAllocator::new(
437                            std::cmp::max(
438                                self.helper.block_size as usize,
439                                zx::system_get_page_size() as usize,
440                            ),
441                            source,
442                        );
443                        self.helper.session_manager().buffer_allocator.set(allocator).unwrap();
444                        self.helper.session_manager().buffer_allocator.get().unwrap()
445                    }
446                };
447
448                if required_buffer_size > fblock::MAX_DECOMPRESSED_BYTES as usize {
449                    return Err(zx::Status::OUT_OF_RANGE);
450                }
451
452                let buffer = allocator.allocate_buffer(required_buffer_size).await;
453                let vmo_offset = buffer.range().start as u64;
454
455                // # Safety
456                //
457                // See below.
458                unsafe fn remove_lifetime(buffer: Buffer<'_>) -> Buffer<'static> {
459                    unsafe { std::mem::transmute(buffer) }
460                }
461
462                active_requests = self.helper.session_manager().active_requests.0.lock();
463                active_request = &mut active_requests.requests[request.request_id.0];
464
465                // SAFETY: We guarantee that `buffer_allocator` is dropped after `active_requests`,
466                // so this should be safe.
467                active_request.decompression_info.as_mut().unwrap().buffer =
468                    Some(unsafe { remove_lifetime(buffer) });
469
470                request.operation = Operation::Read {
471                    device_block_offset,
472                    block_count,
473                    _unused: 0,
474                    vmo_offset,
475                    options,
476                };
477                request.vmo = Some(allocator.buffer_source().vmo().clone());
478
479                commit_decompression_buffers = true;
480            }
481            Operation::ContinueDecompressedRead {
482                offset,
483                device_block_offset,
484                block_count,
485                options,
486            } => {
487                active_requests = self.helper.session_manager().active_requests.0.lock();
488                active_request = &mut active_requests.requests[request.request_id.0];
489
490                let buffer =
491                    active_request.decompression_info.as_ref().unwrap().buffer.as_ref().unwrap();
492
493                // Make sure this read won't overflow our buffer.
494                if offset >= buffer.len() as u64
495                    || buffer.len() as u64 - offset
496                        < block_count as u64 * self.helper.block_size as u64
497                {
498                    return Err(zx::Status::OUT_OF_RANGE);
499                }
500
501                request.operation = Operation::Read {
502                    device_block_offset,
503                    block_count,
504                    _unused: 0,
505                    vmo_offset: buffer.range().start as u64 + offset,
506                    options,
507                };
508
509                let allocator = self.helper.session_manager().buffer_allocator.get().unwrap();
510                request.vmo = Some(allocator.buffer_source().vmo().clone());
511            }
512            _ => {
513                active_requests = self.helper.session_manager().active_requests.0.lock();
514                active_request = &mut active_requests.requests[request.request_id.0];
515            }
516        }
517
518        // NB: We propagate the FORCE_ACCESS flag to *both* request and remainder, even if we're
519        // using simulated FUA.  However, in `process_fifo_request`, we'll only do the post-flush
520        // once the last request completes.
521        self.helper
522            .map_request(request, active_request)
523            .map(|(request, remainder)| (request, remainder, commit_decompression_buffers))
524    }
525
526    /// Processes a fifo request.
527    async fn process_fifo_request(
528        &self,
529        DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
530        commit_decompression_buffers: bool,
531    ) -> Option<BlockFifoResponse> {
532        let mut needs_postflush = false;
533        let result = match operation {
534            Operation::Read { device_block_offset, block_count, _unused, vmo_offset, options } => {
535                join(
536                    self.interface.read(
537                        device_block_offset,
538                        block_count,
539                        vmo.as_ref().unwrap(),
540                        vmo_offset,
541                        options,
542                        trace_flow_id,
543                    ),
544                    async {
545                        if commit_decompression_buffers {
546                            let (target_slice, buffer_slice, buffer_range) = {
547                                let active_request = self
548                                    .helper
549                                    .session_manager()
550                                    .active_requests
551                                    .request(request_id);
552                                let info = active_request.decompression_info.as_ref().unwrap();
553                                (
554                                    info.uncompressed_slice(),
555                                    self.helper
556                                        .orchestrator
557                                        .buffer_allocator
558                                        .get()
559                                        .unwrap()
560                                        .buffer_source()
561                                        .slice(),
562                                    info.buffer.as_ref().unwrap().range(),
563                                )
564                            };
565                            let vmar = fuchsia_runtime::vmar_root_self();
566                            // The target slice might not be page aligned.
567                            let addr = target_slice.addr();
568                            let unaligned = addr % zx::system_get_page_size() as usize;
569                            if let Err(error) = vmar.op_range(
570                                zx::VmarOp::COMMIT,
571                                addr - unaligned,
572                                target_slice.len() + unaligned,
573                            ) {
574                                log::warn!(error:?; "Unable to commit target range");
575                            }
576                            // But the buffer range should be.
577                            if let Err(error) = vmar.op_range(
578                                zx::VmarOp::PREFETCH,
579                                buffer_slice.addr() + buffer_range.start,
580                                buffer_range.len(),
581                            ) {
582                                log::warn!(
583                                    error:?,
584                                    buffer_range:?;
585                                    "Unable to prefetch source range",
586                                );
587                            }
588                        }
589                    },
590                )
591                .await
592                .0
593            }
594            Operation::Write {
595                device_block_offset,
596                block_count,
597                _unused,
598                vmo_offset,
599                mut options,
600            } => {
601                // Strip the FORCE_ACCESS flag if we don't support it, and simulate the FUA with a
602                // post-flush.
603                if options.flags.contains(WriteFlags::FORCE_ACCESS) {
604                    let flags = self.interface.get_info().as_ref().device_flags();
605                    if !flags.contains(DeviceFlag::FUA_SUPPORT) {
606                        options.flags.remove(WriteFlags::FORCE_ACCESS);
607                        needs_postflush = true;
608                    }
609                }
610                self.interface
611                    .write(
612                        device_block_offset,
613                        block_count,
614                        vmo.as_ref().unwrap(),
615                        vmo_offset,
616                        options,
617                        trace_flow_id,
618                    )
619                    .await
620            }
621            Operation::Flush => self.interface.flush(trace_flow_id).await,
622            Operation::Trim { device_block_offset, block_count } => {
623                self.interface.trim(device_block_offset, block_count, trace_flow_id).await
624            }
625            Operation::CloseVmo
626            | Operation::StartDecompressedRead { .. }
627            | Operation::ContinueDecompressedRead { .. } => {
628                // Handled in main request loop
629                unreachable!()
630            }
631        };
632        let response = self
633            .helper
634            .orchestrator
635            .active_requests
636            .complete_and_take_response(request_id, result.into())
637            .map(|(_, r)| r);
638        if let Some(mut response) = response {
639            // Only do the post-flush on the very last request, and only if successful.
640            if zx::Status::from_raw(response.status) == zx::Status::OK && needs_postflush {
641                if let Some(id) = trace_flow_id {
642                    fuchsia_trace::async_instant!(
643                        fuchsia_trace::Id::from(id.get()),
644                        "storage",
645                        "block_server::SimulatedFUA",
646                        "request_id" => request_id.0
647                    );
648                }
649                response.status =
650                    zx::Status::from(self.interface.flush(trace_flow_id).await).into_raw();
651            }
652            Some(response)
653        } else {
654            response
655        }
656    }
657}
658
659impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
660    type Orchestrator = Self;
661
662    const SUPPORTS_DECOMPRESSION: bool = true;
663
664    // We don't need the session, we just need something unique to identify the session.
665    type Session = usize;
666
667    async fn on_attach_vmo(orchestrator: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
668        I::on_attach_vmo(&orchestrator.interface, vmo).await
669    }
670
671    async fn open_session(
672        orchestrator: Arc<Self>,
673        stream: fblock::SessionRequestStream,
674        offset_map: OffsetMap,
675        block_size: u32,
676    ) -> Result<(), Error> {
677        I::open_session(
678            &orchestrator.interface,
679            orchestrator.clone(),
680            stream,
681            offset_map,
682            block_size,
683        )
684        .await
685    }
686
687    fn get_info(&self) -> Cow<'_, DeviceInfo> {
688        self.interface.get_info()
689    }
690
691    async fn get_volume_info(
692        &self,
693    ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
694        self.interface.get_volume_info().await
695    }
696
697    async fn query_slices(
698        &self,
699        start_slices: &[u64],
700    ) -> Result<Vec<fblock::VsliceRange>, zx::Status> {
701        self.interface.query_slices(start_slices).await
702    }
703
704    async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
705        self.interface.extend(start_slice, slice_count).await
706    }
707
708    async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
709        self.interface.shrink(start_slice, slice_count).await
710    }
711
712    fn active_requests(&self) -> &ActiveRequests<Self::Session> {
713        return &self.active_requests;
714    }
715}
716
717impl<I: Interface> IntoOrchestrator for Arc<I> {
718    type SM = SessionManager<I>;
719
720    fn into_orchestrator(self) -> Arc<Self::SM> {
721        Arc::new(SessionManager {
722            interface: self,
723            active_requests: ActiveRequests::default(),
724            buffer_allocator: OnceLock::new(),
725        })
726    }
727}