1use super::{
6 ActiveRequests, DecodedRequest, IntoSessionManager, OffsetMap, Operation, RequestId,
7 SessionHelper, TraceFlowId,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse};
11use fidl::endpoints::RequestStream;
12use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
13use fuchsia_async::{self as fasync, EHandle};
14use fuchsia_sync::{Condvar, Mutex};
15use futures::TryStreamExt;
16use futures::stream::{AbortHandle, Abortable};
17use std::borrow::Cow;
18use std::collections::{HashMap, VecDeque};
19use std::ffi::{CStr, c_char, c_void};
20use std::mem::MaybeUninit;
21use std::num::NonZero;
22use std::sync::{Arc, Weak};
23use zx::{self as zx, AsHandleRef as _};
24use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
25
26pub struct SessionManager {
27 callbacks: Callbacks,
28 open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
29 active_requests: ActiveRequests<Arc<Session>>,
30 condvar: Condvar,
31 mbox: ExecutorMailbox,
32 info: super::DeviceInfo,
33}
34
35unsafe impl Send for SessionManager {}
36unsafe impl Sync for SessionManager {}
37
38impl SessionManager {
39 fn complete_request(&self, request_id: RequestId, status: zx::Status) {
40 if let Some((session, response)) =
41 self.active_requests.complete_and_take_response(request_id, status)
42 {
43 session.send_response(response);
44 }
45 }
46
47 fn terminate(&self) {
48 {
49 #[allow(clippy::collection_is_never_read)]
52 let mut terminated_sessions = Vec::new();
53 for (_, session) in &*self.open_sessions.lock() {
54 if let Some(session) = session.upgrade() {
55 session.terminate();
56 terminated_sessions.push(session);
57 }
58 }
59 }
60 let mut guard = self.open_sessions.lock();
61 self.condvar.wait_while(&mut guard, |s| !s.is_empty());
62 }
63}
64
65impl super::SessionManager for SessionManager {
66 type Session = Arc<Session>;
67
68 async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
69 Ok(())
70 }
71
72 async fn open_session(
73 self: Arc<Self>,
74 mut stream: fblock::SessionRequestStream,
75 offset_map: OffsetMap,
76 block_size: u32,
77 ) -> Result<(), Error> {
78 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
79 let (abort_handle, registration) = AbortHandle::new_pair();
80 let session = Arc::new(Session {
81 manager: self.clone(),
82 helper,
83 fifo,
84 queue: Mutex::default(),
85 abort_handle,
86 });
87 self.open_sessions.lock().insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
88 unsafe {
89 (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
90 }
91
92 let result = Abortable::new(
93 async {
94 while let Some(request) = stream.try_next().await? {
95 session.helper.handle_request(request).await?;
96 }
97 Ok(())
98 },
99 registration,
100 )
101 .await
102 .unwrap_or_else(|e| Err(e.into()));
103
104 let _ = session.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
105
106 result
107 }
108
109 async fn get_info(&self) -> Result<Cow<'_, super::DeviceInfo>, zx::Status> {
110 Ok(Cow::Borrowed(&self.info))
111 }
112
113 fn active_requests(&self) -> &ActiveRequests<Arc<Session>> {
114 &self.active_requests
115 }
116}
117
118impl Drop for SessionManager {
119 fn drop(&mut self) {
120 self.terminate();
121 }
122}
123
124impl IntoSessionManager for Arc<SessionManager> {
125 type SM = SessionManager;
126
127 fn into_session_manager(self) -> Self {
128 self
129 }
130}
131
132#[repr(C)]
133pub struct Callbacks {
134 pub context: *mut c_void,
135 pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
136 pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
137 pub on_requests:
138 unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
139 pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
140}
141
142impl Callbacks {
143 #[allow(dead_code)]
144 fn log(&self, msg: &str) {
145 let msg = msg.as_bytes();
146 unsafe {
148 (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
149 }
150 }
151}
152
153#[allow(dead_code)]
155pub struct UnownedVmo(zx::sys::zx_handle_t);
156
157#[repr(C)]
158pub struct Request {
159 pub request_id: RequestId,
160 pub operation: Operation,
161 pub trace_flow_id: TraceFlowId,
162 pub vmo: UnownedVmo,
163}
164
165unsafe impl Send for Callbacks {}
166unsafe impl Sync for Callbacks {}
167
168pub struct Session {
169 manager: Arc<SessionManager>,
170 helper: SessionHelper<SessionManager>,
171 fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
172 queue: Mutex<SessionQueue>,
173 abort_handle: AbortHandle,
174}
175
176#[derive(Default)]
177struct SessionQueue {
178 responses: VecDeque<BlockFifoResponse>,
179}
180
181pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
182
183impl Session {
184 fn run(self: &Arc<Self>) {
185 self.fifo_loop();
186 self.abort_handle.abort();
187 self.helper.drop_active_requests(|s| Arc::ptr_eq(s, self));
188 }
189
190 fn fifo_loop(self: &Arc<Self>) {
191 let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];
192
193 loop {
194 let is_queue_empty = {
196 let mut queue = self.queue.lock();
197 while !queue.responses.is_empty() {
198 let (front, _) = queue.responses.as_slices();
199 match self.fifo.write(front) {
200 Ok(count) => {
201 let full = count < front.len();
202 queue.responses.drain(..count);
203 if full {
204 break;
205 }
206 }
207 Err(zx::Status::SHOULD_WAIT) => break,
208 Err(_) => return,
209 }
210 }
211 queue.responses.is_empty()
212 };
213
214 match self.fifo.read_uninit(&mut requests) {
216 Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
217 Err(zx::Status::SHOULD_WAIT) => {
218 let mut signals =
219 zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
220 if !is_queue_empty {
221 signals |= zx::Signals::OBJECT_WRITABLE;
222 }
223 let Ok(signals) =
224 self.fifo.wait_handle(signals, zx::MonotonicInstant::INFINITE).to_result()
225 else {
226 return;
227 };
228 if signals.contains(zx::Signals::USER_0) {
229 return;
230 }
231 if signals.contains(zx::Signals::USER_1) {
233 let _ = self.fifo.signal_handle(zx::Signals::USER_1, zx::Signals::empty());
234 }
235 }
236 Err(_) => return,
237 }
238 }
239 }
240
241 fn handle_requests<'a>(
242 self: &Arc<Self>,
243 requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
244 ) {
245 let mut c_requests: [MaybeUninit<Request>; MAX_REQUESTS] =
246 unsafe { MaybeUninit::uninit().assume_init() };
247 let mut count = 0;
248 for request in requests {
249 match self.helper.decode_fifo_request(self.clone(), request) {
250 Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
251 self.complete_request(request_id, zx::Status::OK);
252 }
253 Ok(mut request) => loop {
254 match self.helper.map_request(request) {
255 Ok((
256 DecodedRequest { request_id, operation, trace_flow_id, vmo },
257 remainder,
258 )) => {
259 c_requests[count].write(Request {
263 request_id,
264 operation,
265 trace_flow_id,
266 vmo: UnownedVmo(
267 vmo.as_ref()
268 .map(|vmo| vmo.raw_handle())
269 .unwrap_or(zx::sys::ZX_HANDLE_INVALID),
270 ),
271 });
272 count += 1;
273 if count == MAX_REQUESTS {
274 unsafe {
275 (self.manager.callbacks.on_requests)(
276 self.manager.callbacks.context,
277 c_requests[0].as_mut_ptr(),
278 count,
279 );
280 }
281 count = 0;
282 }
283 if let Some(r) = remainder {
284 request = r;
285 } else {
286 break;
287 }
288 }
289 Err(Some(response)) => {
290 self.send_response(response);
291 break;
292 }
293 Err(None) => break,
294 }
295 },
296 Err(None) => {}
297 Err(Some(response)) => self.send_response(response),
298 }
299 }
300 if count > 0 {
301 unsafe {
302 (self.manager.callbacks.on_requests)(
303 self.manager.callbacks.context,
304 c_requests[0].as_mut_ptr(),
305 count,
306 );
307 }
308 }
309 }
310
311 fn complete_request(&self, request_id: RequestId, status: zx::Status) {
312 self.manager.complete_request(request_id, status);
313 }
314
315 fn send_response(&self, response: BlockFifoResponse) {
316 let mut queue = self.queue.lock();
317 if queue.responses.is_empty() {
318 match self.fifo.write_one(&response) {
319 Ok(()) => {
320 return;
321 }
322 Err(_) => {
323 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_1);
325 }
326 }
327 }
328 queue.responses.push_back(response);
329 }
330
331 fn terminate(&self) {
332 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
333 self.abort_handle.abort();
334 }
335}
336
337impl Drop for Session {
338 fn drop(&mut self) {
339 let notify = {
340 let mut open_sessions = self.manager.open_sessions.lock();
341 open_sessions.remove(&(self as *const _ as usize));
342 open_sessions.is_empty()
343 };
344 if notify {
345 self.manager.condvar.notify_all();
346 }
347 }
348}
349
350pub struct BlockServer {
351 server: super::BlockServer<SessionManager>,
352 ehandle: EHandle,
353 abort_handle: AbortHandle,
354}
355
356struct ExecutorMailbox(Mutex<Mail>, Condvar);
357
358impl ExecutorMailbox {
359 fn post(&self, mail: Mail) -> Mail {
361 let old = std::mem::replace(&mut *self.0.lock(), mail);
362 self.1.notify_all();
363 old
364 }
365
366 fn new() -> Self {
367 Self(Mutex::default(), Condvar::new())
368 }
369}
370
371type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
372
373#[derive(Default)]
374enum Mail {
375 #[default]
376 None,
377 Initialized(EHandle, AbortHandle),
378 AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
379 Finished,
380}
381
382impl Drop for BlockServer {
383 fn drop(&mut self) {
384 self.abort_handle.abort();
385 let manager = &self.server.session_manager;
386 let mut mbox = manager.mbox.0.lock();
387 manager.mbox.1.wait_while(&mut mbox, |mbox| !matches!(mbox, Mail::Finished));
388 manager.terminate();
389 debug_assert!(Arc::strong_count(manager) > 0);
390 }
391}
392
393#[repr(C)]
394pub struct PartitionInfo {
395 pub device_flags: u32,
396 pub start_block: u64,
397 pub block_count: u64,
398 pub block_size: u32,
399 pub type_guid: [u8; 16],
400 pub instance_guid: [u8; 16],
401 pub name: *const c_char,
402 pub flags: u64,
403 pub max_transfer_size: u32,
404}
405
406#[allow(non_camel_case_types)]
408type zx_handle_t = zx::sys::zx_handle_t;
409
410#[allow(non_camel_case_types)]
412type zx_status_t = zx::sys::zx_status_t;
413
414impl PartitionInfo {
415 unsafe fn to_rust(&self) -> super::DeviceInfo {
419 super::DeviceInfo::Partition(super::PartitionInfo {
420 device_flags: fblock::Flag::from_bits_truncate(self.device_flags),
421 block_range: Some(self.start_block..self.start_block + self.block_count),
422 type_guid: self.type_guid,
423 instance_guid: self.instance_guid,
424 name: if self.name.is_null() {
425 "".to_string()
426 } else {
427 String::from_utf8_lossy(unsafe { CStr::from_ptr(self.name).to_bytes() }).to_string()
428 },
429 flags: self.flags,
430 max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
431 NonZero::new(self.max_transfer_size / self.block_size)
432 } else {
433 None
434 },
435 })
436 }
437}
438
439#[unsafe(no_mangle)]
443pub unsafe extern "C" fn block_server_new(
444 partition_info: &PartitionInfo,
445 callbacks: Callbacks,
446) -> *mut BlockServer {
447 let session_manager = Arc::new(SessionManager {
448 callbacks,
449 open_sessions: Mutex::default(),
450 active_requests: ActiveRequests::default(),
451 condvar: Condvar::new(),
452 mbox: ExecutorMailbox::new(),
453 info: unsafe { partition_info.to_rust() },
454 });
455
456 unsafe {
457 (session_manager.callbacks.start_thread)(
458 session_manager.callbacks.context,
459 Arc::into_raw(session_manager.clone()) as *const c_void,
460 );
461 }
462
463 let mbox = &session_manager.mbox;
464 let mail = {
465 let mut mail = mbox.0.lock();
466 mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
467 std::mem::replace(&mut *mail, Mail::None)
468 };
469
470 let block_size = partition_info.block_size;
471 match mail {
472 Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
473 server: super::BlockServer::new(block_size, session_manager),
474 ehandle,
475 abort_handle,
476 })),
477 Mail::Finished => std::ptr::null_mut(),
478 _ => unreachable!(),
479 }
480}
481
482#[unsafe(no_mangle)]
486pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
487 let session_manager = unsafe { &*(arg as *const SessionManager) };
488
489 let mut executor = fasync::LocalExecutor::default();
490 let (abort_handle, registration) = AbortHandle::new_pair();
491
492 session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
493
494 let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
495}
496
497#[unsafe(no_mangle)]
504pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
505 let mail = {
506 let session_manager = unsafe { Arc::from_raw(arg as *const SessionManager) };
507 debug_assert!(Arc::strong_count(&session_manager) > 0);
508 session_manager.mbox.post(Mail::Finished)
509 };
510
511 if let Mail::AsyncShutdown(server, callback, arg) = mail {
512 std::mem::drop(server);
513 unsafe {
515 callback(arg);
516 }
517 }
518}
519
520#[unsafe(no_mangle)]
524pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
525 let _ = unsafe { Box::from_raw(block_server) };
526}
527
528#[unsafe(no_mangle)]
532pub unsafe extern "C" fn block_server_delete_async(
533 block_server: *mut BlockServer,
534 callback: ShutdownCallback,
535 arg: *mut c_void,
536) {
537 let block_server = unsafe { Box::from_raw(block_server) };
538 let session_manager = block_server.server.session_manager.clone();
539 let abort_handle = block_server.abort_handle.clone();
540 session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
541 abort_handle.abort();
542}
543
544#[unsafe(no_mangle)]
550pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
551 let block_server = unsafe { &*block_server };
552 let ehandle = &block_server.ehandle;
553 let handle = unsafe { zx::Handle::from_raw(handle) };
554 ehandle.global_scope().spawn(async move {
555 let _ = block_server
556 .server
557 .handle_requests(fvolume::VolumeRequestStream::from_channel(
558 fasync::Channel::from_channel(handle.into()),
559 ))
560 .await;
561 });
562}
563
564#[unsafe(no_mangle)]
568pub unsafe extern "C" fn block_server_session_run(session: &Session) {
569 let session = unsafe { Arc::from_raw(session) };
570 session.run();
571 let _ = Arc::into_raw(session);
572}
573
574#[unsafe(no_mangle)]
578pub unsafe extern "C" fn block_server_session_release(session: &Session) {
579 session.terminate();
580 unsafe { Arc::from_raw(session) };
581}
582
583#[unsafe(no_mangle)]
587pub unsafe extern "C" fn block_server_send_reply(
588 block_server: &BlockServer,
589 request_id: RequestId,
590 status: zx_status_t,
591) {
592 block_server.server.session_manager.complete_request(request_id, zx::Status::from_raw(status));
593}