block_server/
c_interface.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::WriteFlags;
6
7use super::{
8    ActiveRequests, DecodedRequest, IntoSessionManager, OffsetMap, Operation, RequestId,
9    SessionHelper, TraceFlowId,
10};
11use anyhow::Error;
12use block_protocol::{BlockFifoRequest, BlockFifoResponse};
13use fidl::endpoints::RequestStream;
14use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
15use fuchsia_async::{self as fasync, EHandle};
16use fuchsia_sync::{Condvar, Mutex};
17use futures::TryStreamExt;
18use futures::stream::{AbortHandle, Abortable};
19use std::borrow::Cow;
20use std::collections::{HashMap, VecDeque};
21use std::ffi::{CStr, c_char, c_void};
22use std::mem::MaybeUninit;
23use std::num::NonZero;
24use std::ops::{Deref, DerefMut};
25use std::sync::{Arc, Weak};
26use zx::{self as zx, AsHandleRef as _};
27use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
28
/// Session manager backing the C interface: it forwards block requests to the embedder through
/// [`Callbacks`] and tracks the sessions and requests that are currently outstanding.
pub struct SessionManager {
    /// Callbacks (and opaque context) supplied to [`block_server_new`].
    callbacks: Callbacks,
    /// Open sessions, keyed by the address of the `Session` (see `open_session`).  Entries are
    /// removed by `Session::drop`.
    open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
    /// Notified by `Session::drop` when `open_sessions` becomes empty.
    open_sessions_condvar: Condvar,
    active_requests: ActiveRequests<Arc<Session>>,
    /// Number of requests handed to `callbacks.on_requests` that have not been completed yet.
    inflight_requests: Mutex<usize>,
    /// Notified by `complete_request` when `inflight_requests` reaches zero.
    no_inflight_requests_condvar: Condvar,
    /// Mailbox used to pass state between the executor thread and the server's owner.
    mbox: ExecutorMailbox,
    /// Device information returned from `get_info`.
    info: super::DeviceInfo,
}
39
// `mbox` can hold a `Mail::AsyncShutdown`, which carries a raw `*mut c_void`, so these traits are
// not derived automatically.
// NOTE(review): soundness relies on the embedder's pointers (callback context and the shutdown
// `arg`) being usable from any thread — confirm against the C API contract.
unsafe impl Send for SessionManager {}
unsafe impl Sync for SessionManager {}
42
43impl SessionManager {
44    fn submit_requests(&self, requests: &mut [Request]) {
45        *self.inflight_requests.lock() += requests.len();
46        // SAFETY: `request` points to a valid array of `requests.len()` elements.
47        // The callback implementation is assumed to uphold its contract.
48        unsafe {
49            (self.callbacks.on_requests)(
50                self.callbacks.context,
51                std::ptr::from_mut(&mut requests[0]),
52                requests.len(),
53            )
54        }
55    }
56
57    /// Waits for there to be no requests in-flight.
58    ///
59    /// NOTE: To void TOCTOUs, this must be called on the same thread which calls
60    /// [`Self::submit_requests`].
61    fn wait_for_no_inflight_requests(&self) {
62        let mut guard = self.inflight_requests.lock();
63        self.no_inflight_requests_condvar.wait_while(&mut guard, |count| *count > 0);
64    }
65
66    /// Called instead of `[Self::complete_request]` when a request is completed before it was
67    /// actually submitted.
68    fn complete_unsubmitted_request(&self, request_id: RequestId, status: zx::Status) {
69        if let Some((session, response)) =
70            self.active_requests.complete_and_take_response(request_id, status)
71        {
72            session.send_response(response);
73        }
74    }
75
76    fn complete_request(&self, request_id: RequestId, status: zx::Status) {
77        let notify = {
78            let mut inflight_requests = self.inflight_requests.lock();
79            *inflight_requests -= 1;
80            *inflight_requests == 0
81        };
82        self.complete_unsubmitted_request(request_id, status);
83        if notify {
84            self.no_inflight_requests_condvar.notify_all();
85        }
86    }
87
88    fn terminate(&self) {
89        {
90            // We must drop references to sessions whilst we're not holding the lock for
91            // `open_sessions` because `Session::drop` needs to take that same lock.
92            #[allow(clippy::collection_is_never_read)]
93            let mut terminated_sessions = Vec::new();
94            for (_, session) in &*self.open_sessions.lock() {
95                if let Some(session) = session.upgrade() {
96                    session.terminate();
97                    terminated_sessions.push(session);
98                }
99            }
100        }
101        let mut guard = self.open_sessions.lock();
102        self.open_sessions_condvar.wait_while(&mut guard, |s| !s.is_empty());
103    }
104}
105
/// Implementation of the generic session-manager trait for the C interface.
impl super::SessionManager for SessionManager {
    const SUPPORTS_DECOMPRESSION: bool = false;
    type Session = Arc<Session>;

