1use super::{Operation, RequestId, TraceFlowId};
6use crate::{IntoOrchestrator, callback_interface};
7use fidl::endpoints::RequestStream;
8use fidl_fuchsia_storage_block as fblock;
9use fidl_fuchsia_storage_block::MAX_TRANSFER_UNBOUNDED;
10use fuchsia_async::{self as fasync, EHandle};
11use fuchsia_sync::{Condvar, Mutex};
12use futures::stream::AbortHandle;
13use std::borrow::{Borrow, Cow};
14use std::ffi::{CStr, c_char, c_void};
15use std::num::NonZero;
16use std::sync::Arc;
17
/// Table of C callbacks supplied by the FFI client when creating a block
/// server.  Every function pointer is invoked with `context` as its first
/// argument.
#[repr(C)]
pub struct Callbacks {
    /// Opaque pointer passed back to each callback below.
    pub context: *mut c_void,
    /// Asked to start a new thread, passing `arg` along to it.
    // NOTE(review): `block_server_new` hands `arg` here and then waits on the
    // mailbox, so the new thread is presumably expected to call
    // `block_server_thread(arg)` (and `block_server_thread_delete(arg)` when it
    // returns) — confirm against the C-side header.
    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
    /// Invoked when a new session is established.  One strong `Arc` reference
    /// to the session is transferred to the callee (see `spawn_session`); it is
    /// released via `block_server_session_release`.
    pub on_new_session: unsafe extern "C" fn(
        context: *mut c_void,
        session: *const callback_interface::Session<InterfaceAdapter>,
    ),
    /// Invoked with a batch of requests.  The `requests` buffer is only valid
    /// for the duration of the call.
    pub on_requests:
        unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
    /// Invoked to log a message, given as an explicit pointer/length pair (no
    /// NUL terminator is guaranteed).
    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
}
47
48impl Callbacks {
49 #[allow(dead_code)]
50 fn log(&self, msg: &str) {
51 let msg = msg.as_bytes();
52 unsafe {
54 (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
55 }
56 }
57}
58
/// A raw VMO handle that is borrowed, not owned: this type has no `Drop` impl,
/// so the handle is never closed through it.
#[allow(dead_code)]
pub struct UnownedVmo(zx::sys::zx_handle_t);
62
/// C-compatible mirror of `callback_interface::Request`, handed to the
/// `on_requests` callback in batches.
#[repr(C)]
pub struct Request {
    pub request_id: RequestId,
    pub operation: Operation,
    pub trace_flow_id: TraceFlowId,
    /// Borrowed handle to the request's data VMO, or `ZX_HANDLE_INVALID` when
    /// the request carries no VMO.
    pub vmo: UnownedVmo,
}
70
// SAFETY: `Callbacks` only holds a raw context pointer and C function
// pointers.  The FFI contract places the thread-safety burden on the C side:
// `context` and the callbacks must be callable from any thread.
unsafe impl Send for Callbacks {}
unsafe impl Sync for Callbacks {}
73
/// Bridges the C `Callbacks` table to the Rust `callback_interface::Interface`
/// trait.
pub struct InterfaceAdapter {
    callbacks: Callbacks,
    // Captured once from `PartitionInfo::to_rust` when the server is created.
    info: super::DeviceInfo,
}
79
80impl callback_interface::Interface for InterfaceAdapter {
81 type Orchestrator = Orchestrator;
82
83 fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
84 Cow::Borrowed(&self.info)
85 }
86
87 fn spawn_session(&self, session: Arc<callback_interface::Session<Self>>) {
88 unsafe {
89 (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session));
90 }
91 }
92
93 fn on_requests(&self, requests: &[callback_interface::Request]) {
94 let mut c_requests = Vec::with_capacity(requests.len());
95 for req in requests {
96 c_requests.push(Request {
97 request_id: req.request_id,
98 operation: req.operation.clone(),
99 trace_flow_id: req.trace_flow_id,
100 vmo: UnownedVmo(
104 req.vmo.as_ref().map(|v| v.raw_handle()).unwrap_or(zx::sys::ZX_HANDLE_INVALID),
105 ),
106 });
107 }
108 unsafe {
109 (self.callbacks.on_requests)(
110 self.callbacks.context,
111 c_requests.as_mut_ptr(),
112 c_requests.len(),
113 )
114 }
115 }
116}
117
/// C-ABI description of a partition, used when constructing a block server.
#[repr(C)]
pub struct PartitionInfo {
    /// Bits of `fuchsia.storage.block/DeviceFlag`; unknown bits are silently
    /// dropped (see `to_rust`, which uses `from_bits_truncate`).
    pub device_flags: u32,
    pub start_block: u64,
    pub block_count: u64,
    pub block_size: u32,
    pub type_guid: [u8; 16],
    pub instance_guid: [u8; 16],
    /// NUL-terminated name, or null for an empty name.  Invalid UTF-8 is
    /// replaced lossily during conversion.
    pub name: *const c_char,
    pub flags: u64,
    /// Maximum transfer size in bytes, or `MAX_TRANSFER_UNBOUNDED` for no
    /// limit.
    pub max_transfer_size: u32,
}
130
// C-style aliases so the exported signatures below read like the C header they
// mirror.
#[allow(non_camel_case_types)]
type zx_handle_t = zx::sys::zx_handle_t;

