block_server/
c_interface.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{IntoSessionManager, OffsetMap, Operation, RequestId, RequestTracking, SessionHelper};
6use anyhow::Error;
7use block_protocol::{BlockFifoRequest, BlockFifoResponse};
8use fidl::endpoints::RequestStream;
9use fuchsia_async::{self as fasync, EHandle};
10use futures::stream::{AbortHandle, Abortable};
11use futures::TryStreamExt;
12use std::borrow::Cow;
13use std::collections::{HashMap, VecDeque};
14use std::ffi::{c_char, c_void, CStr};
15use std::mem::MaybeUninit;
16use std::num::NonZero;
17use std::sync::{Arc, Condvar, Mutex, Weak};
18use zx::{self as zx, AsHandleRef as _};
19use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
20
/// Session manager used by the C-facing block server: it owns the callback table supplied by the
/// C caller and keeps a registry of the sessions that are currently open.
pub struct SessionManager {
    /// Callback table passed to `block_server_new`.
    callbacks: Callbacks,
    /// Open sessions keyed by the address of the `Session` allocation.  Entries are inserted in
    /// `open_session` and removed again by `Session::drop`.
    open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
    /// Notified by `Session::drop` when `open_sessions` becomes empty; `terminate` waits on it.
    condvar: Condvar,
    /// Mailbox used to hand state between the executor thread and the API entry points.
    mbox: ExecutorMailbox,
    /// Device information handed out by `get_info`.
    info: super::DeviceInfo,
}
28
// `SessionManager` is not automatically `Send`/`Sync`: at the very least `Mail::AsyncShutdown`
// (stored inside `mbox`) carries a raw `*mut c_void`.
// NOTE(review): these impls assume the pointers supplied by the C caller may be used from any
// thread -- confirm against the C API contract.
unsafe impl Send for SessionManager {}
unsafe impl Sync for SessionManager {}
31
32impl SessionManager {
33    fn terminate(&self) {
34        {
35            // We must drop references to sessions whilst we're not holding the lock for
36            // `open_sessions` because `Session::drop` needs to take that same lock.
37            #[allow(clippy::collection_is_never_read)]
38            let mut terminated_sessions = Vec::new();
39            for (_, session) in &*self.open_sessions.lock().unwrap() {
40                if let Some(session) = session.upgrade() {
41                    session.terminate();
42                    terminated_sessions.push(session);
43                }
44            }
45        }
46        let _guard =
47            self.condvar.wait_while(self.open_sessions.lock().unwrap(), |s| !s.is_empty()).unwrap();
48    }
49}
50
51impl super::SessionManager for SessionManager {
52    async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
53        Ok(())
54    }
55
56    async fn open_session(
57        self: Arc<Self>,
58        mut stream: fblock::SessionRequestStream,
59        offset_map: Option<OffsetMap>,
60        block_size: u32,
61    ) -> Result<(), Error> {
62        let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
63        let (abort_handle, registration) = AbortHandle::new_pair();
64        let session = Arc::new(Session {
65            manager: self.clone(),
66            helper,
67            fifo,
68            queue: Mutex::default(),
69            vmos: Mutex::default(),
70            abort_handle,
71        });
72        self.open_sessions
73            .lock()
74            .unwrap()
75            .insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
76        unsafe {
77            (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
78        }
79
80        let result = Abortable::new(
81            async {
82                while let Some(request) = stream.try_next().await? {
83                    session.helper.handle_request(request).await?;
84                }
85                Ok(())
86            },
87            registration,
88        )
89        .await
90        .unwrap_or_else(|e| Err(e.into()));
91
92        let _ = session.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
93
94        result
95    }
96
97    async fn get_info(&self) -> Result<Cow<'_, super::DeviceInfo>, zx::Status> {
98        Ok(Cow::Borrowed(&self.info))
99    }
100}
101
impl Drop for SessionManager {
    /// Terminates any session that is still registered and blocks until the registry of open
    /// sessions is empty (see `SessionManager::terminate`).
    fn drop(&mut self) {
        self.terminate();
    }
}
107
/// Identity conversion: an `Arc<SessionManager>` already is the session manager, so it is
/// returned unchanged.
// NOTE(review): presumably this is what lets `block_server_new` hand the `Arc` straight to
// `super::BlockServer::new` -- confirm against the trait definition.
impl IntoSessionManager for Arc<SessionManager> {
    type SM = SessionManager;