    /// No per-VMO work is done by this implementation; always returns `Ok(())`.
    async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
        Ok(())
    }

    /// Creates a new [`Session`], registers it in `open_sessions`, hands a strong reference to
    /// the embedder via `on_new_session`, and then serves `stream` until it ends, fails, or the
    /// session's abort handle fires.
    async fn open_session(
        self: Arc<Self>,
        mut stream: fblock::SessionRequestStream,
        offset_map: OffsetMap,
        block_size: u32,
    ) -> Result<(), Error> {
        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
        let (abort_handle, registration) = AbortHandle::new_pair();
        let session = Arc::new(Session {
            manager: self.clone(),
            helper,
            fifo,
            queue: Mutex::default(),
            abort_handle,
        });
        // The map key is the session's address; `Session::drop` removes the entry using the
        // same address.
        self.open_sessions.lock().insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
        // SAFETY: The raw pointer carries a strong reference (`Arc::into_raw` of a clone) that
        // is reclaimed by `block_server_session_release`.  The callback implementation is
        // assumed to uphold the contract documented on `Callbacks::on_new_session`.
        unsafe {
            (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
        }

        // Serve FIDL requests until the stream closes, a request fails, or the session is
        // aborted (see `Session::run` and `Session::terminate`); an abort is reported as an
        // error.
        let result = Abortable::new(
            async {
                while let Some(request) = stream.try_next().await? {
                    session.helper.handle_request(request).await?;
                }
                Ok(())
            },
            registration,
        )
        .await
        .unwrap_or_else(|e| Err(e.into()));

        // Raise USER_0 so that `Session::fifo_loop` returns.
        let _ = session.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);

        result
    }

    /// Returns the device information captured when the server was created.
    fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
        Cow::Borrowed(&self.info)
    }

    /// Returns the table of active requests shared by all sessions of this manager.
    fn active_requests(&self) -> &ActiveRequests<Arc<Session>> {
        &self.active_requests
    }
}
159
160impl Drop for SessionManager {
161    fn drop(&mut self) {
162        self.terminate();
163    }
164}
165
/// An `Arc<SessionManager>` already is a session manager handle, so the conversion is the
/// identity.
impl IntoSessionManager for Arc<SessionManager> {
    type SM = SessionManager;

