Skip to main content

block_server/
c_interface.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use super::{Operation, RequestId, TraceFlowId};
6use crate::{IntoOrchestrator, callback_interface};
7use fidl::endpoints::RequestStream;
8use fidl_fuchsia_storage_block as fblock;
9use fidl_fuchsia_storage_block::MAX_TRANSFER_UNBOUNDED;
10use fuchsia_async::{self as fasync, EHandle};
11use fuchsia_sync::{Condvar, Mutex};
12use futures::stream::AbortHandle;
13use std::borrow::{Borrow, Cow};
14use std::ffi::{CStr, c_char, c_void};
15use std::num::NonZero;
16use std::sync::Arc;
17
/// Table of C callbacks, plus an opaque context pointer, that the embedder passes by value to
/// [`block_server_new`].  The struct is `#[repr(C)]`, so field order is part of the C ABI.
#[repr(C)]
pub struct Callbacks {
    /// An opaque context object retained by this library.  The library will pass this back into all
    /// callbacks.  The memory pointed to by `context` must last until [`block_server_delete`] is
    /// called.
    pub context: *mut c_void,
    /// Starts a thread.  The implementation must call [`block_server_thread`] on this newly created
    /// thread, providing `arg`.  The implementation must then call [`block_server_thread_delete`]
    /// after [`block_server_thread`] returns (but before [`block_server_delete`] is called).
    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
    /// Notifies the implementation of a new session.  The implementation must call
    /// [`block_server_session_run`] on a separate thread, and must call
    /// [`block_server_session_release`] after [`block_server_session_run`] (but before
    /// [`block_server_delete`] is called).
    pub on_new_session: unsafe extern "C" fn(
        context: *mut c_void,
        session: *const callback_interface::Session<InterfaceAdapter>,
    ),
    /// Submits a batch of `request_count` requests to be handled by the implementation.  The
    /// implementation must not retain references to `requests` after it returns.  The
    /// implementation must ensure that [`block_server_send_reply`] is called exactly once with the
    /// request ID of each entry in `requests`, regardless of its status; this call can be
    /// asynchronous but must occur before [`block_server_delete`] is called, i.e. every request
    /// must have been replied to before shutdown.
    pub on_requests:
        unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
    /// Logs `message` to the implementation's logger.  `message` is passed as a pointer plus a
    /// byte length (`message_len`); the Rust-side caller in this file does not append a NUL
    /// terminator.  The implementation must not retain references to `message`.
    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
}
48
49impl Callbacks {
50    #[allow(dead_code)]
51    fn log(&self, msg: &str) {
52        let msg = msg.as_bytes();
53        // SAFETY: This is safe if `context` and `log` are good.
54        unsafe {
55            (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
56        }
57    }
58}
59
/// A raw VMO handle value that is lent to the C side without transferring ownership.
///
/// In this file it is only constructed in `InterfaceAdapter::on_requests`, from either the
/// request's VMO raw handle or `ZX_HANDLE_INVALID`.  The field is never read on the Rust side,
/// hence `allow(dead_code)`.
///
/// cbindgen:no-export
#[allow(dead_code)]
pub struct UnownedVmo(zx::sys::zx_handle_t);
63
/// C-layout view of a single request, as passed in batches to [`Callbacks::on_requests`].
#[repr(C)]
pub struct Request {
    /// Identifier the implementation must pass back to [`block_server_send_reply`].
    pub request_id: RequestId,
    /// The operation to perform (cloned from the originating request).
    pub operation: Operation,
    /// Trace flow identifier copied from the originating request.
    pub trace_flow_id: TraceFlowId,
    /// Unowned handle to the request's VMO, or `ZX_HANDLE_INVALID` if the request has none.
    pub vmo: UnownedVmo,
}
71
// SAFETY: NOTE(review): `Callbacks` only contains a raw `context` pointer and C function pointers,
// neither of which the compiler can prove thread-safe.  These impls therefore rely on the embedder
// guaranteeing that `context` and every callback may be used from any thread, possibly
// concurrently -- confirm this is part of the documented C-side contract.
unsafe impl Send for Callbacks {}
unsafe impl Sync for Callbacks {}
74
/// Implements [`callback_interface::Interface`] using C callbacks.
pub struct InterfaceAdapter {
    // The embedder-supplied callback table; every trait method forwards through it.
    callbacks: Callbacks,
    // Device description converted once from the C `PartitionInfo` in `block_server_new`.
    info: super::DeviceInfo,
}
80
81impl callback_interface::Interface for InterfaceAdapter {
82    type Orchestrator = Orchestrator;
83
84    fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
85        Cow::Borrowed(&self.info)
86    }
87
88    fn spawn_session(&self, session: Arc<callback_interface::Session<Self>>) {
89        unsafe {
90            (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session));
91        }
92    }
93
94    fn on_requests(&self, requests: &[callback_interface::Request]) {
95        let mut c_requests = Vec::with_capacity(requests.len());
96        for req in requests {
97            c_requests.push(Request {
98                request_id: req.request_id,
99                operation: req.operation.clone(),
100                trace_flow_id: req.trace_flow_id,
101                // We are handing out unowned references to the VMO here.  This is safe because the
102                // VMO bin holds references to any closed VMOs until all preceding operations have
103                // finished.
104                vmo: UnownedVmo(
105                    req.vmo.as_ref().map(|v| v.raw_handle()).unwrap_or(zx::sys::ZX_HANDLE_INVALID),
106                ),
107            });
108        }
109        unsafe {
110            (self.callbacks.on_requests)(
111                self.callbacks.context,
112                c_requests.as_mut_ptr(),
113                c_requests.len(),
114            )
115        }
116    }
117}
118
/// C-layout description of the partition to serve, passed to [`block_server_new`] and converted
/// to `super::DeviceInfo::Partition` by `PartitionInfo::to_rust`.
#[repr(C)]
pub struct PartitionInfo {
    /// Bit set interpreted as `fblock::DeviceFlag`; bits that do not map to a flag are dropped.
    pub device_flags: u32,
    /// First block of the partition; together with `block_count` it forms the block range
    /// `start_block..start_block + block_count`.
    pub start_block: u64,
    /// Number of blocks in the partition.
    pub block_count: u64,
    /// Block size; also used as the divisor when converting `max_transfer_size` into blocks, so
    /// it must be non-zero whenever `max_transfer_size` is bounded.
    pub block_size: u32,
    /// Partition type GUID, copied verbatim.
    pub type_guid: [u8; 16],
    /// Partition instance GUID, copied verbatim.
    pub instance_guid: [u8; 16],
    /// Partition name: either null (treated as the empty string) or a pointer to a valid
    /// NUL-terminated C string.  Invalid UTF-8 is replaced lossily.
    pub name: *const c_char,
    /// Partition flags, copied verbatim.
    pub flags: u64,
    /// Maximum transfer size, in the same unit as `block_size` (presumably bytes -- TODO
    /// confirm).  `MAX_TRANSFER_UNBOUNDED` means no limit.
    pub max_transfer_size: u32,
}
131
/// Alias for [`zx::sys::zx_handle_t`], spelled like the C type so that exported signatures (for
/// example `block_server_serve`) can refer to it by that name.
///
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_handle_t = zx::sys::zx_handle_t;