    fn into_session_manager(self) -> Self {
        self
    }
}
115
/// Table of callbacks supplied by the C caller.  `context` is passed back verbatim as the first
/// argument of every callback.
#[repr(C)]
pub struct Callbacks {
    /// Opaque pointer forwarded to every callback.
    pub context: *mut c_void,
    /// Invoked from `block_server_new`; `arg` is the value that must later be given to
    /// `block_server_thread` and `block_server_thread_delete`.
    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
    /// Invoked from `open_session` with a session reference that is released again with
    /// `block_server_session_release`.
    pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
    /// Invoked with a batch of `request_count` decoded requests starting at `requests`.
    pub on_requests: unsafe extern "C" fn(
        context: *mut c_void,
        session: *const Session,
        requests: *mut Request,
        request_count: usize,
    ),
    /// Receives a log message as `message_len` bytes at `message`; the bytes come from a Rust
    /// `&str` and are not NUL-terminated.
    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
}
129
130impl Callbacks {
131    #[allow(dead_code)]
132    fn log(&self, msg: &str) {
133        let msg = msg.as_bytes();
134        // SAFETY: This is safe if `context` and `log` are good.
135        unsafe {
136            (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
137        }
138    }
139}
140
// Raw VMO handle value passed to C inside a `Request`.  This wrapper has no `Drop` impl, so it
// never closes the handle; `ZX_HANDLE_INVALID` is stored when a request has no VMO (see
// `Session::handle_requests`).
/// cbindgen:no-export
#[allow(dead_code)]
pub struct UnownedVmo(zx::sys::zx_handle_t);
144
/// A decoded block request as handed to the `on_requests` callback.
#[repr(C)]
pub struct Request {
    /// Identifier that must be passed back to `block_server_send_reply`.
    pub request_id: RequestId,
    /// The operation to perform.
    pub operation: Operation,
    /// Trace flow id taken from the request's tracking data, if any; also passed back to
    /// `block_server_send_reply`.
    pub trace_flow_id: Option<NonZero<u64>>,
    /// Raw handle of the request's VMO, or `ZX_HANDLE_INVALID` when the request has none.
    pub vmo: UnownedVmo,
}
152
// `Callbacks` holds a raw `context` pointer and is therefore not automatically `Send`/`Sync`.
// NOTE(review): these impls assume the C caller permits `context` and the callbacks to be used
// from any thread -- confirm against the C API contract.
unsafe impl Send for Callbacks {}
unsafe impl Sync for Callbacks {}
155
/// A single client session.  References to it are handed to C through the `on_new_session`
/// callback and released with `block_server_session_release`.
pub struct Session {
    /// Owning manager; gives access to the callbacks and to the registry of open sessions.
    manager: Arc<SessionManager>,
    /// Shared helper used to handle FIDL requests and to decode/finish FIFO requests.
    helper: SessionHelper<SessionManager>,
    /// FIFO carrying block requests and responses.  `USER_0` on this handle asks `fifo_loop` to
    /// stop and `USER_1` wakes it to flush queued responses.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    /// Responses that could not be written to the FIFO immediately.
    queue: Mutex<SessionQueue>,
    /// VMOs held on behalf of requests that have been passed to `on_requests`; entries are
    /// removed in `block_server_send_reply`.
    vmos: Mutex<HashMap<RequestId, Arc<zx::Vmo>>>,
    /// Aborts the FIDL serving future created in `open_session`.
    abort_handle: AbortHandle,
}
164
/// State protected by `Session::queue`.
#[derive(Default)]
struct SessionQueue {
    /// Responses waiting to be written to the FIFO, oldest first.
    responses: VecDeque<BlockFifoResponse>,
}
169
/// Size of the buffers used to read and decode FIFO requests, and therefore the largest
/// `request_count` ever passed to the `on_requests` callback.
pub const MAX_REQUESTS: usize = 64;
171
172impl Session {
173    fn run(&self) {
174        self.fifo_loop();
175        self.abort_handle.abort();
176    }
177
178    fn fifo_loop(&self) {
179        let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];
180
181        loop {
182            // Send queued responses.
183            let is_queue_empty = {
184                let mut queue = self.queue.lock().unwrap();
185                while !queue.responses.is_empty() {
186                    let (front, _) = queue.responses.as_slices();
187                    match self.fifo.write(front) {
188                        Ok(count) => {
189                            let full = count < front.len();
190                            queue.responses.drain(..count);
191                            if full {
192                                break;
193                            }
194                        }
195                        Err(zx::Status::SHOULD_WAIT) => break,
196                        Err(_) => return,
197                    }
198                }
199                queue.responses.is_empty()
200            };
201
202            // Process pending reads.
203            match self.fifo.read_uninit(&mut requests) {
204                Ok(valid_requests) => self.handle_requests(valid_requests.iter()),
205                Err(zx::Status::SHOULD_WAIT) => {
206                    let mut signals =
207                        zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
208                    if !is_queue_empty {
209                        signals |= zx::Signals::OBJECT_WRITABLE;
210                    }
211                    let Ok(signals) =
212                        self.fifo.wait_handle(signals, zx::MonotonicInstant::INFINITE)
213                    else {
214                        return;
215                    };
216                    if signals.contains(zx::Signals::USER_0) {
217                        return;
218                    }
219                    // Clear USER_1 signal if it's set.
220                    if signals.contains(zx::Signals::USER_1) {
221                        let _ = self.fifo.signal_handle(zx::Signals::USER_1, zx::Signals::empty());
222                    }
223                }
224                Err(_) => return,
225            }
226        }
227    }
228
229    fn handle_requests<'a>(&self, requests: impl Iterator<Item = &'a BlockFifoRequest>) {
230        let mut decoded_requests: [MaybeUninit<Request>; MAX_REQUESTS] =
231            unsafe { MaybeUninit::uninit().assume_init() };
232        let mut count = 0;
233        for request in requests {
234            if let Some(r) = self.helper.decode_fifo_request(request) {
235                let operation = match r.operation {
236                    Ok(Operation::CloseVmo) => {
237                        self.send_reply(r.request_tracking, zx::Status::OK);
238                        continue;
239                    }
240                    Ok(operation) => operation,
241                    Err(status) => {
242                        self.send_reply(r.request_tracking, status);
243                        continue;
244                    }
245                };
246                let RequestTracking { group_or_request, trace_flow_id } = r.request_tracking;
247                let mut request_id = group_or_request.into();
248                let vmo = if let Some(vmo) = r.vmo {
249                    let raw_handle = vmo.raw_handle();
250                    self.vmos.lock().unwrap().insert(request_id, vmo);
251                    request_id = request_id.with_vmo();
252                    UnownedVmo(raw_handle)
253                } else {
254                    UnownedVmo(zx::sys::ZX_HANDLE_INVALID)
255                };
256                decoded_requests[count].write(Request {
257                    request_id,
258                    operation,
259                    trace_flow_id,
260                    vmo,
261                });
262                count += 1;
263            }
264        }
265        if count > 0 {
266            unsafe {
267                (self.manager.callbacks.on_requests)(
268                    self.manager.callbacks.context,
269                    self,
270                    decoded_requests[0].as_mut_ptr(),
271                    count,
272                );
273            }
274        }
275    }
276
277    fn send_reply(&self, tracking: RequestTracking, status: zx::Status) {
278        let response = match self.helper.finish_fifo_request(tracking, status) {
279            Some(response) => response,
280            None => return,
281        };
282        let mut queue = self.queue.lock().unwrap();
283        if queue.responses.is_empty() {
284            match self.fifo.write_one(&response) {
285                Ok(()) => {
286                    return;
287                }
288                Err(_) => {
289                    // Wake `fifo_loop`.
290                    let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_1);
291                }
292            }
293        }
294        queue.responses.push_back(response);
295    }
296
297    fn terminate(&self) {
298        let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
299        self.abort_handle.abort();
300    }
301}
302
303impl Drop for Session {
304    fn drop(&mut self) {
305        let notify = {
306            let mut open_sessions = self.manager.open_sessions.lock().unwrap();
307            open_sessions.remove(&(self as *const _ as usize));
308            open_sessions.is_empty()
309        };
310        if notify {
311            self.manager.condvar.notify_all();
312        }
313    }
314}
315
/// Handle returned to C by `block_server_new`; destroyed with `block_server_delete` or
/// `block_server_delete_async`.
pub struct BlockServer {
    /// The underlying block server, parameterised with the C-facing `SessionManager`.
    server: super::BlockServer<SessionManager>,
    /// Handle to the executor running on the server thread; `block_server_serve` spawns onto it.
    ehandle: EHandle,
    /// Aborts the future that keeps `block_server_thread` running.
    abort_handle: AbortHandle,
}
321
/// Single-slot mailbox: field 0 holds the current `Mail`, field 1 is notified whenever new mail
/// is posted.
#[derive(Default)]
struct ExecutorMailbox(Mutex<Mail>, Condvar);
324
325impl ExecutorMailbox {
326    /// Returns the old mail.
327    fn post(&self, mail: Mail) -> Mail {
328        let old = std::mem::replace(&mut *self.0.lock().unwrap(), mail);
329        self.1.notify_all();
330        old
331    }
332}
333
/// Callback invoked by `block_server_thread_delete` once an asynchronous shutdown requested via
/// `block_server_delete_async` has dropped the server; it receives the `arg` supplied there.
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
335
/// Messages exchanged through `ExecutorMailbox`.
#[derive(Default)]
enum Mail {
    /// The mailbox is empty.
    #[default]
    None,
    /// Posted by `block_server_thread` once its executor exists; consumed by `block_server_new`.
    Initialized(EHandle, AbortHandle),
    /// Posted by `block_server_delete_async`: the server to drop plus the callback (and its
    /// argument) to invoke afterwards.  Picked up by `block_server_thread_delete`.
    AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
    /// Posted by `block_server_thread_delete`; `BlockServer::drop` waits for it.
    Finished,
}
344
345impl Drop for BlockServer {
346    fn drop(&mut self) {
347        self.abort_handle.abort();
348        let manager = &self.server.session_manager;
349        let mbox = manager.mbox.0.lock().unwrap();
350        let _unused = manager.mbox.1.wait_while(mbox, |mbox| !matches!(mbox, Mail::Finished));
351        manager.terminate();
352        debug_assert!(Arc::strong_count(manager) > 0);
353    }
354}
355
/// C representation of the partition served by a block server; converted with
/// `PartitionInfo::to_rust`.
#[repr(C)]
pub struct PartitionInfo {
    /// Bits of `fblock::Flag`; unknown bits are discarded on conversion.
    pub device_flags: u32,
    /// First block of the partition.
    pub start_block: u64,
    /// Number of blocks in the partition.
    pub block_count: u64,
    /// Block size passed to `super::BlockServer::new` by `block_server_new`.
    pub block_size: u32,
    /// Partition type GUID.
    pub type_guid: [u8; 16],
    /// Partition instance GUID.
    pub instance_guid: [u8; 16],
    /// NUL-terminated partition name, or null for an empty name.
    pub name: *const c_char,
    /// Partition flags, copied through unchanged.
    pub flags: u64,
}
367
// Local alias so that exported signatures can spell the handle type as `zx_handle_t`.
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_handle_t = zx::sys::zx_handle_t;

