1use super::{IntoSessionManager, OffsetMap, Operation, RequestId, RequestTracking, SessionHelper};
6use anyhow::Error;
7use block_protocol::{BlockFifoRequest, BlockFifoResponse};
8use fidl::endpoints::RequestStream;
9use fuchsia_async::{self as fasync, EHandle};
10use futures::stream::{AbortHandle, Abortable};
11use futures::TryStreamExt;
12use std::borrow::Cow;
13use std::collections::{HashMap, VecDeque};
14use std::ffi::{c_char, c_void, CStr};
15use std::mem::MaybeUninit;
16use std::num::NonZero;
17use std::sync::{Arc, Condvar, Mutex, Weak};
18use zx::{self as zx, AsHandleRef as _};
19use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
20
/// Session manager used by the C interface: it owns the C callback table, tracks the sessions
/// that are currently open, and holds the mailbox shared with the executor thread.
pub struct SessionManager {
    /// Callback table (and opaque context) supplied through `block_server_new`.
    callbacks: Callbacks,
    /// Weak references to the open sessions, keyed by the address of the `Session` allocation.
    /// Entries are inserted in `open_session` and removed in `Session::drop`.
    open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
    /// Notified by `Session::drop` when `open_sessions` becomes empty; `terminate` waits on it.
    condvar: Condvar,
    /// Mailbox through which the executor thread and the `BlockServer` lifecycle functions
    /// exchange `Mail`.
    mbox: ExecutorMailbox,
    /// Device information handed out by `get_info`.
    info: super::DeviceInfo,
}
28
// NOTE(review): `SessionManager` transitively holds raw pointers (the callback context and the
// `Mail::AsyncShutdown` argument), so it is not automatically `Send`/`Sync`. These impls assert
// that sharing it across threads is sound; that presumably rests on the C caller's contract for
// those pointers — confirm.
unsafe impl Send for SessionManager {}
unsafe impl Sync for SessionManager {}
31
32impl SessionManager {
33 fn terminate(&self) {
34 {
35 #[allow(clippy::collection_is_never_read)]
38 let mut terminated_sessions = Vec::new();
39 for (_, session) in &*self.open_sessions.lock().unwrap() {
40 if let Some(session) = session.upgrade() {
41 session.terminate();
42 terminated_sessions.push(session);
43 }
44 }
45 }
46 let _guard =
47 self.condvar.wait_while(self.open_sessions.lock().unwrap(), |s| !s.is_empty()).unwrap();
48 }
49}
50
51impl super::SessionManager for SessionManager {
52 async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
53 Ok(())
54 }
55
56 async fn open_session(
57 self: Arc<Self>,
58 mut stream: fblock::SessionRequestStream,
59 offset_map: Option<OffsetMap>,
60 block_size: u32,
61 ) -> Result<(), Error> {
62 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
63 let (abort_handle, registration) = AbortHandle::new_pair();
64 let session = Arc::new(Session {
65 manager: self.clone(),
66 helper,
67 fifo,
68 queue: Mutex::default(),
69 vmos: Mutex::default(),
70 abort_handle,
71 });
72 self.open_sessions
73 .lock()
74 .unwrap()
75 .insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
76 unsafe {
77 (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
78 }
79
80 let result = Abortable::new(
81 async {
82 while let Some(request) = stream.try_next().await? {
83 session.helper.handle_request(request).await?;
84 }
85 Ok(())
86 },
87 registration,
88 )
89 .await
90 .unwrap_or_else(|e| Err(e.into()));
91
92 let _ = session.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
93
94 result
95 }
96
97 async fn get_info(&self) -> Result<Cow<'_, super::DeviceInfo>, zx::Status> {
98 Ok(Cow::Borrowed(&self.info))
99 }
100}
101
impl Drop for SessionManager {
    /// Runs `terminate` so that any sessions still registered are terminated and waited for
    /// before the manager goes away.
    fn drop(&mut self) {
        self.terminate();
    }
}
107
/// An `Arc<SessionManager>` already is the session manager, so the conversion is the identity.
impl IntoSessionManager for Arc<SessionManager> {
    type SM = SessionManager;