    fn into_session_manager(self) -> Self {
        self
    }
}
173
/// Table of callbacks (plus an opaque context pointer) supplied by the embedder to
/// [`block_server_new`].  The struct is `#[repr(C)]` because it is passed by value across the C
/// boundary.
#[repr(C)]
pub struct Callbacks {
    /// An opaque context object retained by this library.  The library will pass this back into all
    /// callbacks.  The memory pointed to by `context` must last until [`block_server_delete`] is
    /// called.
    pub context: *mut c_void,
    /// Starts a thread.  The implementation must call [`block_server_thread`] on this newly created
    /// thread, providing `arg`.  The implementation must then call [`block_server_thread_delete`]
    /// after [`block_server_thread`] returns (but before [`block_server_delete`] is called).
    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
    /// Notifies the implementation of a new session.  The implementation must call
    /// [`block_server_session_run`] on a separate thread, and must call
    /// [`block_server_session_release`] after [`block_server_session_run`] (but before
    /// [`block_server_delete`] is called).
    pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
    /// Submits a batch of requests to be handled by the implementation.  The implementation must
    /// not retain references to `requests` after it returns.  The implementation must ensure that
    /// [`block_server_send_reply`] is called exactly once with the request ID of each entry in
    /// `requests`, regardless of its status; this call can be asynchronous but must occur before
    /// [`block_server_delete`] is called.
    pub on_requests:
        unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
    /// Logs `message` to the implementation's logger.  The implementation must not retain
    /// references to `message`.  `message` is passed with an explicit length (`message_len`
    /// bytes) and is not guaranteed to be null-terminated.
    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
}
200
201impl Callbacks {
202    #[allow(dead_code)]
203    fn log(&self, msg: &str) {
204        let msg = msg.as_bytes();
205        // SAFETY: This is safe if `context` and `log` are good.
206        unsafe {
207            (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
208        }
209    }
210}
211
// A raw VMO handle value that the holder does not own.  In this file it is built from
// `raw_handle()` of a VMO kept alive elsewhere, or from `ZX_HANDLE_INVALID` when the request has
// no VMO.
/// cbindgen:no-export
#[allow(dead_code)]
pub struct UnownedVmo(zx::sys::zx_handle_t);
215
/// A single request as handed to the embedder's `on_requests` callback.
#[repr(C)]
pub struct Request {
    /// Identifier the embedder passes back to [`block_server_send_reply`].
    pub request_id: RequestId,
    /// The operation to perform.
    pub operation: Operation,
    /// Trace flow identifier associated with the request.
    pub trace_flow_id: TraceFlowId,
    /// Unowned handle of the VMO involved in the operation, or `ZX_HANDLE_INVALID` if none.
    pub vmo: UnownedVmo,
}
223
// `Callbacks` contains the raw `context` pointer, so these traits are not derived automatically.
// NOTE(review): soundness relies on the embedder making `context` and the callbacks usable from
// any thread — confirm against the C API contract.
unsafe impl Send for Callbacks {}
unsafe impl Sync for Callbacks {}
226
/// A single client session: a FIFO carrying block requests/responses plus the FIDL request loop
/// started in `open_session`.
pub struct Session {
    /// The manager that created this session; used to submit and complete requests.
    manager: Arc<SessionManager>,
    /// Shared helper that decodes and maps FIFO requests and handles FIDL requests.
    helper: SessionHelper<SessionManager>,
    /// FIFO on which requests are read and responses are written.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    /// Responses that could not be written to `fifo` immediately.
    queue: Mutex<SessionQueue>,
    /// Aborts the FIDL request loop running in `open_session`.
    abort_handle: AbortHandle,
}
234
/// State protected by `Session::queue`.
#[derive(Default)]
struct SessionQueue {
    /// Responses waiting to be written to the FIFO, oldest first (`send_response` pushes to the
    /// back, `fifo_loop` drains from the front).
    responses: VecDeque<BlockFifoResponse>,
}
239
/// Maximum number of requests handled per batch: the size of the FIFO read buffer in
/// `Session::fifo_loop` and the capacity of `DecodedRequests`.
pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
241
242impl Session {
    /// Runs the session's FIFO loop to completion, then tears the session down.
    fn run(self: &Arc<Self>) {
        self.fifo_loop();
        // Stop the FIDL request loop in `open_session` as well.
        self.abort_handle.abort();
        // Drop any active requests that belong to this session.
        self.helper.drop_active_requests(|s| Arc::ptr_eq(s, self));
    }
248
    /// Services the session FIFO: flushes queued responses, reads and handles incoming requests,
    /// and otherwise blocks waiting for FIFO signals.
    ///
    /// Returns when a FIFO read or write fails with anything other than `SHOULD_WAIT`, when
    /// waiting on the FIFO fails, or when the `USER_0` signal (raised by [`Self::terminate`] and
    /// at the end of `open_session`) is observed.
    fn fifo_loop(self: &Arc<Self>) {
        let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];

        loop {
            // Send queued responses.
            let is_queue_empty = {
                let mut queue = self.queue.lock();
                while !queue.responses.is_empty() {
                    // Only the first contiguous slice of the deque is written per iteration; the
                    // loop picks up the rest.
                    let (front, _) = queue.responses.as_slices();
                    match self.fifo.write(front) {
                        Ok(count) => {
                            // A short write means the FIFO filled up part-way through.
                            let full = count < front.len();
                            queue.responses.drain(..count);
                            if full {
                                break;
                            }
                        }
                        Err(zx::Status::SHOULD_WAIT) => break,
                        Err(_) => return,
                    }
                }
                queue.responses.is_empty()
            };

            // Process pending reads.
            match self.fifo.read_uninit(&mut requests) {
                Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
                Err(zx::Status::SHOULD_WAIT) => {
                    // USER_0 requests termination; USER_1 is raised by `send_response` when a
                    // response had to be queued.
                    let mut signals =
                        zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
                    // Only wait for writability when there is something left to write.
                    if !is_queue_empty {
                        signals |= zx::Signals::OBJECT_WRITABLE;
                    }
                    let Ok(signals) =
                        self.fifo.wait_handle(signals, zx::MonotonicInstant::INFINITE).to_result()
                    else {
                        return;
                    };
                    if signals.contains(zx::Signals::USER_0) {
                        return;
                    }
                    // Clear USER_1 signal if it's set.
                    if signals.contains(zx::Signals::USER_1) {
                        let _ = self.fifo.signal_handle(zx::Signals::USER_1, zx::Signals::empty());
                    }
                }
                Err(_) => return,
            }
        }
    }
299
300    /// Synchronously performs a device flush.
301    fn pre_flush(self: &Arc<Self>, request_id: RequestId) -> Result<(), zx::Status> {
302        let trace_flow_id = {
303            let mut request = self.manager.active_requests.request(request_id);
304            if let Some(id) = request.trace_flow_id {
305                fuchsia_trace::async_instant!(
306                    fuchsia_trace::Id::from(id.get()),
307                    c"storage",
308                    c"block_server::SimulatedBarrier",
309                    "request_id" => request_id.0
310                );
311            }
312            request.count += 1;
313            request.trace_flow_id
314        };
315        self.manager.submit_requests(&mut [Request {
316            request_id,
317            operation: Operation::Flush,
318            trace_flow_id,
319            vmo: UnownedVmo(zx::sys::ZX_HANDLE_INVALID),
320        }]);
321        self.manager.wait_for_no_inflight_requests();
322        let status = self.manager.active_requests.request(request_id).status;
323        match status {
324            zx::Status::OK => Ok(()),
325            status => {
326                // Respond for the unsubmitted request too.
327                self.manager.complete_unsubmitted_request(request_id, status);
328                Err(status)
329            }
330        }
331    }
332
333    /// Synchronously completes `decoded_requests`, and inserts a post-flush into `decoded_requests`
334    /// to be submitted later.
335    fn post_flush(self: &Arc<Self>, request_id: RequestId, decoded_requests: &mut DecodedRequests) {
336        if decoded_requests.len() > 0 {
337            self.manager.submit_requests(decoded_requests);
338            decoded_requests.clear();
339        }
340        self.manager.wait_for_no_inflight_requests();
341        let request = self.manager.active_requests.request(request_id);
342        match request.status {
343            zx::Status::OK => decoded_requests.push(Request {
344                request_id,
345                operation: Operation::Flush,
346                trace_flow_id: request.trace_flow_id,
347                vmo: UnownedVmo(zx::sys::ZX_HANDLE_INVALID),
348            }),
349            status => {
350                drop(request);
351                self.manager.complete_unsubmitted_request(request_id, status)
352            }
353        }
354    }
355
    /// Decodes each FIFO request in `requests`, batches the resulting operations, and submits
    /// them to the embedder.
    ///
    /// Barriers and FUA writes are simulated with synchronous flushes when the device flags do
    /// not advertise native support for them.
    fn handle_requests<'a>(
        self: &Arc<Self>,
        requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
    ) {
        let mut decoded_requests = DecodedRequests::default();
        for request in requests {
            match self.helper.decode_fifo_request(self.clone(), request) {
                // CloseVmo is completed immediately; nothing is submitted to the embedder.
                Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
                    self.manager.complete_unsubmitted_request(request_id, zx::Status::OK);
                }
                Ok(mut request) => {
                    let request_id = request.request_id;
                    // Strip the PRE_BARRIER flag if we don't support it, and simulate the barrier
                    // with a pre-flush.
                    if !self.manager.info.device_flags().contains(fblock::Flag::BARRIER_SUPPORT)
                        && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
                        && self.pre_flush(request_id).is_err()
                    {
                        // `pre_flush` has already completed the request with the error.
                        continue;
                    }
                    // Strip the FORCE_ACCESS flag if we don't support it, and simulate the FUA with
                    // a post-flush.
                    let simulate_fua =
                        !self.manager.info.device_flags().contains(fblock::Flag::FUA_SUPPORT)
                            && request.operation.take_write_flag(WriteFlags::FORCE_ACCESS);
                    if simulate_fua {
                        // Account for the additional request we need at the end.
                        self.manager.active_requests.request(request_id).count += 1;
                    }

                    // `map_request` may return a remainder, in which case the remainder is mapped
                    // again until nothing is left.
                    loop {
                        let result = self.helper.map_request(
                            request,
                            &mut self.manager.active_requests.request(request_id),
                        );
                        match result {
                            Ok((
                                DecodedRequest { request_id, operation, vmo, trace_flow_id },
                                remainder,
                            )) => {
                                // We are handing out unowned references to the VMO here.  This is
                                // safe because the VMO bin holds references to any closed VMOs
                                // until all preceding operations have finished.
                                decoded_requests.push(Request {
                                    request_id,
                                    operation,
                                    trace_flow_id,
                                    vmo: UnownedVmo(
                                        vmo.as_ref()
                                            .map(|vmo| vmo.raw_handle())
                                            .unwrap_or(zx::sys::ZX_HANDLE_INVALID),
                                    ),
                                });

                                // Flush the batch as soon as it is full so `push` never
                                // overflows.
                                if decoded_requests.is_full() {
                                    self.manager.submit_requests(&mut decoded_requests);
                                    decoded_requests.clear();
                                }
                                if let Some(r) = remainder {
                                    request = r;
                                } else {
                                    break;
                                }
                            }
                            Err(status) => {
                                self.manager.complete_unsubmitted_request(request_id, status);
                                break;
                            }
                        }
                    }

                    if simulate_fua {
                        self.post_flush(request_id, &mut decoded_requests);
                    }
                }
                // NOTE(review): presumably the helper has fully dealt with the request in this
                // case — confirm against `decode_fifo_request`.
                Err(None) => {}
                Err(Some(response)) => self.send_response(response),
            }
        }
        // Submit whatever is left in the final, partially filled batch.
        if !decoded_requests.is_empty() {
            self.manager.submit_requests(&mut decoded_requests);
        }
    }