/// Alias for [`zx::sys::zx_status_t`], spelled like the C type so that exported signatures (for
/// example `block_server_send_reply`) can refer to it by that name.
///
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_status_t = zx::sys::zx_status_t;
139
140impl PartitionInfo {
141    /// # Safety
142    ///
143    /// [`self.name`] must point to valid, null-terminated C-string, or be a nullptr.
144    unsafe fn to_rust(&self) -> super::DeviceInfo {
145        super::DeviceInfo::Partition(super::PartitionInfo {
146            device_flags: fblock::DeviceFlag::from_bits_truncate(self.device_flags),
147            block_range: Some(self.start_block..self.start_block + self.block_count),
148            type_guid: self.type_guid,
149            instance_guid: self.instance_guid,
150            name: if self.name.is_null() {
151                "".to_string()
152            } else {
153                String::from_utf8_lossy(unsafe { CStr::from_ptr(self.name).to_bytes() }).to_string()
154            },
155            flags: self.flags,
156            max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
157                NonZero::new(self.max_transfer_size / self.block_size)
158            } else {
159                None
160            },
161        })
162    }
163}
164
165struct ExecutorMailbox(Mutex<Mail>, Condvar);
166
167impl ExecutorMailbox {
168    fn post(&self, mail: Mail) -> Mail {
169        let old = std::mem::replace(&mut *self.0.lock(), mail);
170        self.1.notify_all();
171        old
172    }
173
174    fn new() -> Self {
175        Self(Mutex::default(), Condvar::new())
176    }
177}
178
/// C callback supplied to [`block_server_delete_async`].  It is invoked with the caller-provided
/// argument from [`block_server_thread_delete`], after the `BlockServer` has been dropped.
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
180
/// Messages exchanged through [`ExecutorMailbox`] between the thread that owns the
/// `BlockServer` and the executor thread.
#[derive(Default)]
enum Mail {
    /// The mailbox is empty.
    #[default]
    None,
    /// Posted by `block_server_thread` once its executor exists; carries the executor handle and
    /// the handle used to abort the executor's main future.
    Initialized(EHandle, AbortHandle),
    /// Posted by `block_server_delete_async`; carries the server to drop and the callback (with
    /// its argument) to invoke afterwards.  Consumed by `block_server_thread_delete`.
    AsyncShutdown(*const BlockServer, ShutdownCallback, *mut c_void),
    /// Posted by `block_server_thread_delete` to signal that the executor thread is gone.
    Finished,
}

