1use super::{Operation, RequestId, TraceFlowId};
6use crate::{IntoOrchestrator, callback_interface};
7use fidl::endpoints::RequestStream;
8use fidl_fuchsia_storage_block as fblock;
9use fidl_fuchsia_storage_block::MAX_TRANSFER_UNBOUNDED;
10use fuchsia_async as fasync;
11use fuchsia_sync::{Condvar, Mutex};
12use futures::stream::{AbortHandle, Abortable};
13use std::borrow::{Borrow, Cow};
14use std::ffi::{CStr, c_char, c_void};
15use std::num::NonZero;
16use std::sync::Arc;
17
18pub type Session = callback_interface::Session<InterfaceAdapter>;
20
21#[repr(C)]
22pub struct Callbacks {
23 pub context: *mut c_void,
27 pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
33 pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
38 pub on_requests:
45 unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
46 pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
49}
50
51impl Callbacks {
52 #[allow(dead_code)]
53 fn log(&self, msg: &str) {
54 let msg = msg.as_bytes();
55 unsafe {
57 (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
58 }
59 }
60}
61
62#[allow(dead_code)]
64pub struct UnownedVmo(zx::sys::zx_handle_t);
65
66#[repr(C)]
67pub struct Request {
68 pub request_id: RequestId,
69 pub operation: Operation,
70 pub trace_flow_id: TraceFlowId,
71 pub vmo: UnownedVmo,
72}
73
74unsafe impl Send for Callbacks {}
75unsafe impl Sync for Callbacks {}
76
77pub struct InterfaceAdapter {
79 callbacks: Callbacks,
80 info: super::DeviceInfo,
81}
82
83impl callback_interface::Interface for InterfaceAdapter {
84 type Orchestrator = Orchestrator;
85
86 fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
87 Cow::Borrowed(&self.info)
88 }
89
90 fn spawn_session(&self, session: Arc<Session>) {
91 unsafe {
92 (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session));
93 }
94 }
95
96 fn on_requests(&self, requests: &[callback_interface::Request]) {
97 let mut c_requests = Vec::with_capacity(requests.len());
98 for req in requests {
99 c_requests.push(Request {
100 request_id: req.request_id,
101 operation: req.operation.clone(),
102 trace_flow_id: req.trace_flow_id,
103 vmo: UnownedVmo(
107 req.vmo.as_ref().map(|v| v.raw_handle()).unwrap_or(zx::sys::ZX_HANDLE_INVALID),
108 ),
109 });
110 }
111 unsafe {
112 (self.callbacks.on_requests)(
113 self.callbacks.context,
114 c_requests.as_mut_ptr(),
115 c_requests.len(),
116 )
117 }
118 }
119}
120
121#[repr(C)]
122pub struct PartitionInfo {
123 pub device_flags: u32,
124 pub start_block: u64,
125 pub block_count: u64,
126 pub block_size: u32,
127 pub type_guid: [u8; 16],
128 pub instance_guid: [u8; 16],
129 pub name: *const c_char,
130 pub flags: u64,
131 pub max_transfer_size: u32,
132}
133
134#[allow(non_camel_case_types)]
136type zx_handle_t = zx::sys::zx_handle_t;
137
138#[allow(non_camel_case_types)]
140type zx_status_t = zx::sys::zx_status_t;
141
142impl PartitionInfo {
143 unsafe fn to_rust(&self) -> super::DeviceInfo {
147 super::DeviceInfo::Partition(super::PartitionInfo {
148 device_flags: fblock::DeviceFlag::from_bits_truncate(self.device_flags),
149 block_range: Some(self.start_block..self.start_block + self.block_count),
150 type_guid: self.type_guid,
151 instance_guid: self.instance_guid,
152 name: if self.name.is_null() {
153 "".to_string()
154 } else {
155 String::from_utf8_lossy(unsafe { CStr::from_ptr(self.name).to_bytes() }).to_string()
156 },
157 flags: self.flags,
158 max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
159 NonZero::new(self.max_transfer_size / self.block_size)
160 } else {
161 None
162 },
163 })
164 }
165}
166
167struct ExecutorMailbox(Mutex<Mail>, Condvar);
168
169impl ExecutorMailbox {
170 fn post(&self, mail: Mail) -> Mail {
171 let old = std::mem::replace(&mut *self.0.lock(), mail);
172 self.1.notify_all();
173 old
174 }
175
176 fn new() -> Self {
177 Self(Mutex::default(), Condvar::new())
178 }
179}
180
181type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
182
183#[derive(Default)]
184enum Mail {
185 #[default]
186 None,
187 Initialized(fasync::ScopeHandle, AbortHandle),
188 AsyncShutdown(*const BlockServer, ShutdownCallback, *mut c_void),
189 ThreadFinished(*const BlockServer, ShutdownCallback, *mut c_void),
190 Finished,
191}
192
193unsafe impl Send for Mail {}
195
196pub struct Orchestrator {
197 session_manager: callback_interface::SessionManager<InterfaceAdapter>,
198 mbox: ExecutorMailbox,
199}
200
201impl IntoOrchestrator for Arc<Orchestrator> {
202 type SM = callback_interface::SessionManager<InterfaceAdapter>;
203
204 fn into_orchestrator(self) -> Arc<Orchestrator> {
205 self
206 }
207}
208
209impl Borrow<callback_interface::SessionManager<InterfaceAdapter>> for Orchestrator {
210 fn borrow(&self) -> &callback_interface::SessionManager<InterfaceAdapter> {
211 &self.session_manager
212 }
213}
214
215pub struct BlockServer {
216 server: super::BlockServer<callback_interface::SessionManager<InterfaceAdapter>>,
217 scope: fasync::ScopeHandle,
218 abort_handle: AbortHandle,
219 orchestrator: Arc<Orchestrator>,
220}
221
222#[unsafe(no_mangle)]
229pub unsafe extern "C" fn block_server_new(
230 partition_info: &PartitionInfo,
231 callbacks: Callbacks,
232) -> *mut BlockServer {
233 let start_thread = callbacks.start_thread;
234 let context = callbacks.context;
235
236 let session_manager = callback_interface::SessionManager::new(Arc::new(InterfaceAdapter {
237 callbacks,
238 info: unsafe { partition_info.to_rust() },
239 }));
240
241 let orchestrator = Arc::new(Orchestrator { session_manager, mbox: ExecutorMailbox::new() });
242
243 unsafe {
244 (start_thread)(context, Arc::into_raw(orchestrator.clone()) as *const c_void);
245 }
246
247 let mbox = &orchestrator.mbox;
248 let mail = {
249 let mut mail = mbox.0.lock();
250 mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
251 std::mem::replace(&mut *mail, Mail::None)
252 };
253
254 let block_size = partition_info.block_size;
255 match mail {
256 Mail::Initialized(scope, abort_handle) => Box::into_raw(Box::new(BlockServer {
257 server: super::BlockServer::new(block_size, orchestrator.clone()),
258 scope,
259 abort_handle,
260 orchestrator: orchestrator.clone(),
261 })),
262 Mail::Finished => std::ptr::null_mut(),
263 _ => unreachable!(),
264 }
265}
266
267#[unsafe(no_mangle)]
276pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
277 let orchestrator = unsafe { &*(arg as *const Orchestrator) };
278
279 let mut executor = fasync::LocalExecutor::default();
280 let scope = fasync::Scope::new();
281
282 let (abort_handle, registration) = AbortHandle::new_pair();
285 let root_task = scope.spawn(async move {
286 let _ = Abortable::new(std::future::pending::<()>(), registration).await;
287 });
288 orchestrator.mbox.post(Mail::Initialized(scope.clone(), abort_handle));
289
290 let _ = executor.run_singlethreaded(root_task);
293
294 {
296 let mut mbox = orchestrator.mbox.0.lock();
297 let mail = std::mem::take(&mut *mbox);
298 if let Mail::AsyncShutdown(block_server, callback, arg) = mail {
299 *mbox = Mail::ThreadFinished(block_server, callback, arg);
300 orchestrator.mbox.1.notify_all();
301 } else {
302 *mbox = mail;
303 }
304 }
305
306 let _ = executor.run_singlethreaded(scope.cancel());
308
309 orchestrator.session_manager.terminate();
313}
314
315#[unsafe(no_mangle)]
322pub unsafe extern "C" fn block_server_thread_release(arg: *const c_void) {
323 let orchestrator = unsafe { Arc::from_raw(arg as *const Orchestrator) };
325
326 let mail = orchestrator.mbox.post(Mail::Finished);
327 match mail {
328 Mail::None | Mail::Finished => {}
329 Mail::ThreadFinished(block_server, callback, arg) => {
330 let _ = unsafe { Box::from_raw(block_server as *mut BlockServer) };
333
334 unsafe {
336 callback(arg);
337 }
338 }
339 _ => panic!("block_server_thread_release called while thread is still running"),
340 }
341}
342
343#[unsafe(no_mangle)]
348pub unsafe extern "C" fn block_server_delete(block_server: *const BlockServer) {
349 {
350 let server = unsafe { &*block_server };
352
353 server.abort_handle.abort();
359 {
360 let mbox = &server.orchestrator.mbox;
361 let mut mail = mbox.0.lock();
362 mbox.1.wait_while(&mut mail, |mbox| !matches!(mbox, Mail::Finished));
363 }
364
365 Borrow::<callback_interface::SessionManager<InterfaceAdapter>>::borrow(
367 server.orchestrator.as_ref(),
368 )
369 .terminate();
370 }
371
372 let _ = unsafe { Box::from_raw(block_server as *mut BlockServer) };
374}
375
376#[unsafe(no_mangle)]
381pub unsafe extern "C" fn block_server_delete_async(
382 block_server: *const BlockServer,
383 callback: ShutdownCallback,
384 arg: *mut c_void,
385) {
386 let abort_handle = {
387 let server = unsafe { &*block_server };
389
390 assert!(!matches!(
393 server.orchestrator.mbox.post(Mail::AsyncShutdown(block_server, callback, arg)),
394 Mail::Finished
395 ));
396
397 server.abort_handle.clone()
398 };
399
400 abort_handle.abort();
402}
403
404#[unsafe(no_mangle)]
410pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
411 let block_server = unsafe { &*block_server };
412 let handle = unsafe { zx::NullableHandle::from_raw(handle) };
413 block_server.scope.spawn(async move {
414 let _ = block_server
415 .server
416 .handle_requests(fblock::BlockRequestStream::from_channel(
417 fasync::Channel::from_channel(handle.into()),
418 ))
419 .await;
420 });
421}
422
423#[unsafe(no_mangle)]
427pub unsafe extern "C" fn block_server_session_run(session: &Session) {
428 let session = unsafe { Arc::from_raw(session) };
429 session.run();
430 let _ = Arc::into_raw(session);
431}
432
433#[unsafe(no_mangle)]
437pub unsafe extern "C" fn block_server_session_release(session: &Session) {
438 session.terminate_async();
439 unsafe { Arc::from_raw(session) };
440}
441
442#[unsafe(no_mangle)]
446pub unsafe extern "C" fn block_server_send_reply(
447 block_server: &BlockServer,
448 request_id: RequestId,
449 status: zx_status_t,
450) {
451 block_server
452 .orchestrator
453 .session_manager
454 .complete_request(request_id, zx::Status::from_raw(status));
455}