1use super::{Operation, RequestId, TraceFlowId};
6use crate::{IntoOrchestrator, callback_interface};
7use fidl::endpoints::RequestStream;
8use fidl_fuchsia_storage_block as fblock;
9use fidl_fuchsia_storage_block::MAX_TRANSFER_UNBOUNDED;
10use fuchsia_async::{self as fasync, EHandle};
11use fuchsia_sync::{Condvar, Mutex};
12use futures::stream::AbortHandle;
13use std::borrow::{Borrow, Cow};
14use std::ffi::{CStr, c_char, c_void};
15use std::num::NonZero;
16use std::sync::Arc;
17
/// Table of C callbacks through which the block server talks to its embedder.
///
/// Every callback receives the opaque `context` pointer as its first argument.
#[repr(C)]
pub struct Callbacks {
    /// Opaque embedder context, passed verbatim to every callback below.
    pub context: *mut c_void,
    /// Asks the embedder to start a thread; `block_server_new` passes a leaked
    /// `Arc<Orchestrator>` as `arg`.  NOTE(review): the thread is presumably
    /// expected to run `block_server_thread(arg)` and later
    /// `block_server_thread_delete(arg)` — confirm against the C-side contract.
    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
    /// Invoked with a leaked `Arc` reference to each newly created session
    /// (see `InterfaceAdapter::spawn_session`); the reference is released via
    /// `block_server_session_release`.
    pub on_new_session: unsafe extern "C" fn(
        context: *mut c_void,
        session: *const callback_interface::Session<InterfaceAdapter>,
    ),
    /// Delivers a batch of block requests as a pointer/length pair.  The
    /// buffer is only valid for the duration of the call.
    pub on_requests:
        unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
    /// Logs a message of `message_len` bytes (not nul-terminated).
    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
}
48
49impl Callbacks {
50 #[allow(dead_code)]
51 fn log(&self, msg: &str) {
52 let msg = msg.as_bytes();
53 unsafe {
55 (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
56 }
57 }
58}
59
60#[allow(dead_code)]
62pub struct UnownedVmo(zx::sys::zx_handle_t);
63
/// C-layout mirror of `callback_interface::Request`, handed to the
/// `on_requests` callback in batches.
#[repr(C)]
pub struct Request {
    pub request_id: RequestId,
    pub operation: Operation,
    pub trace_flow_id: TraceFlowId,
    /// Borrowed VMO handle, or `ZX_HANDLE_INVALID` when the request carries
    /// no VMO (see `InterfaceAdapter::on_requests`).
    pub vmo: UnownedVmo,
}
71
// SAFETY: NOTE(review): `Callbacks` holds a raw `context` pointer and C
// function pointers; these impls assert that the embedder's callbacks and
// context are usable from any thread — confirm against the C-side contract.
unsafe impl Send for Callbacks {}
unsafe impl Sync for Callbacks {}
74
/// Bridges the `callback_interface::Interface` trait onto the C `Callbacks`
/// table, with the device info captured once at construction.
pub struct InterfaceAdapter {
    callbacks: Callbacks,
    info: super::DeviceInfo,
}
80
81impl callback_interface::Interface for InterfaceAdapter {
82 type Orchestrator = Orchestrator;
83
84 fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
85 Cow::Borrowed(&self.info)
86 }
87
88 fn spawn_session(&self, session: Arc<callback_interface::Session<Self>>) {
89 unsafe {
90 (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session));
91 }
92 }
93
94 fn on_requests(&self, requests: &[callback_interface::Request]) {
95 let mut c_requests = Vec::with_capacity(requests.len());
96 for req in requests {
97 c_requests.push(Request {
98 request_id: req.request_id,
99 operation: req.operation.clone(),
100 trace_flow_id: req.trace_flow_id,
101 vmo: UnownedVmo(
105 req.vmo.as_ref().map(|v| v.raw_handle()).unwrap_or(zx::sys::ZX_HANDLE_INVALID),
106 ),
107 });
108 }
109 unsafe {
110 (self.callbacks.on_requests)(
111 self.callbacks.context,
112 c_requests.as_mut_ptr(),
113 c_requests.len(),
114 )
115 }
116 }
117}
118
/// C-facing description of a partition, converted into the internal
/// `DeviceInfo` via `to_rust`.
#[repr(C)]
pub struct PartitionInfo {
    /// Raw bits for `fblock::DeviceFlag`; unknown bits are dropped during
    /// conversion.
    pub device_flags: u32,
    pub start_block: u64,
    pub block_count: u64,
    pub block_size: u32,
    pub type_guid: [u8; 16],
    pub instance_guid: [u8; 16],
    /// Optional nul-terminated partition name; may be null.
    pub name: *const c_char,
    pub flags: u64,
    /// Maximum transfer size in bytes, or `MAX_TRANSFER_UNBOUNDED`.
    pub max_transfer_size: u32,
}
131
// C-style alias used in the extern "C" signatures below.
#[allow(non_camel_case_types)]
type zx_handle_t = zx::sys::zx_handle_t;