#[allow(non_camel_case_types)]
type zx_status_t = zx::sys::zx_status_t;
138
impl PartitionInfo {
    /// Converts this C representation into the Rust `DeviceInfo`.
    ///
    /// # Safety
    ///
    /// If `self.name` is non-null it must point to a valid NUL-terminated
    /// string that remains live for the duration of this call.
    // NOTE(review): `max_transfer_size / self.block_size` divides by a
    // caller-supplied value, so a zero `block_size` would panic here — confirm
    // the C caller guarantees a non-zero block size.
    unsafe fn to_rust(&self) -> super::DeviceInfo {
        super::DeviceInfo::Partition(super::PartitionInfo {
            device_flags: fblock::DeviceFlag::from_bits_truncate(self.device_flags),
            block_range: Some(self.start_block..self.start_block + self.block_count),
            type_guid: self.type_guid,
            instance_guid: self.instance_guid,
            name: if self.name.is_null() {
                "".to_string()
            } else {
                // Lossy conversion: invalid UTF-8 sequences become U+FFFD.
                String::from_utf8_lossy(unsafe { CStr::from_ptr(self.name).to_bytes() }).to_string()
            },
            flags: self.flags,
            // A limit of `MAX_TRANSFER_UNBOUNDED` maps to `None`; so does any
            // limit smaller than one block (the division yields 0, which
            // `NonZero::new` rejects).
            max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
                NonZero::new(self.max_transfer_size / self.block_size)
            } else {
                None
            },
        })
    }
}
163
/// Single-slot mailbox used to hand state between the FFI thread entry points
/// and the owner of the `BlockServer`: a mutex-protected `Mail` slot plus a
/// condition variable signalled whenever the slot changes.
struct ExecutorMailbox(Mutex<Mail>, Condvar);

impl ExecutorMailbox {
    /// Replaces the slot's contents with `mail`, wakes all waiters, and
    /// returns the previous contents.
    fn post(&self, mail: Mail) -> Mail {
        let old = std::mem::replace(&mut *self.0.lock(), mail);
        self.1.notify_all();
        old
    }

    /// Returns a new mailbox holding `Mail::None`.
    fn new() -> Self {
        Self(Mutex::default(), Condvar::new())
    }
}
177
/// C callback invoked (with its context pointer) once an asynchronous shutdown
/// has completed.
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);

/// Wrapper that lets a raw C context pointer be carried inside `Mail` across
/// threads.
#[derive(Clone, Copy)]
#[repr(transparent)]
struct ContextPtr(*mut c_void);

// SAFETY: the pointer is never dereferenced here; it is only handed back to
// the C callback it was supplied alongside, and the FFI contract requires the
// C side to make it usable from any thread.
unsafe impl Send for ContextPtr {}
unsafe impl Sync for ContextPtr {}

/// Contents of the `ExecutorMailbox` slot.
#[derive(Default)]
enum Mail {
    /// Nothing has been posted yet.
    #[default]
    None,
    /// Posted by `block_server_thread` once its executor is running.
    Initialized(EHandle, AbortHandle),
    /// Posted by `block_server_delete_async`: the server to drop on the
    /// executor thread, plus the completion callback and its context.
    AsyncShutdown(Box<BlockServer>, ShutdownCallback, ContextPtr),
    /// Posted by `block_server_thread_delete` when the executor thread is done.
    Finished,
}
197
/// State shared between the block server, its sessions, and the executor
/// thread started by the C side.
pub struct Orchestrator {
    session_manager: callback_interface::SessionManager<InterfaceAdapter>,
    // Hands the executor/abort handles back to `block_server_new` and
    // sequences shutdown.
    mbox: ExecutorMailbox,
}
202
impl IntoOrchestrator for Arc<Orchestrator> {
    type SM = callback_interface::SessionManager<InterfaceAdapter>;

