// block_server/c_interface.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{Operation, RequestId, TraceFlowId};
6use crate::{IntoOrchestrator, callback_interface};
7use fidl::endpoints::RequestStream;
8use fidl_fuchsia_storage_block as fblock;
9use fidl_fuchsia_storage_block::MAX_TRANSFER_UNBOUNDED;
10use fuchsia_async as fasync;
11use fuchsia_sync::{Condvar, Mutex};
12use futures::stream::{AbortHandle, Abortable};
13use std::borrow::{Borrow, Cow};
14use std::ffi::{CStr, c_char, c_void};
15use std::num::NonZero;
16use std::sync::Arc;
17
// The session type used throughout the C interface: `callback_interface::Session` specialised
// for the C-callback adapter (`InterfaceAdapter`).  Pointers to it are handed to the
// implementation through `Callbacks::on_new_session`.
/// cbindgen:no-export
pub type Session = callback_interface::Session<InterfaceAdapter>;
20
/// The set of C callbacks (plus an opaque `context`) that the implementation supplies to
/// [`block_server_new`].  Every callback receives `context` as its first argument.
#[repr(C)]
pub struct Callbacks {
    /// An opaque context object retained by this library.  The library will pass this back into all
    /// callbacks.  The memory pointed to by `context` must last until [`block_server_delete`] is
    /// called.
    pub context: *mut c_void,
    /// Starts a thread.  The implementation must call [`block_server_thread`] on this newly created
    /// thread, providing `arg`.  Once this completes, the implementation must NOT use the block
    /// server for which the thread was started, as it could be destroyed at any time.
    /// The implementation must call [`block_server_thread_release`] after [`block_server_thread`]
    /// completes (but before [`block_server_delete`] is called).
    // NOTE(review): `block_server_delete` itself waits for the `Finished` mail that
    // `block_server_thread_release` posts, so "before `block_server_delete` is called" presumably
    // means "before it returns" -- confirm the intended wording.
    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
    /// Notifies the implementation of a new session.  The implementation must call
    /// [`block_server_session_run`] on a separate thread, and must call
    /// [`block_server_session_release`] after [`block_server_session_run`] (but before
    /// [`block_server_delete`] is called).
    pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
    /// Submits a batch of requests to be handled by the implementation.  The implementation must
    /// not retain references to `requests` after it returns.  The implementation must ensure that
    /// [`block_server_send_reply`] is called exactly once with the request ID of each entry in
    /// `requests`, regardless of its status; this call can be asynchronous but must occur before
    /// [`block_server_delete`] is called.  Note that a reply must be sent for every request before
    /// shutdown.
    pub on_requests:
        unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
    /// Logs `message` to the implementation's logger.  The implementation must not retain
    /// references to `message`.
    // `message` is passed together with `message_len`; the Rust-side caller (`Callbacks::log`)
    // passes the bytes of a `&str`, which are not null-terminated.
    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
}
50
51impl Callbacks {
52    #[allow(dead_code)]
53    fn log(&self, msg: &str) {
54        let msg = msg.as_bytes();
55        // SAFETY: This is safe if `context` and `log` are good.
56        unsafe {
57            (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
58        }
59    }
60}
61
// A raw VMO handle that is lent to the implementation rather than owned by it.  It is filled in
// by `InterfaceAdapter::on_requests`, which uses `ZX_HANDLE_INVALID` when a request has no VMO.
// The field is never read in this file (hence `allow(dead_code)`); presumably it is only
// consumed on the C side -- confirm.
/// cbindgen:no-export
#[allow(dead_code)]
pub struct UnownedVmo(zx::sys::zx_handle_t);
65
/// A single request as passed to the implementation via [`Callbacks::on_requests`].
#[repr(C)]
pub struct Request {
    /// Identifies the request; this is the ID to hand back to [`block_server_send_reply`].
    pub request_id: RequestId,
    /// The operation to perform.
    pub operation: Operation,
    /// Trace flow ID copied from the originating request.
    pub trace_flow_id: TraceFlowId,
    /// Unowned handle to the request's VMO, or `ZX_HANDLE_INVALID` if the request has none.
    pub vmo: UnownedVmo,
}
73
// SAFETY: `Callbacks` is not automatically `Send`/`Sync` because it holds the raw `context`
// pointer.  NOTE(review): soundness relies on the implementation's `context` and callbacks
// tolerating use from multiple threads; nothing in this file enforces that -- confirm it is part
// of the documented contract for implementations.
unsafe impl Send for Callbacks {}
unsafe impl Sync for Callbacks {}
76
/// Implements [`callback_interface::Interface`] using C callbacks.
pub struct InterfaceAdapter {
    // The callback table supplied by the implementation in `block_server_new`.
    callbacks: Callbacks,
    // Device description converted from the C `PartitionInfo` in `block_server_new`; returned
    // (borrowed) by `get_info`.
    info: super::DeviceInfo,
}
82
83impl callback_interface::Interface for InterfaceAdapter {
84    type Orchestrator = Orchestrator;
85
86    fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
87        Cow::Borrowed(&self.info)
88    }
89
90    fn spawn_session(&self, session: Arc<Session>) {
91        unsafe {
92            (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session));
93        }
94    }
95
96    fn on_requests(&self, requests: &[callback_interface::Request]) {
97        let mut c_requests = Vec::with_capacity(requests.len());
98        for req in requests {
99            c_requests.push(Request {
100                request_id: req.request_id,
101                operation: req.operation.clone(),
102                trace_flow_id: req.trace_flow_id,
103                // We are handing out unowned references to the VMO here.  This is safe because the
104                // VMO bin holds references to any closed VMOs until all preceding operations have
105                // finished.
106                vmo: UnownedVmo(
107                    req.vmo.as_ref().map(|v| v.raw_handle()).unwrap_or(zx::sys::ZX_HANDLE_INVALID),
108                ),
109            });
110        }
111        unsafe {
112            (self.callbacks.on_requests)(
113                self.callbacks.context,
114                c_requests.as_mut_ptr(),
115                c_requests.len(),
116            )
117        }
118    }
119}
120
/// C-layout description of the partition served by the block server, passed to
/// [`block_server_new`].
#[repr(C)]
pub struct PartitionInfo {
    /// Bit set interpreted as `fuchsia.storage.block` `DeviceFlag`s; unknown bits are dropped.
    pub device_flags: u32,
    /// First block of the partition's block range.
    pub start_block: u64,
    /// Number of blocks in the range; the range ends at `start_block + block_count`.
    pub block_count: u64,
    /// Block size.  `max_transfer_size` is divided by this value to obtain a block count, so both
    /// must use the same unit (presumably bytes).
    pub block_size: u32,
    /// Type GUID, copied through unchanged.
    pub type_guid: [u8; 16],
    /// Instance GUID, copied through unchanged.
    pub instance_guid: [u8; 16],
    /// Partition name: either null (treated as an empty name) or a valid, null-terminated C
    /// string.  Bytes that are not valid UTF-8 are replaced during conversion.
    pub name: *const c_char,
    /// Partition flags, copied through unchanged.
    pub flags: u64,
    /// Maximum transfer size, or `MAX_TRANSFER_UNBOUNDED` for no limit.
    pub max_transfer_size: u32,
}
133
// Local alias for `zx::sys::zx_handle_t`, keeping the C spelling of the name (hence the
// `non_camel_case_types` allowance) for use in the `extern "C"` signatures below.
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_handle_t = zx::sys::zx_handle_t;
137
// Local alias for `zx::sys::zx_status_t`, keeping the C spelling of the name (hence the
// `non_camel_case_types` allowance) for use in the `extern "C"` signatures below.
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_status_t = zx::sys::zx_status_t;
141
142impl PartitionInfo {
143    /// # Safety
144    ///
145    /// [`self.name`] must point to valid, null-terminated C-string, or be a nullptr.
146    unsafe fn to_rust(&self) -> super::DeviceInfo {
147        super::DeviceInfo::Partition(super::PartitionInfo {
148            device_flags: fblock::DeviceFlag::from_bits_truncate(self.device_flags),
149            block_range: Some(self.start_block..self.start_block + self.block_count),
150            type_guid: self.type_guid,
151            instance_guid: self.instance_guid,
152            name: if self.name.is_null() {
153                "".to_string()
154            } else {
155                String::from_utf8_lossy(unsafe { CStr::from_ptr(self.name).to_bytes() }).to_string()
156            },
157            flags: self.flags,
158            max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
159                NonZero::new(self.max_transfer_size / self.block_size)
160            } else {
161                None
162            },
163        })
164    }
165}
166
// A single-slot mailbox used to pass state between the server thread and the threads calling
// the C API.  The mutex holds the current `Mail`; the condition variable is notified whenever
// new mail is posted.
struct ExecutorMailbox(Mutex<Mail>, Condvar);
168
169impl ExecutorMailbox {
170    fn post(&self, mail: Mail) -> Mail {
171        let old = std::mem::replace(&mut *self.0.lock(), mail);
172        self.1.notify_all();
173        old
174    }
175
176    fn new() -> Self {
177        Self(Mutex::default(), Condvar::new())
178    }
179}
180
// C callback passed to `block_server_delete_async`.  It is invoked with the caller-supplied
// argument from `block_server_thread_release`, after the `BlockServer` has been dropped.
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
182
// The messages exchanged through `ExecutorMailbox`.
#[derive(Default)]
enum Mail {
    // Empty slot.
    #[default]
    None,
    // Posted by `block_server_thread` once its scope and abort handle exist; consumed by
    // `block_server_new`.
    Initialized(fasync::ScopeHandle, AbortHandle),
    // Posted by `block_server_delete_async` before it aborts the server thread.  Carries the
    // server to drop plus the completion callback and its argument.
    AsyncShutdown(*const BlockServer, ShutdownCallback, *mut c_void),
    // Replaces `AsyncShutdown` once the server thread's main loop has exited; consumed by
    // `block_server_thread_release`, which drops the server and invokes the callback.
    ThreadFinished(*const BlockServer, ShutdownCallback, *mut c_void),
    // Posted by `block_server_thread_release`.  `block_server_delete` waits for it, and
    // `block_server_new` returns null if it arrives instead of `Initialized`.
    Finished,
}
192
// SAFETY: `Mail::AsyncShutdown` is thread-safe.
// NOTE(review): `Mail::ThreadFinished` carries the same raw pointers and callback, so presumably
// the same argument applies to it -- confirm.
unsafe impl Send for Mail {}
195
/// State shared (via `Arc`) between the [`BlockServer`] and the server thread.
pub struct Orchestrator {
    // Manages the sessions; requests are completed and sessions terminated through it.
    session_manager: callback_interface::SessionManager<InterfaceAdapter>,
    // Mailbox used to coordinate start-up and shutdown with the server thread.
    mbox: ExecutorMailbox,
}
200
// Lets an `Arc<Orchestrator>` be passed wherever an `IntoOrchestrator` is expected (as done in
// `block_server_new`).
impl IntoOrchestrator for Arc<Orchestrator> {
    type SM = callback_interface::SessionManager<InterfaceAdapter>;