// SAFETY: The raw pointers in `Mail::AsyncShutdown` are what prevent an automatic `Send`.
// NOTE(review): soundness relies on the `BlockServer` pointer only being used by
// `block_server_thread_delete` to drop the server, and on the callback and its argument being
// safe to use from that thread -- confirm against the `block_server_delete_async` contract.
unsafe impl Send for Mail {}
192
/// State shared (via `Arc`) between the `BlockServer` and the executor thread.
pub struct Orchestrator {
    // Manages sessions and routes requests/replies through the C callbacks.
    session_manager: callback_interface::SessionManager<InterfaceAdapter>,
    // Mailbox used to hand-shake with the executor thread (start-up and shutdown).
    mbox: ExecutorMailbox,
}
197
/// An `Arc<Orchestrator>` already is the orchestrator, so the conversion is the identity.
impl IntoOrchestrator for Arc<Orchestrator> {
    type SM = callback_interface::SessionManager<InterfaceAdapter>;

    // Returns `self` unchanged.
    fn into_orchestrator(self) -> Arc<Orchestrator> {
        self
    }
}
205
/// Lets an `Orchestrator` be used wherever a borrowed session manager is expected.
impl Borrow<callback_interface::SessionManager<InterfaceAdapter>> for Orchestrator {
    // Returns a reference to the embedded `session_manager` field.
    fn borrow(&self) -> &callback_interface::SessionManager<InterfaceAdapter> {
        &self.session_manager
    }
}
211
/// The object handed to C by [`block_server_new`] as an opaque pointer.
pub struct BlockServer {
    // The underlying Rust block server that handles the FIDL request streams.
    server: super::BlockServer<callback_interface::SessionManager<InterfaceAdapter>>,
    // Handle to the executor running on the thread started via `Callbacks::start_thread`.
    ehandle: EHandle,
    // Aborts the never-completing future that `block_server_thread` runs, ending that thread's
    // executor loop.
    abort_handle: AbortHandle,
    // Shared state (session manager and mailbox), also referenced by the executor thread.
    orchestrator: Arc<Orchestrator>,
}
218
219/// # Safety
220///
221/// All callbacks in `callbacks` must be safe.
222#[unsafe(no_mangle)]
223pub unsafe extern "C" fn block_server_new(
224    partition_info: &PartitionInfo,
225    callbacks: Callbacks,
226) -> *mut BlockServer {
227    let start_thread = callbacks.start_thread;
228    let context = callbacks.context;
229
230    let session_manager = callback_interface::SessionManager::new(Arc::new(InterfaceAdapter {
231        callbacks,
232        info: unsafe { partition_info.to_rust() },
233    }));
234
235    let orchestrator = Arc::new(Orchestrator { session_manager, mbox: ExecutorMailbox::new() });
236
237    unsafe {
238        (start_thread)(context, Arc::into_raw(orchestrator.clone()) as *const c_void);
239    }
240
241    let mbox = &orchestrator.mbox;
242    let mail = {
243        let mut mail = mbox.0.lock();
244        mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
245        std::mem::replace(&mut *mail, Mail::None)
246    };
247
248    let block_size = partition_info.block_size;
249    match mail {
250        Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
251            server: super::BlockServer::new(block_size, orchestrator.clone()),
252            ehandle,
253            abort_handle,
254            orchestrator: orchestrator.clone(),
255        })),
256        Mail::Finished => std::ptr::null_mut(),
257        _ => unreachable!(),
258    }
259}
260
261/// # Safety
262///
263/// `arg` must be the value passed to the `start_thread` callback.
264#[unsafe(no_mangle)]
265pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
266    let orchestrator = unsafe { &*(arg as *const Orchestrator) };
267
268    let mut executor = fasync::LocalExecutor::default();
269    let (abort_handle, registration) = futures::stream::AbortHandle::new_pair();
270
271    orchestrator.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
272
273    let _ = executor.run_singlethreaded(futures::stream::Abortable::new(
274        std::future::pending::<()>(),
275        registration,
276    ));
277}
278
279/// Called to delete the thread.  This *must* always be called, regardless of whether starting the
280/// thread is successful or not.
281///
282/// # Safety
283///
284/// `arg` must be the value passed to the `start_thread` callback.
285#[unsafe(no_mangle)]
286pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
287    // SAFETY: This balances the `into_raw` in `block_server_new`.
288    let orchestrator = unsafe { Arc::from_raw(arg as *const Orchestrator) };
289
290    let mail = orchestrator.mbox.post(Mail::Finished);
291
292    if let Mail::AsyncShutdown(block_server, callback, arg) = mail {
293        // No more sessions can be created.  Before we drop the `BlockServer` instance we must make
294        // sure there are no sessions running because otherwise there could be outstanding responses
295        // that would result in `block_server_send_reply` being called.
296        orchestrator.session_manager.terminate();
297
298        // SAFETY: No other threads are running now, so it should be safe to drop the `BlockServer`
299        // instance.
300        let _ = unsafe { Box::from_raw(block_server as *mut BlockServer) };
301
302        // SAFETY: Whoever supplied the callback must guarantee it's safe.
303        unsafe {
304            callback(arg);
305        }
306    }
307}
308
309/// # Safety
310///
311/// `block_server` must be valid and either `block_server_delete` or `block_server_delete_async` may
312/// only be called once.
313#[unsafe(no_mangle)]
314pub unsafe extern "C" fn block_server_delete(block_server: *const BlockServer) {
315    {
316        // SAFETY: The caller asserts that `block_server` is valid.
317        let server = unsafe { &*block_server };
318
319        // NOTE: The order here is important.  We must terminate the server's main thread first
320        // before terminating sessions to avoid races that can happen when a session has been just
321        // created.
322
323        // Start by terminating the main server thread.
324        server.abort_handle.abort();
325        {
326            let mbox = &server.orchestrator.mbox;
327            let mut mail = mbox.0.lock();
328            mbox.1.wait_while(&mut mail, |mbox| !matches!(mbox, Mail::Finished));
329        }
330
331        // Now that is done, no more sessions can be created, so now we can terminate all sessions.
332        Borrow::<callback_interface::SessionManager<InterfaceAdapter>>::borrow(
333            server.orchestrator.as_ref(),
334        )
335        .terminate();
336    }
337
338    // SAFETY: No other threads are running, so we can drop the `BlockServer` instance.
339    let _ = unsafe { Box::from_raw(block_server as *mut BlockServer) };
340}
341
/// Asynchronously shuts down and frees a server created by [`block_server_new`].
///
/// Posts a shutdown request to the mailbox and aborts the executor thread.  The actual teardown
/// (terminating sessions, dropping the `BlockServer`, invoking `callback(arg)`) happens in
/// [`block_server_thread_delete`].
///
/// # Panics
///
/// Panics if the mailbox already held `Mail::Finished`, i.e. the executor thread had already
/// been deleted.
///
/// # Safety
///
/// `block_server` must be valid, and only one of `block_server_delete` or
/// `block_server_delete_async` may be called on it, exactly once.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_delete_async(
    block_server: *const BlockServer,
    callback: ShutdownCallback,
    arg: *mut c_void,
) {
    // Clone the abort handle inside this scope so that nothing borrowed from `*block_server` is
    // still in use once `abort` has been called.
    let abort_handle = {
        // SAFETY: The caller asserts that `block_server` is valid.
        let server = unsafe { &*block_server };

        // We must post to the mailbox before we call abort to ensure that the callback is correctly
        // called.
        assert!(!matches!(
            server.orchestrator.mbox.post(Mail::AsyncShutdown(block_server, callback, arg)),
            Mail::Finished
        ));

        server.abort_handle.clone()
    };

    // As soon as we call `abort`, we must assume the `BlockServer` instance has been dropped.
    abort_handle.abort();
}
369
/// Serves the Volume protocol for this server.  `handle` is consumed.
///
/// NOTE(review): the code below builds a `fblock::BlockRequestStream`; confirm that the protocol
/// name in the summary above is still accurate.
///
/// The request stream is handled by a task spawned on the server's executor, so this function
/// does not wait for the stream to finish.
///
/// # Safety
///
/// `block_server` and `handle` must be valid.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
    // SAFETY: The caller asserts that `block_server` is valid.
    // NOTE(review): the spawned task below captures this reference, so the `BlockServer` has to
    // stay alive for as long as that task can run.
    let block_server = unsafe { &*block_server };
    let ehandle = &block_server.ehandle;
    // SAFETY: The caller asserts that `handle` is valid; ownership of it is taken here.
    let handle = unsafe { zx::NullableHandle::from_raw(handle) };
    ehandle.global_scope().spawn(async move {
        // The channel and request stream are created inside the task, i.e. on the executor that
        // polls it.  Any error returned by `handle_requests` is deliberately ignored.
        let _ = block_server
            .server
            .handle_requests(fblock::BlockRequestStream::from_channel(
                fasync::Channel::from_channel(handle.into()),
            ))
            .await;
    });
}
389
390/// # Safety
391///
392/// `session` must be valid.
393#[unsafe(no_mangle)]
394pub unsafe extern "C" fn block_server_session_run(
395    session: &callback_interface::Session<InterfaceAdapter>,
396) {
397    let session = unsafe { Arc::from_raw(session) };
398    session.run();
399    let _ = Arc::into_raw(session);
400}
401
402/// # Safety
403///
404/// `session` must be valid.
405#[unsafe(no_mangle)]
406pub unsafe extern "C" fn block_server_session_release(
407    session: &callback_interface::Session<InterfaceAdapter>,
408) {
409    session.terminate_async();
410    unsafe { Arc::from_raw(session) };
411}
412
413/// # Safety
414///
415/// `block_server` must be valid.
416#[unsafe(no_mangle)]
417pub unsafe extern "C" fn block_server_send_reply(
418    block_server: &BlockServer,
419    request_id: RequestId,
420    status: zx_status_t,
421) {
422    block_server
423        .orchestrator
424        .session_manager
425        .complete_request(request_id, zx::Status::from_raw(status));
426}