439
440    fn send_response(&self, response: BlockFifoResponse) {
441        let mut queue = self.queue.lock();
442        if queue.responses.is_empty() {
443            match self.fifo.write_one(&response) {
444                Ok(()) => {
445                    return;
446                }
447                Err(_) => {
448                    // Wake `fifo_loop`.
449                    let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_1);
450                }
451            }
452        }
453        queue.responses.push_back(response);
454    }
455
    /// Asks the session to stop: raises `USER_0` on the FIFO so that `fifo_loop` returns, and
    /// aborts the FIDL request loop running in `open_session`.
    fn terminate(&self) {
        let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
        self.abort_handle.abort();
    }
460}
461
462impl Drop for Session {
463    fn drop(&mut self) {
464        let notify = {
465            let mut open_sessions = self.manager.open_sessions.lock();
466            open_sessions.remove(&(self as *const _ as usize));
467            open_sessions.is_empty()
468        };
469        if notify {
470            self.manager.open_sessions_condvar.notify_all();
471        }
472    }
473}
474
/// The object handed to C callers by [`block_server_new`].
pub struct BlockServer {
    /// The generic block server, parameterised with the C-interface session manager.
    server: super::BlockServer<SessionManager>,
    /// Handle to the executor running on the thread started via `Callbacks::start_thread`; used
    /// to spawn tasks in [`block_server_serve`].
    ehandle: EHandle,
    /// Aborts the pending future that keeps the executor thread running.
    abort_handle: AbortHandle,
}
480
/// A single-slot mailbox: the current [`Mail`] plus a condition variable that is notified
/// whenever new mail is posted.
struct ExecutorMailbox(Mutex<Mail>, Condvar);
482
483impl ExecutorMailbox {
484    /// Returns the old mail.
485    fn post(&self, mail: Mail) -> Mail {
486        let old = std::mem::replace(&mut *self.0.lock(), mail);
487        self.1.notify_all();
488        old
489    }
490
491    fn new() -> Self {
492        Self(Mutex::default(), Condvar::new())
493    }
494}
495
/// Callback passed to [`block_server_delete_async`].  It is invoked from
/// [`block_server_thread_delete`] after the server has been dropped, with the `arg` that was
/// given to [`block_server_delete_async`].
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
497
/// Messages exchanged through [`ExecutorMailbox`].
#[derive(Default)]
enum Mail {
    /// Nothing has been posted (or the mail has already been taken).
    #[default]
    None,
    /// Posted by [`block_server_thread`] once the executor exists; carries the executor handle
    /// and the handle that stops the executor thread.
    Initialized(EHandle, AbortHandle),
    /// Posted by [`block_server_delete_async`]; carries the server to drop plus the callback and
    /// argument to invoke afterwards.
    AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
    /// Posted by [`block_server_thread_delete`] once the executor thread is done.
    Finished,
}
506
impl Drop for BlockServer {
    /// Stops the executor thread, waits for it to report `Mail::Finished`, and then terminates
    /// all sessions.
    fn drop(&mut self) {
        // Abort the pending future that `block_server_thread` is running so the thread returns.
        self.abort_handle.abort();
        let manager = &self.server.session_manager;
        let mut mbox = manager.mbox.0.lock();
        // `Mail::Finished` is posted by `block_server_thread_delete`.
        manager.mbox.1.wait_while(&mut mbox, |mbox| !matches!(mbox, Mail::Finished));
        // Terminate sessions and wait until every one of them has been dropped.
        manager.terminate();
        debug_assert!(Arc::strong_count(manager) > 0);
    }
}
517
/// C-compatible description of the partition served by a [`BlockServer`]; converted to the Rust
/// representation by `PartitionInfo::to_rust`.
#[repr(C)]
pub struct PartitionInfo {
    /// Bit set of `fuchsia.hardware.block` device flags; unknown bits are dropped on conversion.
    pub device_flags: u32,
    /// First block of the partition.
    pub start_block: u64,
    /// Number of blocks in the partition.
    pub block_count: u64,
    /// Block size; `max_transfer_size` is divided by this to obtain a block count.
    pub block_size: u32,
    pub type_guid: [u8; 16],
    pub instance_guid: [u8; 16],
    /// Null-terminated C string, or null (treated as an empty name).
    pub name: *const c_char,
    pub flags: u64,
    /// Maximum transfer size, or `MAX_TRANSFER_UNBOUNDED` for no limit.
    pub max_transfer_size: u32,
}
530
// Alias so that the C signatures below can use the conventional `zx_handle_t` name.
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_handle_t = zx::sys::zx_handle_t;