    /// Returns `self` unchanged.
    fn into_session_manager(self) -> Self {
        self
    }
}
115
/// Table of C callbacks, plus an opaque context pointer, supplied to `block_server_new`.
#[repr(C)]
pub struct Callbacks {
    /// Opaque pointer passed back as the first argument of every callback.
    pub context: *mut c_void,
    /// Invoked from `block_server_new`; `arg` is a leaked `Arc<SessionManager>` pointer.
    /// NOTE(review): presumably the callee starts a thread that calls `block_server_thread(arg)`
    /// and later `block_server_thread_delete(arg)` — confirm against the C side.
    pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
    /// Invoked from `open_session` with a leaked strong reference to the new session.
    pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
    /// Invoked from `Session::handle_requests` with `request_count` decoded requests starting at
    /// `requests`. The array lives on the caller's stack for the duration of the call only.
    pub on_requests: unsafe extern "C" fn(
        context: *mut c_void,
        session: *const Session,
        requests: *mut Request,
        request_count: usize,
    ),
    /// Receives a log message as a pointer/length pair (see `Callbacks::log`).
    pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
}
129
130impl Callbacks {
131 #[allow(dead_code)]
132 fn log(&self, msg: &str) {
133 let msg = msg.as_bytes();
134 unsafe {
136 (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
137 }
138 }
139}
140
/// Raw VMO handle value handed to C inside a `Request`. It is built from `raw_handle()` of a VMO
/// whose owning `Arc` stays in `Session::vmos`, so this wrapper itself owns nothing.
// The field is only written from Rust (it is read by the C side), hence `dead_code`.
#[allow(dead_code)]
pub struct UnownedVmo(zx::sys::zx_handle_t);
144
/// A decoded request as passed to the C `on_requests` callback.
#[repr(C)]
pub struct Request {
    /// Identifier that C passes back to `block_server_send_reply`.
    pub request_id: RequestId,
    /// The decoded operation to perform.
    pub operation: Operation,
    /// Trace flow id carried over from the request's `RequestTracking`, if any.
    pub trace_flow_id: Option<NonZero<u64>>,
    /// Handle of the VMO attached to the request, or `ZX_HANDLE_INVALID` when there is none.
    pub vmo: UnownedVmo,
}
152
// NOTE(review): `Callbacks` holds a raw `context` pointer and C function pointers, so these impls
// assert that the C side tolerates being called from any thread — confirm against the C caller's
// contract.
unsafe impl Send for Callbacks {}
unsafe impl Sync for Callbacks {}
155
/// One open block session: the FIFO plus the state needed to decode requests and send replies.
pub struct Session {
    /// Owning manager; gives access to the C callbacks and the `open_sessions` registry.
    manager: Arc<SessionManager>,
    /// Shared helper that handles FIDL requests and decodes/finishes FIFO requests.
    helper: SessionHelper<SessionManager>,
    /// FIFO from which requests are read and to which responses are written.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    /// Responses that could not be written to the FIFO immediately.
    queue: Mutex<SessionQueue>,
    /// VMOs attached to in-flight requests; inserted in `handle_requests` and removed in
    /// `block_server_send_reply`.
    vmos: Mutex<HashMap<RequestId, Arc<zx::Vmo>>>,
    /// Aborts the request-serving future created in `open_session`.
    abort_handle: AbortHandle,
}
164
/// Mutable per-session state guarded by `Session::queue`.
#[derive(Default)]
struct SessionQueue {
    /// Responses waiting for room in the FIFO, oldest first.
    responses: VecDeque<BlockFifoResponse>,
}
169
/// Capacity of the on-stack buffers used by `Session::fifo_loop` and `Session::handle_requests`,
/// and therefore the largest batch handed to the `on_requests` callback in one call.
pub const MAX_REQUESTS: usize = 64;
171
172impl Session {
173 fn run(&self) {
174 self.fifo_loop();
175 self.abort_handle.abort();
176 }
177
178 fn fifo_loop(&self) {
179 let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];
180
181 loop {
182 let is_queue_empty = {
184 let mut queue = self.queue.lock().unwrap();
185 while !queue.responses.is_empty() {
186 let (front, _) = queue.responses.as_slices();
187 match self.fifo.write(front) {
188 Ok(count) => {
189 let full = count < front.len();
190 queue.responses.drain(..count);
191 if full {
192 break;
193 }
194 }
195 Err(zx::Status::SHOULD_WAIT) => break,
196 Err(_) => return,
197 }
198 }
199 queue.responses.is_empty()
200 };
201
202 match self.fifo.read_uninit(&mut requests) {
204 Ok(valid_requests) => self.handle_requests(valid_requests.iter()),
205 Err(zx::Status::SHOULD_WAIT) => {
206 let mut signals =
207 zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
208 if !is_queue_empty {
209 signals |= zx::Signals::OBJECT_WRITABLE;
210 }
211 let Ok(signals) =
212 self.fifo.wait_handle(signals, zx::MonotonicInstant::INFINITE)
213 else {
214 return;
215 };
216 if signals.contains(zx::Signals::USER_0) {
217 return;
218 }
219 if signals.contains(zx::Signals::USER_1) {
221 let _ = self.fifo.signal_handle(zx::Signals::USER_1, zx::Signals::empty());
222 }
223 }
224 Err(_) => return,
225 }
226 }
227 }
228
229 fn handle_requests<'a>(&self, requests: impl Iterator<Item = &'a BlockFifoRequest>) {
230 let mut decoded_requests: [MaybeUninit<Request>; MAX_REQUESTS] =
231 unsafe { MaybeUninit::uninit().assume_init() };
232 let mut count = 0;
233 for request in requests {
234 if let Some(r) = self.helper.decode_fifo_request(request) {
235 let operation = match r.operation {
236 Ok(Operation::CloseVmo) => {
237 self.send_reply(r.request_tracking, zx::Status::OK);
238 continue;
239 }
240 Ok(operation) => operation,
241 Err(status) => {
242 self.send_reply(r.request_tracking, status);
243 continue;
244 }
245 };
246 let RequestTracking { group_or_request, trace_flow_id } = r.request_tracking;
247 let mut request_id = group_or_request.into();
248 let vmo = if let Some(vmo) = r.vmo {
249 let raw_handle = vmo.raw_handle();
250 self.vmos.lock().unwrap().insert(request_id, vmo);
251 request_id = request_id.with_vmo();
252 UnownedVmo(raw_handle)
253 } else {
254 UnownedVmo(zx::sys::ZX_HANDLE_INVALID)
255 };
256 decoded_requests[count].write(Request {
257 request_id,
258 operation,
259 trace_flow_id,
260 vmo,
261 });
262 count += 1;
263 }
264 }
265 if count > 0 {
266 unsafe {
267 (self.manager.callbacks.on_requests)(
268 self.manager.callbacks.context,
269 self,
270 decoded_requests[0].as_mut_ptr(),
271 count,
272 );
273 }
274 }
275 }
276
277 fn send_reply(&self, tracking: RequestTracking, status: zx::Status) {
278 let response = match self.helper.finish_fifo_request(tracking, status) {
279 Some(response) => response,
280 None => return,
281 };
282 let mut queue = self.queue.lock().unwrap();
283 if queue.responses.is_empty() {
284 match self.fifo.write_one(&response) {
285 Ok(()) => {
286 return;
287 }
288 Err(_) => {
289 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_1);
291 }
292 }
293 }
294 queue.responses.push_back(response);
295 }
296
297 fn terminate(&self) {
298 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
299 self.abort_handle.abort();
300 }
301}
302
303impl Drop for Session {
304 fn drop(&mut self) {
305 let notify = {
306 let mut open_sessions = self.manager.open_sessions.lock().unwrap();
307 open_sessions.remove(&(self as *const _ as usize));
308 open_sessions.is_empty()
309 };
310 if notify {
311 self.manager.condvar.notify_all();
312 }
313 }
314}
315
/// The object handed to C by `block_server_new`.
pub struct BlockServer {
    /// The underlying block server, parameterised over the C-interface session manager.
    server: super::BlockServer<SessionManager>,
    /// Handle to the executor created in `block_server_thread`; used to spawn serving tasks.
    ehandle: EHandle,
    /// Aborts the future that `block_server_thread` runs on its executor.
    abort_handle: AbortHandle,
}
321
/// A single-slot mailbox: the current `Mail` behind a mutex, plus a condition variable that is
/// notified whenever new mail is posted.
#[derive(Default)]
struct ExecutorMailbox(Mutex<Mail>, Condvar);
324
325impl ExecutorMailbox {
326 fn post(&self, mail: Mail) -> Mail {
328 let old = std::mem::replace(&mut *self.0.lock().unwrap(), mail);
329 self.1.notify_all();
330 old
331 }
332}
333
/// C callback invoked (with the caller-supplied argument) once an asynchronous shutdown started
/// by `block_server_delete_async` has dropped the server.
type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
335
/// Messages exchanged through `ExecutorMailbox`.
#[derive(Default)]
enum Mail {
    /// Nothing has been posted (the initial state, and the state after mail has been collected).
    #[default]
    None,
    /// Posted by `block_server_thread` once its executor exists; carries the executor handle and
    /// the handle that aborts the executor's main future.
    Initialized(EHandle, AbortHandle),
    /// Posted by `block_server_delete_async`; `block_server_thread_delete` drops the server and
    /// then invokes the callback with the given argument.
    AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
    /// Posted by `block_server_thread_delete`.
    Finished,
}
344
345impl Drop for BlockServer {
346 fn drop(&mut self) {
347 self.abort_handle.abort();
348 let manager = &self.server.session_manager;
349 let mbox = manager.mbox.0.lock().unwrap();
350 let _unused = manager.mbox.1.wait_while(mbox, |mbox| !matches!(mbox, Mail::Finished));
351 manager.terminate();
352 debug_assert!(Arc::strong_count(manager) > 0);
353 }
354}
355
/// C-layout description of the partition to serve, passed to `block_server_new`.
#[repr(C)]
pub struct PartitionInfo {
    /// Raw bits converted with `fblock::Flag::from_bits_truncate`, so unknown bits are dropped.
    pub device_flags: u32,
    /// First block of the partition's block range.
    pub start_block: u64,
    /// Number of blocks in the partition's block range.
    pub block_count: u64,
    /// Block size, forwarded to `super::BlockServer::new`.
    pub block_size: u32,
    /// Partition type GUID, copied verbatim.
    pub type_guid: [u8; 16],
    /// Partition instance GUID, copied verbatim.
    pub instance_guid: [u8; 16],
    /// Partition name: either null (treated as empty) or a NUL-terminated C string.
    pub name: *const c_char,
    /// Partition flags, copied verbatim.
    pub flags: u64,
}
367
/// Local alias so the C-facing signatures use the name C code knows.
#[allow(non_camel_case_types)]
type zx_handle_t = zx::sys::zx_handle_t;
371
/// Local alias so the C-facing signatures use the name C code knows.
#[allow(non_camel_case_types)]
type zx_status_t = zx::sys::zx_status_t;
375
376impl PartitionInfo {
377 unsafe fn to_rust(&self) -> super::DeviceInfo {
378 super::DeviceInfo::Partition(super::PartitionInfo {
379 device_flags: fblock::Flag::from_bits_truncate(self.device_flags),
380 block_range: Some(self.start_block..self.start_block + self.block_count),
381 type_guid: self.type_guid,
382 instance_guid: self.instance_guid,
383 name: if self.name.is_null() {
384 "".to_string()
385 } else {
386 String::from_utf8_lossy(CStr::from_ptr(self.name).to_bytes()).to_string()
387 },
388 flags: self.flags,
389 })
390 }
391}
392
393#[no_mangle]
397pub unsafe extern "C" fn block_server_new(
398 partition_info: &PartitionInfo,
399 callbacks: Callbacks,
400) -> *mut BlockServer {
401 let session_manager = Arc::new(SessionManager {
402 callbacks,
403 open_sessions: Mutex::default(),
404 condvar: Condvar::new(),
405 mbox: ExecutorMailbox::default(),
406 info: partition_info.to_rust(),
407 });
408
409 (session_manager.callbacks.start_thread)(
410 session_manager.callbacks.context,
411 Arc::into_raw(session_manager.clone()) as *const c_void,
412 );
413
414 let mbox = &session_manager.mbox;
415 let mail = {
416 let mail = mbox.0.lock().unwrap();
417 std::mem::replace(
418 &mut *mbox.1.wait_while(mail, |mail| matches!(mail, Mail::None)).unwrap(),
419 Mail::None,
420 )
421 };
422
423 let block_size = partition_info.block_size;
424 match mail {
425 Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
426 server: super::BlockServer::new(block_size, session_manager),
427 ehandle,
428 abort_handle,
429 })),
430 Mail::Finished => std::ptr::null_mut(),
431 _ => unreachable!(),
432 }
433}
434
435#[no_mangle]
439pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
440 let session_manager = &*(arg as *const SessionManager);
441
442 let mut executor = fasync::LocalExecutor::new();
443 let (abort_handle, registration) = AbortHandle::new_pair();
444
445 session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
446
447 let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
448}
449
450#[no_mangle]
457pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
458 let mail = {
459 let session_manager = Arc::from_raw(arg as *const SessionManager);
460 debug_assert!(Arc::strong_count(&session_manager) > 0);
461 session_manager.mbox.post(Mail::Finished)
462 };
463
464 if let Mail::AsyncShutdown(server, callback, arg) = mail {
465 std::mem::drop(server);
466 unsafe {
468 callback(arg);
469 }
470 }
471}
472
473#[no_mangle]
477pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
478 let _ = Box::from_raw(block_server);
479}
480
481#[no_mangle]
485pub unsafe extern "C" fn block_server_delete_async(
486 block_server: *mut BlockServer,
487 callback: ShutdownCallback,
488 arg: *mut c_void,
489) {
490 let block_server = Box::from_raw(block_server);
491 let session_manager = block_server.server.session_manager.clone();
492 let abort_handle = block_server.abort_handle.clone();
493 session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
494 abort_handle.abort();
495}
496
497#[no_mangle]
503pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
504 let block_server = &*block_server;
505 let ehandle = &block_server.ehandle;
506 let handle = zx::Handle::from_raw(handle);
507 ehandle.global_scope().spawn(async move {
508 let _ = block_server
509 .server
510 .handle_requests(fvolume::VolumeRequestStream::from_channel(
511 fasync::Channel::from_channel(handle.into()),
512 ))
513 .await;
514 });
515}
516
/// Runs the session's FIFO loop on the calling thread and returns once the loop has stopped
/// (see `Session::run`).
///
/// # Safety
///
/// `session` must be a session pointer received through the `on_new_session` callback that has
/// not yet been released.
#[no_mangle]
pub unsafe extern "C" fn block_server_session_run(session: &Session) {
    session.run();
}
524
525#[no_mangle]
529pub unsafe extern "C" fn block_server_session_release(session: &Session) {
530 session.terminate();
531 Arc::from_raw(session);
532}
533
534#[no_mangle]
538pub unsafe extern "C" fn block_server_send_reply(
539 session: &Session,
540 request_id: RequestId,
541 trace_flow_id: Option<NonZero<u64>>,
542 status: zx_status_t,
543) {
544 if request_id.did_have_vmo() {
545 session.vmos.lock().unwrap().remove(&request_id);
546 }
547 session.send_reply(
548 RequestTracking { group_or_request: request_id.into(), trace_flow_id },
549 zx::Status::from_raw(status),
550 );
551}