    // Already the right type, so the conversion is the identity.
    fn into_orchestrator(self) -> Arc<Orchestrator> {
        self
    }
}
208
209impl Borrow<callback_interface::SessionManager<InterfaceAdapter>> for Orchestrator {
210    fn borrow(&self) -> &callback_interface::SessionManager<InterfaceAdapter> {
211        &self.session_manager
212    }
213}
214
/// The object handed to C callers by [`block_server_new`] as an opaque pointer.
pub struct BlockServer {
    // The underlying server that handles request streams (see `block_server_serve`).
    server: super::BlockServer<callback_interface::SessionManager<InterfaceAdapter>>,
    // Handle to the scope created on the server thread; connection tasks are spawned on it.
    scope: fasync::ScopeHandle,
    // Aborting this ends the server thread's main loop, which starts shutdown.
    abort_handle: AbortHandle,
    // Shared state (session manager and mailbox), also referenced by the server thread.
    orchestrator: Arc<Orchestrator>,
}
221
222/// Creates a new block server.  Returns nullptr on failure (e.g. if the thread to run the block
223/// server failed to start).
224///
225/// # Safety
226///
227/// All callbacks in `callbacks` must be safe.
228#[unsafe(no_mangle)]
229pub unsafe extern "C" fn block_server_new(
230    partition_info: &PartitionInfo,
231    callbacks: Callbacks,
232) -> *mut BlockServer {
233    let start_thread = callbacks.start_thread;
234    let context = callbacks.context;
235
236    let session_manager = callback_interface::SessionManager::new(Arc::new(InterfaceAdapter {
237        callbacks,
238        info: unsafe { partition_info.to_rust() },
239    }));
240
241    let orchestrator = Arc::new(Orchestrator { session_manager, mbox: ExecutorMailbox::new() });
242
243    unsafe {
244        (start_thread)(context, Arc::into_raw(orchestrator.clone()) as *const c_void);
245    }
246
247    let mbox = &orchestrator.mbox;
248    let mail = {
249        let mut mail = mbox.0.lock();
250        mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
251        std::mem::replace(&mut *mail, Mail::None)
252    };
253
254    let block_size = partition_info.block_size;
255    match mail {
256        Mail::Initialized(scope, abort_handle) => Box::into_raw(Box::new(BlockServer {
257            server: super::BlockServer::new(block_size, orchestrator.clone()),
258            scope,
259            abort_handle,
260            orchestrator: orchestrator.clone(),
261        })),
262        Mail::Finished => std::ptr::null_mut(),
263        _ => unreachable!(),
264    }
265}
266
267/// Runs the main loop to handle FIDL requests for the block server.  Blocks until the server is
268/// shutting down.
269///
270/// After this returns, the caller *must* call [`block_server_thread_release`] on the same thread.
271///
272/// # Safety
273///
274/// `arg` must be the value passed to the `start_thread` callback.
275#[unsafe(no_mangle)]
276pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
277    let orchestrator = unsafe { &*(arg as *const Orchestrator) };
278
279    let mut executor = fasync::LocalExecutor::default();
280    let scope = fasync::Scope::new();
281
282    // Create a future which will run until `abort_handle` is aborted, so that the scope will run
283    // that long as well.
284    let (abort_handle, registration) = AbortHandle::new_pair();
285    let root_task = scope.spawn(async move {
286        let _ = Abortable::new(std::future::pending::<()>(), registration).await;
287    });
288    orchestrator.mbox.post(Mail::Initialized(scope.clone(), abort_handle));
289
290    // Block until the abort handle is fired.  This is the main entry point, tasks are spawned on
291    // this executor.
292    let _ = executor.run_singlethreaded(root_task);
293
294    // At this point, the abort handle was fired, which happens when shutdown begins.
295    {
296        let mut mbox = orchestrator.mbox.0.lock();
297        let mail = std::mem::take(&mut *mbox);
298        if let Mail::AsyncShutdown(block_server, callback, arg) = mail {
299            *mbox = Mail::ThreadFinished(block_server, callback, arg);
300            orchestrator.mbox.1.notify_all();
301        } else {
302            *mbox = mail;
303        }
304    }
305
306    // Synchronously cancel the scope which is processing FIDL requests.
307    let _ = executor.run_singlethreaded(scope.cancel());
308
309    // No more sessions can be created.  Before we drop the `BlockServer` instance we must make
310    // sure there are no sessions running because otherwise there could be outstanding responses
311    // that would result in `block_server_send_reply` being called.
312    orchestrator.session_manager.terminate();
313}
314
315/// Called to release the thread.  This *must* always be called on the thread spawned by
316/// [`Callbacks::start_thread`], regardless of whether [`block_server_thread`] is called or not.
317///
318/// # Safety
319///
320/// `arg` must be the value passed to the `start_thread` callback.
321#[unsafe(no_mangle)]
322pub unsafe extern "C" fn block_server_thread_release(arg: *const c_void) {
323    // SAFETY: This balances the `into_raw` in `block_server_new`.
324    let orchestrator = unsafe { Arc::from_raw(arg as *const Orchestrator) };
325
326    let mail = orchestrator.mbox.post(Mail::Finished);
327    match mail {
328        Mail::None | Mail::Finished => {}
329        Mail::ThreadFinished(block_server, callback, arg) => {
330            // SAFETY: No other threads are running now, so it should be safe to drop the
331            // `BlockServer` instance.
332            let _ = unsafe { Box::from_raw(block_server as *mut BlockServer) };
333
334            // SAFETY: Whoever supplied the callback must guarantee it's safe.
335            unsafe {
336                callback(arg);
337            }
338        }
339        _ => panic!("block_server_thread_release called while thread is still running"),
340    }
341}
342
343/// # Safety
344///
345/// `block_server` must be valid and either `block_server_delete` or `block_server_delete_async` may
346/// only be called once.
347#[unsafe(no_mangle)]
348pub unsafe extern "C" fn block_server_delete(block_server: *const BlockServer) {
349    {
350        // SAFETY: The caller asserts that `block_server` is valid.
351        let server = unsafe { &*block_server };
352
353        // NOTE: The order here is important.  We must terminate the server's main thread first
354        // before terminating sessions to avoid races that can happen when a session has been just
355        // created.
356
357        // Start by terminating the main server thread.
358        server.abort_handle.abort();
359        {
360            let mbox = &server.orchestrator.mbox;
361            let mut mail = mbox.0.lock();
362            mbox.1.wait_while(&mut mail, |mbox| !matches!(mbox, Mail::Finished));
363        }
364
365        // Now that is done, no more sessions can be created, so now we can terminate all sessions.
366        Borrow::<callback_interface::SessionManager<InterfaceAdapter>>::borrow(
367            server.orchestrator.as_ref(),
368        )
369        .terminate();
370    }
371
372    // SAFETY: No other threads are running, so we can drop the `BlockServer` instance.
373    let _ = unsafe { Box::from_raw(block_server as *mut BlockServer) };
374}
375
376/// # Safety
377///
378/// `block_server` must be valid and either `block_server_delete` or `block_server_delete_async` may
379/// only be called once.
380#[unsafe(no_mangle)]
381pub unsafe extern "C" fn block_server_delete_async(
382    block_server: *const BlockServer,
383    callback: ShutdownCallback,
384    arg: *mut c_void,
385) {
386    let abort_handle = {
387        // SAFETY: The caller asserts that `block_server` is valid.
388        let server = unsafe { &*block_server };
389
390        // We must post to the mailbox before we call abort to ensure that the callback is correctly
391        // called.
392        assert!(!matches!(
393            server.orchestrator.mbox.post(Mail::AsyncShutdown(block_server, callback, arg)),
394            Mail::Finished
395        ));
396
397        server.abort_handle.clone()
398    };
399
400    // As soon as we call `abort`, we must assume the `BlockServer` instance has been dropped.
401    abort_handle.abort();
402}
403
404/// Serves the Volume protocol for this server.  `handle` is consumed.
405///
406/// # Safety
407///
408/// `block_server` and `handle` must be valid.
409#[unsafe(no_mangle)]
410pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
411    let block_server = unsafe { &*block_server };
412    let handle = unsafe { zx::NullableHandle::from_raw(handle) };
413    block_server.scope.spawn(async move {
414        let _ = block_server
415            .server
416            .handle_requests(fblock::BlockRequestStream::from_channel(
417                fasync::Channel::from_channel(handle.into()),
418            ))
419            .await;
420    });
421}
422
423/// # Safety
424///
425/// `session` must be valid.
426#[unsafe(no_mangle)]
427pub unsafe extern "C" fn block_server_session_run(session: &Session) {
428    let session = unsafe { Arc::from_raw(session) };
429    session.run();
430    let _ = Arc::into_raw(session);
431}
432
433/// # Safety
434///
435/// `session` must be valid.
436#[unsafe(no_mangle)]
437pub unsafe extern "C" fn block_server_session_release(session: &Session) {
438    session.terminate_async();
439    unsafe { Arc::from_raw(session) };
440}
441
442/// # Safety
443///
444/// `block_server` must be valid.
445#[unsafe(no_mangle)]
446pub unsafe extern "C" fn block_server_send_reply(
447    block_server: &BlockServer,
448    request_id: RequestId,
449    status: zx_status_t,
450) {
451    block_server
452        .orchestrator
453        .session_manager
454        .complete_request(request_id, zx::Status::from_raw(status));
455}