    /// Already an orchestrator, so the conversion is the identity.
    fn into_orchestrator(self) -> Arc<Orchestrator> {
        self
    }
}

/// Lets code generic over the session manager borrow it out of the
/// orchestrator.
impl Borrow<callback_interface::SessionManager<InterfaceAdapter>> for Orchestrator {
    fn borrow(&self) -> &callback_interface::SessionManager<InterfaceAdapter> {
        &self.session_manager
    }
}
216
/// The FFI-visible block server object returned by `block_server_new`.
pub struct BlockServer {
    server: super::BlockServer<callback_interface::SessionManager<InterfaceAdapter>>,
    // Handle to the executor running on the C-provided thread; used by
    // `block_server_serve` to spawn request-handling tasks there.
    ehandle: EHandle,
    // Aborts the executor's keep-alive future, which lets the thread exit.
    abort_handle: AbortHandle,
    orchestrator: Arc<Orchestrator>,
}
223
impl Drop for BlockServer {
    /// Synchronous shutdown: stops the executor thread, terminates sessions,
    /// and blocks until `block_server_thread_delete` posts `Mail::Finished`.
    fn drop(&mut self) {
        self.abort_handle.abort();
        Borrow::<callback_interface::SessionManager<InterfaceAdapter>>::borrow(
            self.orchestrator.as_ref(),
        )
        .terminate();
        let mbox = &self.orchestrator.mbox;
        let mut mail = mbox.0.lock();
        // Wait for the executor thread to finish.  During an async shutdown
        // this drop runs on the executor thread itself, after `Finished` has
        // already been posted, so the wait returns immediately.
        mbox.1.wait_while(&mut mail, |mbox| !matches!(mbox, Mail::Finished));
    }
}
236
237#[unsafe(no_mangle)]
241pub unsafe extern "C" fn block_server_new(
242 partition_info: &PartitionInfo,
243 callbacks: Callbacks,
244) -> *mut BlockServer {
245 let start_thread = callbacks.start_thread;
246 let context = callbacks.context;
247
248 let session_manager = callback_interface::SessionManager::new(Arc::new(InterfaceAdapter {
249 callbacks,
250 info: unsafe { partition_info.to_rust() },
251 }));
252
253 let orchestrator = Arc::new(Orchestrator { session_manager, mbox: ExecutorMailbox::new() });
254
255 unsafe {
256 (start_thread)(context, Arc::into_raw(orchestrator.clone()) as *const c_void);
257 }
258
259 let mbox = &orchestrator.mbox;
260 let mail = {
261 let mut mail = mbox.0.lock();
262 mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
263 std::mem::replace(&mut *mail, Mail::None)
264 };
265
266 let block_size = partition_info.block_size;
267 match mail {
268 Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
269 server: super::BlockServer::new(block_size, orchestrator.clone()),
270 ehandle,
271 abort_handle,
272 orchestrator: orchestrator.clone(),
273 })),
274 Mail::Finished => std::ptr::null_mut(),
275 _ => unreachable!(),
276 }
277}
278
/// Entry point for the thread spawned via `Callbacks::start_thread`.
///
/// Runs a local executor until the server is shut down.  `arg` is the pointer
/// that was passed to `start_thread`; ownership of it stays with the caller
/// and is reclaimed later by `block_server_thread_delete`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
    // SAFETY: `arg` is the `Arc<Orchestrator>` leaked by `block_server_new`;
    // only a borrow is taken here, so the strong count is untouched.
    let orchestrator = unsafe { &*(arg as *const Orchestrator) };

    let mut executor = fasync::LocalExecutor::default();
    let (abort_handle, registration) = futures::stream::AbortHandle::new_pair();

    // Tell `block_server_new` the executor is up, and hand it the means to
    // stop this thread.
    orchestrator.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));

    // Park the executor on a never-completing future; aborting it (from
    // `BlockServer::drop` or `block_server_delete_async`) is what lets this
    // function return.
    let _ = executor.run_singlethreaded(futures::stream::Abortable::new(
        std::future::pending::<()>(),
        registration,
    ));
}
296
/// Called on the server thread after `block_server_thread` returns; reclaims
/// the thread's `Arc<Orchestrator>` reference and signals completion.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
    let mail = {
        // SAFETY: reclaims the strong reference leaked to `start_thread` by
        // `block_server_new`; dropped at the end of this scope.
        let orchestrator = unsafe { Arc::from_raw(arg as *const Orchestrator) };
        orchestrator.mbox.post(Mail::Finished)
    };

    // If shutdown was requested asynchronously, the mailbox held the boxed
    // server: finish dropping it here and then notify the C side.  Its `Drop`
    // sees `Finished` already posted, so it does not block.
    if let Mail::AsyncShutdown(server, callback, arg) = mail {
        std::mem::drop(server);
        // SAFETY: the callback and context were supplied together by the C
        // side in `block_server_delete_async`.
        unsafe {
            callback(arg.0);
        }
    }
}
318
/// Deletes the block server, blocking until its executor thread has finished.
/// `block_server` must have come from `block_server_new` and must not be used
/// afterwards.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
    // SAFETY: takes back ownership of the box; `BlockServer::drop` performs
    // the blocking shutdown handshake.
    let _ = unsafe { Box::from_raw(block_server) };
}
326
/// Asynchronously deletes the block server: the actual teardown happens on the
/// executor thread, which invokes `callback(arg)` once it completes.
/// `block_server` must have come from `block_server_new` and must not be used
/// after this call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_delete_async(
    block_server: *mut BlockServer,
    callback: ShutdownCallback,
    arg: *mut c_void,
) {
    // SAFETY: takes back ownership of the box created by `block_server_new`.
    let block_server = unsafe { Box::from_raw(block_server) };
    let orchestrator = block_server.orchestrator.clone();
    let abort_handle = block_server.abort_handle.clone();
    // Park the server in the mailbox *before* aborting, so that when the
    // executor thread reaches `block_server_thread_delete` it finds the server
    // to drop and the callback to fire.
    orchestrator.mbox.post(Mail::AsyncShutdown(block_server, callback, ContextPtr(arg)));
    abort_handle.abort();
}
342
/// Starts serving the Block protocol on `handle` (a channel), taking ownership
/// of the handle.  Requests are processed on the server's executor thread until
/// the channel closes or the server is deleted.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
    // SAFETY: the FFI contract must keep `block_server` alive while the
    // spawned task runs; deletion tears down the executor (aborting its
    // tasks) before the server itself is dropped.
    let block_server = unsafe { &*block_server };
    let ehandle = &block_server.ehandle;
    // SAFETY: ownership of `handle` is transferred to this function.
    let handle = unsafe { zx::NullableHandle::from_raw(handle) };
    ehandle.global_scope().spawn(async move {
        // Errors from the request stream are intentionally discarded; a closed
        // or failed channel simply ends this task.
        let _ = block_server
            .server
            .handle_requests(fblock::BlockRequestStream::from_channel(
                fasync::Channel::from_channel(handle.into()),
            ))
            .await;
    });
}
362
/// Runs a session to completion.  `session` is the pointer received via the
/// `on_new_session` callback; the caller's strong reference is borrowed, not
/// consumed (release it with `block_server_session_release`).
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_session_run(
    session: &callback_interface::Session<InterfaceAdapter>,
) {
    // SAFETY: temporarily re-materialize the Arc leaked by `spawn_session`,
    // then leak it again below so the caller's reference count is unchanged.
    let session = unsafe { Arc::from_raw(session) };
    session.run();
    let _ = Arc::into_raw(session);
}
374
/// Terminates the session and releases the strong reference that was handed
/// over by the `on_new_session` callback.  `session` must not be used after
/// this call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_session_release(
    session: &callback_interface::Session<InterfaceAdapter>,
) {
    session.terminate_async();
    // SAFETY: consumes the reference leaked by `spawn_session`, dropping it.
    unsafe { Arc::from_raw(session) };
}
385
386#[unsafe(no_mangle)]
390pub unsafe extern "C" fn block_server_send_reply(
391 block_server: &BlockServer,
392 request_id: RequestId,
393 status: zx_status_t,
394) {
395 block_server
396 .orchestrator
397 .session_manager
398 .complete_request(request_id, zx::Status::from_raw(status));
399}