// Local alias so that exported signatures can spell the status type as `zx_status_t`.
/// cbindgen:no-export
#[allow(non_camel_case_types)]
type zx_status_t = zx::sys::zx_status_t;
375
376impl PartitionInfo {
377    unsafe fn to_rust(&self) -> super::DeviceInfo {
378        super::DeviceInfo::Partition(super::PartitionInfo {
379            device_flags: fblock::Flag::from_bits_truncate(self.device_flags),
380            block_range: Some(self.start_block..self.start_block + self.block_count),
381            type_guid: self.type_guid,
382            instance_guid: self.instance_guid,
383            name: if self.name.is_null() {
384                "".to_string()
385            } else {
386                String::from_utf8_lossy(CStr::from_ptr(self.name).to_bytes()).to_string()
387            },
388            flags: self.flags,
389        })
390    }
391}
392
393/// # Safety
394///
395/// All callbacks in `callbacks` must be safe.
396#[no_mangle]
397pub unsafe extern "C" fn block_server_new(
398    partition_info: &PartitionInfo,
399    callbacks: Callbacks,
400) -> *mut BlockServer {
401    let session_manager = Arc::new(SessionManager {
402        callbacks,
403        open_sessions: Mutex::default(),
404        condvar: Condvar::new(),
405        mbox: ExecutorMailbox::default(),
406        info: partition_info.to_rust(),
407    });
408
409    (session_manager.callbacks.start_thread)(
410        session_manager.callbacks.context,
411        Arc::into_raw(session_manager.clone()) as *const c_void,
412    );
413
414    let mbox = &session_manager.mbox;
415    let mail = {
416        let mail = mbox.0.lock().unwrap();
417        std::mem::replace(
418            &mut *mbox.1.wait_while(mail, |mail| matches!(mail, Mail::None)).unwrap(),
419            Mail::None,
420        )
421    };
422
423    let block_size = partition_info.block_size;
424    match mail {
425        Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
426            server: super::BlockServer::new(block_size, session_manager),
427            ehandle,
428            abort_handle,
429        })),
430        Mail::Finished => std::ptr::null_mut(),
431        _ => unreachable!(),
432    }
433}
434
435/// # Safety
436///
437/// `arg` must be the value passed to the `start_thread` callback.
438#[no_mangle]
439pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
440    let session_manager = &*(arg as *const SessionManager);
441
442    let mut executor = fasync::LocalExecutor::new();
443    let (abort_handle, registration) = AbortHandle::new_pair();
444
445    session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
446
447    let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
448}
449
450/// Called to delete the thread.  This *must* always be called, regardless of whether starting the
451/// thread is successful or not.
452///
453/// # Safety
454///
455/// `arg` must be the value passed to the `start_thread` callback.
456#[no_mangle]
457pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
458    let mail = {
459        let session_manager = Arc::from_raw(arg as *const SessionManager);
460        debug_assert!(Arc::strong_count(&session_manager) > 0);
461        session_manager.mbox.post(Mail::Finished)
462    };
463
464    if let Mail::AsyncShutdown(server, callback, arg) = mail {
465        std::mem::drop(server);
466        // SAFETY: Whoever supplied the callback must guarantee it's safe.
467        unsafe {
468            callback(arg);
469        }
470    }
471}
472
473/// # Safety
474///
475/// `block_server` must be valid.
476#[no_mangle]
477pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
478    let _ = Box::from_raw(block_server);
479}
480
481/// # Safety
482///
483/// `block_server` must be valid.
484#[no_mangle]
485pub unsafe extern "C" fn block_server_delete_async(
486    block_server: *mut BlockServer,
487    callback: ShutdownCallback,
488    arg: *mut c_void,
489) {
490    let block_server = Box::from_raw(block_server);
491    let session_manager = block_server.server.session_manager.clone();
492    let abort_handle = block_server.abort_handle.clone();
493    session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
494    abort_handle.abort();
495}
496
497/// Serves the Volume protocol for this server.  `handle` is consumed.
498///
499/// # Safety
500///
501/// `block_server` and `handle` must be valid.
502#[no_mangle]
503pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
504    let block_server = &*block_server;
505    let ehandle = &block_server.ehandle;
506    let handle = zx::Handle::from_raw(handle);
507    ehandle.global_scope().spawn(async move {
508        let _ = block_server
509            .server
510            .handle_requests(fvolume::VolumeRequestStream::from_channel(
511                fasync::Channel::from_channel(handle.into()),
512            ))
513            .await;
514    });
515}
516
517/// # Safety
518///
519/// `session` must be valid.
520#[no_mangle]
521pub unsafe extern "C" fn block_server_session_run(session: &Session) {
522    session.run();
523}
524
525/// # Safety
526///
527/// `session` must be valid.
528#[no_mangle]
529pub unsafe extern "C" fn block_server_session_release(session: &Session) {
530    session.terminate();
531    Arc::from_raw(session);
532}
533
534/// # Safety
535///
536/// `session` must be valid.
537#[no_mangle]
538pub unsafe extern "C" fn block_server_send_reply(
539    session: &Session,
540    request_id: RequestId,
541    trace_flow_id: Option<NonZero<u64>>,
542    status: zx_status_t,
543) {
544    if request_id.did_have_vmo() {
545        session.vmos.lock().unwrap().remove(&request_id);
546    }
547    session.send_reply(
548        RequestTracking { group_or_request: request_id.into(), trace_flow_id },
549        zx::Status::from_raw(status),
550    );
551}