// C-style alias used in the extern "C" signatures below.
#[allow(non_camel_case_types)]
type zx_status_t = zx::sys::zx_status_t;
139
impl PartitionInfo {
    /// Converts the C-facing partition description into the internal
    /// `DeviceInfo` representation.
    ///
    /// # Safety
    ///
    /// If `self.name` is non-null it must point at a valid nul-terminated C
    /// string that outlives this call.
    unsafe fn to_rust(&self) -> super::DeviceInfo {
        super::DeviceInfo::Partition(super::PartitionInfo {
            device_flags: fblock::DeviceFlag::from_bits_truncate(self.device_flags),
            // NOTE(review): `start_block + block_count` can overflow u64 for
            // hostile input — confirm callers validate these fields.
            block_range: Some(self.start_block..self.start_block + self.block_count),
            type_guid: self.type_guid,
            instance_guid: self.instance_guid,
            name: if self.name.is_null() {
                "".to_string()
            } else {
                // Non-UTF-8 bytes are replaced rather than rejected.
                String::from_utf8_lossy(unsafe { CStr::from_ptr(self.name).to_bytes() }).to_string()
            },
            flags: self.flags,
            // The sentinel means "no limit"; otherwise convert bytes to
            // blocks.  NOTE(review): divides by `block_size` — a zero block
            // size would panic; confirm input validation upstream.
            max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
                NonZero::new(self.max_transfer_size / self.block_size)
            } else {
                None
            },
        })
    }
}
164
165struct ExecutorMailbox(Mutex<Mail>, Condvar);
166
167impl ExecutorMailbox {
168 fn post(&self, mail: Mail) -> Mail {
169 let old = std::mem::replace(&mut *self.0.lock(), mail);
170 self.1.notify_all();
171 old
172 }
173
174 fn new() -> Self {
175 Self(Mutex::default(), Condvar::new())
176 }
177}
178
/// Callback invoked with its context argument once an async shutdown has
/// completed (see `block_server_delete_async`).
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);

/// Wrapper making a raw context pointer transferable between threads.
#[derive(Clone, Copy)]
#[repr(transparent)]
struct ContextPtr(*mut c_void);

// SAFETY: NOTE(review): asserts that the embedder's shutdown context may be
// moved to and used from the executor thread — confirm with the C-side
// contract.
unsafe impl Send for ContextPtr {}
unsafe impl Sync for ContextPtr {}
189
/// Messages exchanged through `ExecutorMailbox` between the FFI entry points
/// and the executor thread.
#[derive(Default)]
enum Mail {
    /// Empty slot.
    #[default]
    None,
    /// Posted by `block_server_thread` once its executor is running; carries
    /// the executor handle and an abort handle for its main future.
    Initialized(EHandle, AbortHandle),
    /// Posted by `block_server_delete_async`; the executor thread drops the
    /// boxed server and then invokes the callback with the context pointer.
    AsyncShutdown(Box<BlockServer>, ShutdownCallback, ContextPtr),
    /// Posted by `block_server_thread_delete` once the executor thread is
    /// done.
    Finished,
}
198
/// Shared state connecting the session manager with the executor-thread
/// mailbox.
pub struct Orchestrator {
    session_manager: callback_interface::SessionManager<InterfaceAdapter>,
    mbox: ExecutorMailbox,
}
203
impl IntoOrchestrator for Arc<Orchestrator> {
    type SM = callback_interface::SessionManager<InterfaceAdapter>;