// Alias so that the C signatures below can use the conventional `zx_status_t` name.
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_status_t = zx::sys::zx_status_t;
538
539impl PartitionInfo {
540    /// # Safety
541    ///
542    /// [`self.name`] must point to valid, null-terminated C-string, or be a nullptr.
543    unsafe fn to_rust(&self) -> super::DeviceInfo {
544        super::DeviceInfo::Partition(super::PartitionInfo {
545            device_flags: fblock::Flag::from_bits_truncate(self.device_flags),
546            block_range: Some(self.start_block..self.start_block + self.block_count),
547            type_guid: self.type_guid,
548            instance_guid: self.instance_guid,
549            name: if self.name.is_null() {
550                "".to_string()
551            } else {
552                String::from_utf8_lossy(unsafe { CStr::from_ptr(self.name).to_bytes() }).to_string()
553            },
554            flags: self.flags,
555            max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
556                NonZero::new(self.max_transfer_size / self.block_size)
557            } else {
558                None
559            },
560        })
561    }
562}
563
564struct DecodedRequests {
565    requests: [MaybeUninit<Request>; MAX_REQUESTS],
566    count: usize,
567}
568
569impl Default for DecodedRequests {
570    fn default() -> Self {
571        Self { requests: unsafe { MaybeUninit::uninit().assume_init() }, count: 0 }
572    }
573}
574
575impl DecodedRequests {
576    fn push(&mut self, request: Request) {
577        assert!(self.count < MAX_REQUESTS);
578        self.requests[self.count].write(request);
579        self.count += 1;
580    }
581
582    fn is_full(&self) -> bool {
583        self.count == MAX_REQUESTS
584    }
585
586    fn clear(&mut self) {
587        self.count = 0;
588    }
589}
590
591impl Deref for DecodedRequests {
592    type Target = [Request];
593
594    fn deref(&self) -> &Self::Target {
595        // SAFETY: We wrote the request in [`Self::push`].
596        unsafe { std::slice::from_raw_parts(self.requests[0].as_ptr(), self.count) }
597    }
598}
599
600impl DerefMut for DecodedRequests {
601    fn deref_mut(&mut self) -> &mut Self::Target {
602        // SAFETY: We wrote the request in [`Self::push`].
603        unsafe { std::slice::from_raw_parts_mut(self.requests[0].as_mut_ptr(), self.count) }
604    }
605}
606
607/// # Safety
608///
609/// All callbacks in `callbacks` must be safe.
610#[unsafe(no_mangle)]
611pub unsafe extern "C" fn block_server_new(
612    partition_info: &PartitionInfo,
613    callbacks: Callbacks,
614) -> *mut BlockServer {
615    let session_manager = Arc::new(SessionManager {
616        callbacks,
617        open_sessions: Mutex::default(),
618        active_requests: ActiveRequests::default(),
619        open_sessions_condvar: Condvar::new(),
620        inflight_requests: Mutex::default(),
621        no_inflight_requests_condvar: Condvar::new(),
622        mbox: ExecutorMailbox::new(),
623        info: unsafe { partition_info.to_rust() },
624    });
625
626    unsafe {
627        (session_manager.callbacks.start_thread)(
628            session_manager.callbacks.context,
629            Arc::into_raw(session_manager.clone()) as *const c_void,
630        );
631    }
632
633    let mbox = &session_manager.mbox;
634    let mail = {
635        let mut mail = mbox.0.lock();
636        mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
637        std::mem::replace(&mut *mail, Mail::None)
638    };
639
640    let block_size = partition_info.block_size;
641    match mail {
642        Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
643            server: super::BlockServer::new(block_size, session_manager),
644            ehandle,
645            abort_handle,
646        })),
647        Mail::Finished => std::ptr::null_mut(),
648        _ => unreachable!(),
649    }
650}
651
652/// # Safety
653///
654/// `arg` must be the value passed to the `start_thread` callback.
655#[unsafe(no_mangle)]
656pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
657    let session_manager = unsafe { &*(arg as *const SessionManager) };
658
659    let mut executor = fasync::LocalExecutor::default();
660    let (abort_handle, registration) = AbortHandle::new_pair();
661
662    session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
663
664    let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
665}
666
667/// Called to delete the thread.  This *must* always be called, regardless of whether starting the
668/// thread is successful or not.
669///
670/// # Safety
671///
672/// `arg` must be the value passed to the `start_thread` callback.
673#[unsafe(no_mangle)]
674pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
675    let mail = {
676        let session_manager = unsafe { Arc::from_raw(arg as *const SessionManager) };
677        debug_assert!(Arc::strong_count(&session_manager) > 0);
678        session_manager.mbox.post(Mail::Finished)
679    };
680
681    if let Mail::AsyncShutdown(server, callback, arg) = mail {
682        std::mem::drop(server);
683        // SAFETY: Whoever supplied the callback must guarantee it's safe.
684        unsafe {
685            callback(arg);
686        }
687    }
688}
689
690/// # Safety
691///
692/// `block_server` must be valid.
693#[unsafe(no_mangle)]
694pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
695    let _ = unsafe { Box::from_raw(block_server) };
696}
697
698/// # Safety
699///
700/// `block_server` must be valid.
701#[unsafe(no_mangle)]
702pub unsafe extern "C" fn block_server_delete_async(
703    block_server: *mut BlockServer,
704    callback: ShutdownCallback,
705    arg: *mut c_void,
706) {
707    let block_server = unsafe { Box::from_raw(block_server) };
708    let session_manager = block_server.server.session_manager.clone();
709    let abort_handle = block_server.abort_handle.clone();
710    session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
711    abort_handle.abort();
712}
713
714/// Serves the Volume protocol for this server.  `handle` is consumed.
715///
716/// # Safety
717///
718/// `block_server` and `handle` must be valid.
719#[unsafe(no_mangle)]
720pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
721    let block_server = unsafe { &*block_server };
722    let ehandle = &block_server.ehandle;
723    let handle = unsafe { zx::NullableHandle::from_raw(handle) };
724    ehandle.global_scope().spawn(async move {
725        let _ = block_server
726            .server
727            .handle_requests(fvolume::VolumeRequestStream::from_channel(
728                fasync::Channel::from_channel(handle.into()),
729            ))
730            .await;
731    });
732}
733
734/// # Safety
735///
736/// `session` must be valid.
737#[unsafe(no_mangle)]
738pub unsafe extern "C" fn block_server_session_run(session: &Session) {
739    let session = unsafe { Arc::from_raw(session) };
740    session.run();
741    let _ = Arc::into_raw(session);
742}
743
744/// # Safety
745///
746/// `session` must be valid.
747#[unsafe(no_mangle)]
748pub unsafe extern "C" fn block_server_session_release(session: &Session) {
749    session.terminate();
750    unsafe { Arc::from_raw(session) };
751}
752
753/// # Safety
754///
755/// `block_server` must be valid.
756#[unsafe(no_mangle)]
757pub unsafe extern "C" fn block_server_send_reply(
758    block_server: &BlockServer,
759    request_id: RequestId,
760    status: zx_status_t,
761) {
762    block_server.server.session_manager.complete_request(request_id, zx::Status::from_raw(status));
763}