Skip to main content

block_server/
async_interface.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{
6    ActiveRequests, DecodedRequest, DeviceInfo, FIFO_MAX_REQUESTS, IntoOrchestrator, OffsetMap,
7    Operation, SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags, WriteOptions};
11use fidl_fuchsia_storage_block::DeviceFlag;
12use futures::future::{Fuse, FusedFuture, join};
13use futures::stream::FuturesUnordered;
14use futures::{FutureExt, StreamExt, select_biased};
15use std::borrow::Cow;
16use std::collections::VecDeque;
17use std::future::{Future, poll_fn};
18use std::mem::MaybeUninit;
19use std::pin::pin;
20use std::sync::{Arc, OnceLock};
21use std::task::{Poll, ready};
22use storage_device::buffer::Buffer;
23use storage_device::buffer_allocator::{BufferAllocator, BufferSource};
24use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
25
26pub trait Interface: Send + Sync + Unpin + 'static {
27    /// Runs `stream` to completion.
28    ///
29    /// Implementors can override this method if they want to create a passthrough session instead
30    /// (and can use `[PassthroughSession]` below to do so).  See
31    /// fuchsia.hardware.block.Block/OpenSessionWithOffsetMap.
32    ///
33    /// If the implementor uses a `[PassthroughSession]`, the following Interface methods
34    /// will not be called, and can be stubbed out:
35    ///   - on_attach_vmo
36    ///   - on_detach_vmo
37    ///   - read
38    ///   - write
39    ///   - flush
40    ///   - trim
41    fn open_session(
42        &self,
43        session_manager: Arc<SessionManager<Self>>,
44        stream: fblock::SessionRequestStream,
45        offset_map: OffsetMap,
46        block_size: u32,
47    ) -> impl Future<Output = Result<(), Error>> + Send {
48        // By default, serve the session rather than forwarding/proxying it.
49        session_manager.serve_session(stream, offset_map, block_size)
50    }
51
52    /// Called whenever a VMO is attached, prior to the VMO's usage in any other methods.  Whilst
53    /// the VMO is attached, `vmo` will keep the same address so it is safe to use the pointer
54    /// value (as, say, a key into a HashMap).
55    fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
56        async { Ok(()) }
57    }
58
59    /// Called whenever a VMO is detached.
60    fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}
61
62    /// Called to get block/partition information.
63    fn get_info(&self) -> Cow<'_, DeviceInfo>;
64
65    /// Called for a request to read bytes.
66    fn read(
67        &self,
68        device_block_offset: u64,
69        block_count: u32,
70        vmo: &Arc<zx::Vmo>,
71        vmo_offset: u64, // *bytes* not blocks
72        opts: ReadOptions,
73        trace_flow_id: TraceFlowId,
74    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
75
76    /// Called for a request to write bytes.
77    fn write(
78        &self,
79        device_block_offset: u64,
80        block_count: u32,
81        vmo: &Arc<zx::Vmo>,
82        vmo_offset: u64, // *bytes* not blocks
83        opts: WriteOptions,
84        trace_flow_id: TraceFlowId,
85    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
86
87    /// Called to flush the device.
88    fn flush(
89        &self,
90        trace_flow_id: TraceFlowId,
91    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
92
93    /// Called to trim a region.
94    fn trim(
95        &self,
96        device_block_offset: u64,
97        block_count: u32,
98        trace_flow_id: TraceFlowId,
99    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
100
101    /// Called to handle the GetVolumeInfo FIDL call.
102    fn get_volume_info(
103        &self,
104    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
105    {
106        async { Err(zx::Status::NOT_SUPPORTED) }
107    }
108
109    /// Called to handle the QuerySlices FIDL call.
110    fn query_slices(
111        &self,
112        _start_slices: &[u64],
113    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
114        async { Err(zx::Status::NOT_SUPPORTED) }
115    }
116
117    /// Called to handle the Extend FIDL call.
118    fn extend(
119        &self,
120        _start_slice: u64,
121        _slice_count: u64,
122    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
123        async { Err(zx::Status::NOT_SUPPORTED) }
124    }
125
126    /// Called to handle the Shrink FIDL call.
127    fn shrink(
128        &self,
129        _start_slice: u64,
130        _slice_count: u64,
131    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
132        async { Err(zx::Status::NOT_SUPPORTED) }
133    }
134}
135
136/// A helper object to run a passthrough (proxy) session.
137pub struct PassthroughSession(fblock::SessionProxy);
138
139impl PassthroughSession {
140    pub fn new(proxy: fblock::SessionProxy) -> Self {
141        Self(proxy)
142    }
143
144    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
145        match request {
146            fblock::SessionRequest::GetFifo { responder } => {
147                responder.send(self.0.get_fifo().await?)?;
148            }
149            fblock::SessionRequest::AttachVmo { vmo, responder } => {
150                responder.send(self.0.attach_vmo(vmo).await?.as_ref().map_err(|s| *s))?;
151            }
152            fblock::SessionRequest::Close { responder } => {
153                responder.send(self.0.close().await?)?;
154            }
155        }
156        Ok(())
157    }
158
159    /// Runs `stream` until completion.
160    pub async fn serve(&self, mut stream: fblock::SessionRequestStream) -> Result<(), Error> {
161        while let Some(Ok(request)) = stream.next().await {
162            if let Err(error) = self.handle_request(request).await {
163                log::warn!(error:?; "FIDL error");
164            }
165        }
166        Ok(())
167    }
168}
169
170pub struct SessionManager<I: Interface + ?Sized> {
171    interface: Arc<I>,
172    active_requests: ActiveRequests<usize>,
173
174    // NOTE: This must be dropped *after* `active_requests` because we store `Buffer<'_>` with an
175    // erased ('static) lifetime in `ActiveRequest`.
176    buffer_allocator: OnceLock<BufferAllocator>,
177}
178
179impl<I: Interface + ?Sized> Drop for SessionManager<I> {
180    fn drop(&mut self) {
181        if let Some(allocator) = self.buffer_allocator.get() {
182            self.interface.on_detach_vmo(allocator.buffer_source().vmo());
183        }
184    }
185}
186
187impl<I: Interface + ?Sized> SessionManager<I> {
188    pub fn new(interface: Arc<I>) -> Self {
189        Self {
190            interface,
191            active_requests: ActiveRequests::default(),
192            buffer_allocator: OnceLock::new(),
193        }
194    }
195
196    pub fn interface(&self) -> &I {
197        self.interface.as_ref()
198    }
199
200    /// Runs `stream` until completion.
201    pub async fn serve_session(
202        self: Arc<Self>,
203        stream: fblock::SessionRequestStream,
204        offset_map: OffsetMap,
205        block_size: u32,
206    ) -> Result<(), Error> {
207        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
208        let session = Session { helper: Arc::new(helper), interface: self.interface.clone() };
209        let mut stream = stream.fuse();
210        let scope = fasync::Scope::new();
211        let helper = session.helper.clone();
212        let mut fifo_task = scope
213            .spawn(async move {
214                if let Err(status) = session.run_fifo(fifo).await {
215                    if status != zx::Status::PEER_CLOSED {
216                        log::error!(status:?; "FIFO error");
217                    }
218                }
219            })
220            .fuse();
221
222        // Make sure we detach VMOs when we go out of scope.
223        scopeguard::defer! {
224            for (_, (vmo, _)) in helper.take_vmos() {
225                self.interface.on_detach_vmo(&vmo);
226            }
227        }
228
229        loop {
230            futures::select! {
231                maybe_req = stream.next() => {
232                    if let Some(req) = maybe_req {
233                        helper.handle_request(req?).await?;
234                    } else {
235                        break;
236                    }
237                }
238                _ = fifo_task => break,
239            }
240        }
241
242        Ok(())
243    }
244}
245
246struct Session<I: Interface + ?Sized> {
247    interface: Arc<I>,
248    helper: Arc<SessionHelper<SessionManager<I>>>,
249}
250
251impl<I: Interface + ?Sized> Session<I> {
252    // A task loop for receiving and responding to FIFO requests.
253    async fn run_fifo(
254        &self,
255        fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
256    ) -> Result<(), zx::Status> {
257        scopeguard::defer! {
258            // Ensure that we always clean up active requests for this session upon FIFO
259            // termination.
260            self.helper.drop_active_requests(|session| *session == self as *const _ as usize);
261        }
262
263        // The FIFO has to be processed by a single task due to implementation constraints on
264        // fuchsia_async::Fifo.  Thus, we use an event loop to drive the FIFO.  FIFO reads and
265        // writes can happen in batch, and request processing is parallel.
266        //
267        // The general flow is:
268        //  - Read messages from the FIFO, write into `requests`.
269        //  - Read `requests`, decode them, and spawn a task to process them in `active_requests`,
270        //    which will eventually write them into `responses`.
271        //  - Read `responses` and write out to the FIFO.
272        let mut fifo = fasync::Fifo::from_fifo(fifo);
273        let (mut reader, mut writer) = fifo.async_io();
274        let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); FIFO_MAX_REQUESTS];
275        let active_requests = &self.helper.session_manager().active_requests;
276        let mut active_request_futures = FuturesUnordered::new();
277        let mut responses = Vec::new();
278
279        // We map requests using a single future `map_future`.  `pending_mappings` is used to queue
280        // up requests that need to be mapped.  This will serialise how mappings occur which might
281        // make updating mapping caches simpler.  If this proves to be a performance issue, we can
282        // optimise it.
283        let mut map_future = pin!(Fuse::terminated());
284        let mut pending_mappings: VecDeque<DecodedRequest> = VecDeque::new();
285
286        loop {
287            let new_requests = {
288                // We provide some flow control by limiting how many in-flight requests we will
289                // allow.
290                let pending_requests = active_request_futures.len() + responses.len();
291                let count = requests.len().saturating_sub(pending_requests);
292                let mut receive_requests = pin!(if count == 0 {
293                    Fuse::terminated()
294                } else {
295                    reader.read_entries(&mut requests[..count]).fuse()
296                });
297                let mut send_responses = pin!(if responses.is_empty() {
298                    Fuse::terminated()
299                } else {
300                    poll_fn(|cx| -> Poll<Result<(), zx::Status>> {
301                        match ready!(writer.try_write(cx, &responses[..])) {
302                            Ok(written) => {
303                                responses.drain(..written);
304                                Poll::Ready(Ok(()))
305                            }
306                            Err(status) => Poll::Ready(Err(status)),
307                        }
308                    })
309                    .fuse()
310                });
311
312                // Order is important here.  We want to prioritize sending results on the FIFO and
313                // processing FIFO messages over receiving new ones, to provide flow control.
314                select_biased!(
315                    res = send_responses => {
316                        res?;
317                        0
318                    },
319                    response = active_request_futures.select_next_some() => {
320                        responses.extend(response);
321                        0
322                    }
323                    result = map_future => {
324                        match result {
325                            Ok((request, remainder, commit_decompression_buffers)) => {
326                                active_request_futures.push(self.process_fifo_request(
327                                    request,
328                                    commit_decompression_buffers,
329                                ));
330                                if let Some(remainder) = remainder {
331                                    map_future.set(
332                                        self.map_request_or_get_response(remainder).fuse()
333                                    );
334                                }
335                            }
336                            Err(response) => responses.extend(response),
337                        }
338                        if map_future.is_terminated() {
339                            if let Some(request) = pending_mappings.pop_front() {
340                                map_future.set(self.map_request_or_get_response(request).fuse());
341                            }
342                        }
343                        0
344                    }
345                    count = receive_requests => {
346                        count?
347                    }
348                )
349            };
350
351            // NB: It is very important that there are no `await`s for the rest of the loop body, as
352            // otherwise active requests might become stalled.
353            for request in &mut requests[..new_requests] {
354                match self.helper.decode_fifo_request(self as *const _ as usize, unsafe {
355                    request.assume_init_mut()
356                }) {
357                    Ok(DecodedRequest {
358                        operation: Operation::CloseVmo, vmo, request_id, ..
359                    }) => {
360                        if let Some(vmo) = vmo {
361                            self.interface.on_detach_vmo(vmo.as_ref());
362                        }
363                        responses.extend(
364                            active_requests
365                                .complete_and_take_response(request_id, zx::Status::OK)
366                                .map(|(_, response)| response),
367                        );
368                    }
369                    Ok(request) => {
370                        if map_future.is_terminated() {
371                            map_future.set(self.map_request_or_get_response(request).fuse());
372                        } else {
373                            pending_mappings.push_back(request);
374                        }
375                    }
376                    Err(None) => {}
377                    Err(Some(response)) => responses.push(response),
378                }
379            }
380        }
381    }
382
383    async fn map_request_or_get_response(
384        &self,
385        request: DecodedRequest,
386    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), Option<BlockFifoResponse>> {
387        let request_id = request.request_id;
388        self.map_request(request).await.map_err(|status| {
389            self.helper
390                .orchestrator
391                .active_requests
392                .complete_and_take_response(request_id, status)
393                .map(|(_, r)| r)
394        })
395    }
396
397    // NOTE: The implementation of this currently assumes that we are only processing a single map
398    // request at a time.
399    async fn map_request(
400        &self,
401        mut request: DecodedRequest,
402    ) -> Result<(DecodedRequest, Option<DecodedRequest>, bool), zx::Status> {
403        let mut active_requests;
404        let active_request;
405        let mut commit_decompression_buffers = false;
406        let flags = self.interface.get_info().as_ref().device_flags();
407        // Strip the PRE_BARRIER flag if we don't support it, and simulate the barrier with a
408        // pre-flush.
409        if !flags.contains(DeviceFlag::BARRIER_SUPPORT)
410            && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
411        {
412            if let Some(id) = request.trace_flow_id {
413                fuchsia_trace::async_instant!(
414                    fuchsia_trace::Id::from(id.get()),
415                    "storage",
416                    "block_server::SimulatedBarrier",
417                    "request_id" => request.request_id.0
418                );
419            }
420            self.interface.flush(request.trace_flow_id).await?;
421        }
422
423        // Handle decompressed read operations by turning them into regular read operations.
424        match request.operation {
425            Operation::StartDecompressedRead {
426                required_buffer_size,
427                device_block_offset,
428                block_count,
429                options,
430            } => {
431                let allocator = match self.helper.session_manager().buffer_allocator.get() {
432                    Some(a) => a,
433                    None => {
434                        // This isn't racy because there should only be one `map_request` future
435                        // running at any one time.
436                        let source = BufferSource::new(fblock::MAX_DECOMPRESSED_BYTES as usize);
437                        self.interface.on_attach_vmo(&source.vmo()).await?;
438                        let allocator = BufferAllocator::new(
439                            std::cmp::max(
440                                self.helper.block_size as usize,
441                                zx::system_get_page_size() as usize,
442                            ),
443                            source,
444                        );
445                        self.helper.session_manager().buffer_allocator.set(allocator).unwrap();
446                        self.helper.session_manager().buffer_allocator.get().unwrap()
447                    }
448                };
449
450                if required_buffer_size > fblock::MAX_DECOMPRESSED_BYTES as usize {
451                    return Err(zx::Status::OUT_OF_RANGE);
452                }
453
454                let buffer = allocator.allocate_buffer(required_buffer_size).await;
455                let vmo_offset = buffer.range().start as u64;
456
457                // # Safety
458                //
459                // See below.
460                unsafe fn remove_lifetime(buffer: Buffer<'_>) -> Buffer<'static> {
461                    unsafe { std::mem::transmute(buffer) }
462                }
463
464                active_requests = self.helper.session_manager().active_requests.0.lock();
465                active_request = &mut active_requests.requests[request.request_id.0];
466
467                // SAFETY: We guarantee that `buffer_allocator` is dropped after `active_requests`,
468                // so this should be safe.
469                active_request.decompression_info.as_mut().unwrap().buffer =
470                    Some(unsafe { remove_lifetime(buffer) });
471
472                request.operation = Operation::Read {
473                    device_block_offset,
474                    block_count,
475                    _unused: 0,
476                    vmo_offset,
477                    options,
478                };
479                request.vmo = Some(allocator.buffer_source().vmo().clone());
480
481                commit_decompression_buffers = true;
482            }
483            Operation::ContinueDecompressedRead {
484                offset,
485                device_block_offset,
486                block_count,
487                options,
488            } => {
489                active_requests = self.helper.session_manager().active_requests.0.lock();
490                active_request = &mut active_requests.requests[request.request_id.0];
491
492                let buffer =
493                    active_request.decompression_info.as_ref().unwrap().buffer.as_ref().unwrap();
494
495                // Make sure this read won't overflow our buffer.
496                if offset >= buffer.len() as u64
497                    || buffer.len() as u64 - offset
498                        < block_count as u64 * self.helper.block_size as u64
499                {
500                    return Err(zx::Status::OUT_OF_RANGE);
501                }
502
503                request.operation = Operation::Read {
504                    device_block_offset,
505                    block_count,
506                    _unused: 0,
507                    vmo_offset: buffer.range().start as u64 + offset,
508                    options,
509                };
510
511                let allocator = self.helper.session_manager().buffer_allocator.get().unwrap();
512                request.vmo = Some(allocator.buffer_source().vmo().clone());
513            }
514            _ => {
515                active_requests = self.helper.session_manager().active_requests.0.lock();
516                active_request = &mut active_requests.requests[request.request_id.0];
517            }
518        }
519
520        // NB: We propagate the FORCE_ACCESS flag to *both* request and remainder, even if we're
521        // using simulated FUA.  However, in `process_fifo_request`, we'll only do the post-flush
522        // once the last request completes.
523        self.helper
524            .map_request(request, active_request)
525            .map(|(request, remainder)| (request, remainder, commit_decompression_buffers))
526    }
527
528    /// Processes a fifo request.
529    async fn process_fifo_request(
530        &self,
531        DecodedRequest { request_id, operation, vmo, trace_flow_id }: DecodedRequest,
532        commit_decompression_buffers: bool,
533    ) -> Option<BlockFifoResponse> {
534        let mut needs_postflush = false;
535        let result = match operation {
536            Operation::Read { device_block_offset, block_count, _unused, vmo_offset, options } => {
537                join(
538                    self.interface.read(
539                        device_block_offset,
540                        block_count,
541                        vmo.as_ref().unwrap(),
542                        vmo_offset,
543                        options,
544                        trace_flow_id,
545                    ),
546                    async {
547                        if commit_decompression_buffers {
548                            let (target_slice, buffer_slice, buffer_range) = {
549                                let active_request = self
550                                    .helper
551                                    .session_manager()
552                                    .active_requests
553                                    .request(request_id);
554                                let info = active_request.decompression_info.as_ref().unwrap();
555                                (
556                                    info.uncompressed_slice(),
557                                    self.helper
558                                        .orchestrator
559                                        .buffer_allocator
560                                        .get()
561                                        .unwrap()
562                                        .buffer_source()
563                                        .slice(),
564                                    info.buffer.as_ref().unwrap().range(),
565                                )
566                            };
567                            let vmar = fuchsia_runtime::vmar_root_self();
568                            // The target slice might not be page aligned.
569                            let addr = target_slice.addr();
570                            let unaligned = addr % zx::system_get_page_size() as usize;
571                            if let Err(error) = vmar.op_range(
572                                zx::VmarOp::COMMIT,
573                                addr - unaligned,
574                                target_slice.len() + unaligned,
575                            ) {
576                                log::warn!(error:?; "Unable to commit target range");
577                            }
578                            // But the buffer range should be.
579                            if let Err(error) = vmar.op_range(
580                                zx::VmarOp::PREFETCH,
581                                buffer_slice.addr() + buffer_range.start,
582                                buffer_range.len(),
583                            ) {
584                                log::warn!(
585                                    error:?,
586                                    buffer_range:?;
587                                    "Unable to prefetch source range",
588                                );
589                            }
590                        }
591                    },
592                )
593                .await
594                .0
595            }
596            Operation::Write {
597                device_block_offset,
598                block_count,
599                _unused,
600                vmo_offset,
601                mut options,
602            } => {
603                // Strip the FORCE_ACCESS flag if we don't support it, and simulate the FUA with a
604                // post-flush.
605                if options.flags.contains(WriteFlags::FORCE_ACCESS) {
606                    let flags = self.interface.get_info().as_ref().device_flags();
607                    if !flags.contains(DeviceFlag::FUA_SUPPORT) {
608                        options.flags.remove(WriteFlags::FORCE_ACCESS);
609                        needs_postflush = true;
610                    }
611                }
612                self.interface
613                    .write(
614                        device_block_offset,
615                        block_count,
616                        vmo.as_ref().unwrap(),
617                        vmo_offset,
618                        options,
619                        trace_flow_id,
620                    )
621                    .await
622            }
623            Operation::Flush => self.interface.flush(trace_flow_id).await,
624            Operation::Trim { device_block_offset, block_count } => {
625                self.interface.trim(device_block_offset, block_count, trace_flow_id).await
626            }
627            Operation::CloseVmo
628            | Operation::StartDecompressedRead { .. }
629            | Operation::ContinueDecompressedRead { .. } => {
630                // Handled in main request loop
631                unreachable!()
632            }
633        };
634        let response = self
635            .helper
636            .orchestrator
637            .active_requests
638            .complete_and_take_response(request_id, result.into())
639            .map(|(_, r)| r);
640        if let Some(mut response) = response {
641            // Only do the post-flush on the very last request, and only if successful.
642            if zx::Status::from_raw(response.status) == zx::Status::OK && needs_postflush {
643                if let Some(id) = trace_flow_id {
644                    fuchsia_trace::async_instant!(
645                        fuchsia_trace::Id::from(id.get()),
646                        "storage",
647                        "block_server::SimulatedFUA",
648                        "request_id" => request_id.0
649                    );
650                }
651                response.status =
652                    zx::Status::from(self.interface.flush(trace_flow_id).await).into_raw();
653            }
654            Some(response)
655        } else {
656            response
657        }
658    }
659}
660
661impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
662    type Orchestrator = Self;
663
664    const SUPPORTS_DECOMPRESSION: bool = true;
665
666    // We don't need the session, we just need something unique to identify the session.
667    type Session = usize;
668
669    async fn on_attach_vmo(orchestrator: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
670        I::on_attach_vmo(&orchestrator.interface, vmo).await
671    }
672
673    async fn open_session(
674        orchestrator: Arc<Self>,
675        stream: fblock::SessionRequestStream,
676        offset_map: OffsetMap,
677        block_size: u32,
678    ) -> Result<(), Error> {
679        I::open_session(
680            &orchestrator.interface,
681            orchestrator.clone(),
682            stream,
683            offset_map,
684            block_size,
685        )
686        .await
687    }
688
689    fn get_info(&self) -> Cow<'_, DeviceInfo> {
690        self.interface.get_info()
691    }
692
693    async fn get_volume_info(
694        &self,
695    ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
696        self.interface.get_volume_info().await
697    }
698
699    async fn query_slices(
700        &self,
701        start_slices: &[u64],
702    ) -> Result<Vec<fblock::VsliceRange>, zx::Status> {
703        self.interface.query_slices(start_slices).await
704    }
705
706    async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
707        self.interface.extend(start_slice, slice_count).await
708    }
709
710    async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
711        self.interface.shrink(start_slice, slice_count).await
712    }
713
714    fn active_requests(&self) -> &ActiveRequests<Self::Session> {
715        return &self.active_requests;
716    }
717}
718
719impl<I: Interface> IntoOrchestrator for Arc<I> {
720    type SM = SessionManager<I>;
721
722    fn into_orchestrator(self) -> Arc<Self::SM> {
723        Arc::new(SessionManager {
724            interface: self,
725            active_requests: ActiveRequests::default(),
726            buffer_allocator: OnceLock::new(),
727        })
728    }
729}