    /// Identity conversion: an `Arc<Orchestrator>` is already an
    /// orchestrator.
    fn into_orchestrator(self) -> Arc<Orchestrator> {
        self
    }
}
211
/// Lets generic code borrow the session manager directly from the
/// orchestrator.
impl Borrow<callback_interface::SessionManager<InterfaceAdapter>> for Orchestrator {
    fn borrow(&self) -> &callback_interface::SessionManager<InterfaceAdapter> {
        &self.session_manager
    }
}
217
/// The FFI-visible server object returned by `block_server_new`.
pub struct BlockServer {
    server: super::BlockServer<callback_interface::SessionManager<InterfaceAdapter>>,
    /// Handle to the executor running on the embedder-provided thread.
    ehandle: EHandle,
    /// Aborts the executor thread's main future (see `block_server_thread`).
    abort_handle: AbortHandle,
    orchestrator: Arc<Orchestrator>,
}
224
impl Drop for BlockServer {
    /// Synchronous teardown: stops the executor's main future, terminates all
    /// sessions, then blocks until the executor thread reports it is done.
    fn drop(&mut self) {
        // Abort the pending future the executor is parked on.
        self.abort_handle.abort();
        // Terminate all sessions via the session manager.
        Borrow::<callback_interface::SessionManager<InterfaceAdapter>>::borrow(
            self.orchestrator.as_ref(),
        )
        .terminate();
        // Wait for `block_server_thread_delete` to post `Mail::Finished`.
        let mbox = &self.orchestrator.mbox;
        let mut mail = mbox.0.lock();
        mbox.1.wait_while(&mut mail, |mbox| !matches!(mbox, Mail::Finished));
    }
}
237
/// Creates a new block server for the given partition.
///
/// Invokes `callbacks.start_thread` with a leaked `Arc<Orchestrator>` (which
/// `block_server_thread_delete` later reclaims) and then blocks until the
/// executor thread posts `Mail::Initialized`.  Returns null if the thread
/// terminated (`Mail::Finished`) without initializing.
///
/// # Safety
///
/// `partition_info` must be valid per `PartitionInfo::to_rust` (in
/// particular, a non-null `name` must be a valid nul-terminated string), and
/// the function pointers in `callbacks` must be sound to call with the given
/// context.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_new(
    partition_info: &PartitionInfo,
    callbacks: Callbacks,
) -> *mut BlockServer {
    // Copy these out before `callbacks` is moved into the adapter.
    let start_thread = callbacks.start_thread;
    let context = callbacks.context;

    let session_manager = callback_interface::SessionManager::new(Arc::new(InterfaceAdapter {
        callbacks,
        info: unsafe { partition_info.to_rust() },
    }));

    let orchestrator = Arc::new(Orchestrator { session_manager, mbox: ExecutorMailbox::new() });

    // Hand one strong Arc reference to the new thread as a raw pointer.
    unsafe {
        (start_thread)(context, Arc::into_raw(orchestrator.clone()) as *const c_void);
    }

    // Block until the executor thread posts something, then take the mail.
    let mbox = &orchestrator.mbox;
    let mail = {
        let mut mail = mbox.0.lock();
        mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
        std::mem::replace(&mut *mail, Mail::None)
    };

    let block_size = partition_info.block_size;
    match mail {
        Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
            server: super::BlockServer::new(block_size, orchestrator.clone()),
            ehandle,
            abort_handle,
            orchestrator: orchestrator.clone(),
        })),
        // The thread exited without ever initializing.
        Mail::Finished => std::ptr::null_mut(),
        _ => unreachable!(),
    }
}
279
/// Body of the executor thread started via `Callbacks::start_thread`.
///
/// Publishes `Mail::Initialized` (executor handle + abort handle) and then
/// parks the executor on an abortable pending future, servicing spawned
/// tasks until the abort handle fires.
///
/// # Safety
///
/// `arg` must be the pointer passed to `start_thread`, i.e. a leaked
/// `Arc<Orchestrator>` that stays alive for the duration of this call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
    // Borrow only — the leaked Arc reference is reclaimed later by
    // `block_server_thread_delete`.
    let orchestrator = unsafe { &*(arg as *const Orchestrator) };

    let mut executor = fasync::LocalExecutor::default();
    let (abort_handle, registration) = futures::stream::AbortHandle::new_pair();

    orchestrator.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));

    // Runs until aborted; the resulting `Err(Aborted)` is expected.
    let _ = executor.run_singlethreaded(futures::stream::Abortable::new(
        std::future::pending::<()>(),
        registration,
    ));
}
297
/// Called after `block_server_thread` returns; reclaims the leaked
/// `Arc<Orchestrator>` and posts `Mail::Finished`.
///
/// If an async shutdown was requested, the boxed server is dropped here (on
/// this thread) and the completion callback is then invoked with its context.
///
/// # Safety
///
/// `arg` must be the same pointer that was passed to `start_thread`, not yet
/// reclaimed by a previous call.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
    let mail = {
        // Reclaim ownership of the reference leaked in `block_server_new`;
        // it is dropped at the end of this scope.
        let orchestrator = unsafe { Arc::from_raw(arg as *const Orchestrator) };
        orchestrator.mbox.post(Mail::Finished)
    };

    if let Mail::AsyncShutdown(server, callback, arg) = mail {
        // Drop the server before signalling completion; its `Drop` sees the
        // `Mail::Finished` posted above and returns immediately.
        std::mem::drop(server);
        unsafe {
            callback(arg.0);
        }
    }
}
319
320#[unsafe(no_mangle)]
324pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
325 let _ = unsafe { Box::from_raw(block_server) };
326}
327
/// Asynchronously destroys the server: ownership moves to the executor
/// thread's teardown path, which invokes `callback(arg)` once complete (see
/// `block_server_thread_delete`).
///
/// # Safety
///
/// `block_server` must be a pointer previously returned by
/// `block_server_new` that has not already been deleted, and `callback` must
/// be sound to call with `arg`.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_delete_async(
    block_server: *mut BlockServer,
    callback: ShutdownCallback,
    arg: *mut c_void,
) {
    let block_server = unsafe { Box::from_raw(block_server) };
    let orchestrator = block_server.orchestrator.clone();
    let abort_handle = block_server.abort_handle.clone();
    // Park the boxed server in the mailbox so it is dropped off this thread,
    // then stop the executor's main future.
    orchestrator.mbox.post(Mail::AsyncShutdown(block_server, callback, ContextPtr(arg)));
    abort_handle.abort();
}
343
/// Starts serving FIDL Block requests arriving on `handle` (a channel) by
/// spawning a task on the executor thread.
///
/// # Safety
///
/// `block_server` must point at a live `BlockServer`, and `handle` must be a
/// valid channel handle whose ownership is transferred to this call.
/// NOTE(review): the spawned task borrows `*block_server` with an unbounded
/// lifetime; the caller must keep the server alive until the task completes —
/// confirm the shutdown paths guarantee this.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
    let block_server = unsafe { &*block_server };
    let ehandle = &block_server.ehandle;
    // Takes ownership of `handle`.
    let handle = unsafe { zx::NullableHandle::from_raw(handle) };
    ehandle.global_scope().spawn(async move {
        let _ = block_server
            .server
            .handle_requests(fblock::BlockRequestStream::from_channel(
                fasync::Channel::from_channel(handle.into()),
            ))
            .await;
    });
}
363
/// Runs the session's request loop on the calling thread.
///
/// # Safety
///
/// `session` must be a pointer delivered via `on_new_session` whose
/// reference has not yet been released.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_session_run(
    session: &callback_interface::Session<InterfaceAdapter>,
) {
    // Temporarily reconstitute the Arc to call `run`, then leak it again so
    // the C side keeps its reference.
    let session = unsafe { Arc::from_raw(session) };
    session.run();
    let _ = Arc::into_raw(session);
}
375
/// Releases the C side's reference to a session, terminating it
/// asynchronously.
///
/// # Safety
///
/// `session` must be a pointer delivered via `on_new_session` whose
/// reference has not already been released; it must not be used afterwards.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn block_server_session_release(
    session: &callback_interface::Session<InterfaceAdapter>,
) {
    session.terminate_async();
    // Reclaim and drop the strong reference handed out in `spawn_session`.
    unsafe { Arc::from_raw(session) };
}
386
387#[unsafe(no_mangle)]
391pub unsafe extern "C" fn block_server_send_reply(
392 block_server: &BlockServer,
393 request_id: RequestId,
394 status: zx_status_t,
395) {
396 block_server
397 .orchestrator
398 .session_manager
399 .complete_request(request_id, zx::Status::from_raw(status));
400}