1use crate::WriteFlags;
6
7use super::{
8 ActiveRequests, DecodedRequest, IntoSessionManager, OffsetMap, Operation, RequestId,
9 SessionHelper, TraceFlowId,
10};
11use anyhow::Error;
12use block_protocol::{BlockFifoRequest, BlockFifoResponse};
13use fidl::endpoints::RequestStream;
14use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
15use fuchsia_async::{self as fasync, EHandle};
16use fuchsia_sync::{Condvar, Mutex};
17use futures::TryStreamExt;
18use futures::stream::{AbortHandle, Abortable};
19use std::borrow::Cow;
20use std::collections::{HashMap, VecDeque};
21use std::ffi::{CStr, c_char, c_void};
22use std::mem::MaybeUninit;
23use std::num::NonZero;
24use std::ops::{Deref, DerefMut};
25use std::sync::{Arc, Weak};
26use zx::{self as zx, AsHandleRef as _};
27use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
28
29pub struct SessionManager {
30 callbacks: Callbacks,
31 open_sessions: Mutex<HashMap<usize, Weak<Session>>>,
32 open_sessions_condvar: Condvar,
33 active_requests: ActiveRequests<Arc<Session>>,
34 inflight_requests: Mutex<usize>,
35 no_inflight_requests_condvar: Condvar,
36 mbox: ExecutorMailbox,
37 info: super::DeviceInfo,
38}
39
40unsafe impl Send for SessionManager {}
41unsafe impl Sync for SessionManager {}
42
43impl SessionManager {
44 fn submit_requests(&self, requests: &mut [Request]) {
45 *self.inflight_requests.lock() += requests.len();
46 unsafe {
49 (self.callbacks.on_requests)(
50 self.callbacks.context,
51 std::ptr::from_mut(&mut requests[0]),
52 requests.len(),
53 )
54 }
55 }
56
57 fn wait_for_no_inflight_requests(&self) {
62 let mut guard = self.inflight_requests.lock();
63 self.no_inflight_requests_condvar.wait_while(&mut guard, |count| *count > 0);
64 }
65
66 fn complete_unsubmitted_request(&self, request_id: RequestId, status: zx::Status) {
69 if let Some((session, response)) =
70 self.active_requests.complete_and_take_response(request_id, status)
71 {
72 session.send_response(response);
73 }
74 }
75
76 fn complete_request(&self, request_id: RequestId, status: zx::Status) {
77 let notify = {
78 let mut inflight_requests = self.inflight_requests.lock();
79 *inflight_requests -= 1;
80 *inflight_requests == 0
81 };
82 self.complete_unsubmitted_request(request_id, status);
83 if notify {
84 self.no_inflight_requests_condvar.notify_all();
85 }
86 }
87
88 fn terminate(&self) {
89 {
90 #[allow(clippy::collection_is_never_read)]
93 let mut terminated_sessions = Vec::new();
94 for (_, session) in &*self.open_sessions.lock() {
95 if let Some(session) = session.upgrade() {
96 session.terminate();
97 terminated_sessions.push(session);
98 }
99 }
100 }
101 let mut guard = self.open_sessions.lock();
102 self.open_sessions_condvar.wait_while(&mut guard, |s| !s.is_empty());
103 }
104}
105
106impl super::SessionManager for SessionManager {
107 const SUPPORTS_DECOMPRESSION: bool = false;
108 type Session = Arc<Session>;
109
110 async fn on_attach_vmo(self: Arc<Self>, _vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
111 Ok(())
112 }
113
114 async fn open_session(
115 self: Arc<Self>,
116 mut stream: fblock::SessionRequestStream,
117 offset_map: OffsetMap,
118 block_size: u32,
119 ) -> Result<(), Error> {
120 let (helper, fifo) = SessionHelper::new(self.clone(), offset_map, block_size)?;
121 let (abort_handle, registration) = AbortHandle::new_pair();
122 let session = Arc::new(Session {
123 manager: self.clone(),
124 helper,
125 fifo,
126 queue: Mutex::default(),
127 abort_handle,
128 });
129 self.open_sessions.lock().insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
130 unsafe {
131 (self.callbacks.on_new_session)(self.callbacks.context, Arc::into_raw(session.clone()));
132 }
133
134 let result = Abortable::new(
135 async {
136 while let Some(request) = stream.try_next().await? {
137 session.helper.handle_request(request).await?;
138 }
139 Ok(())
140 },
141 registration,
142 )
143 .await
144 .unwrap_or_else(|e| Err(e.into()));
145
146 let _ = session.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
147
148 result
149 }
150
151 fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
152 Cow::Borrowed(&self.info)
153 }
154
155 fn active_requests(&self) -> &ActiveRequests<Arc<Session>> {
156 &self.active_requests
157 }
158}
159
160impl Drop for SessionManager {
161 fn drop(&mut self) {
162 self.terminate();
163 }
164}
165
166impl IntoSessionManager for Arc<SessionManager> {
167 type SM = SessionManager;
168
169 fn into_session_manager(self) -> Self {
170 self
171 }
172}
173
174#[repr(C)]
175pub struct Callbacks {
176 pub context: *mut c_void,
180 pub start_thread: unsafe extern "C" fn(context: *mut c_void, arg: *const c_void),
184 pub on_new_session: unsafe extern "C" fn(context: *mut c_void, session: *const Session),
189 pub on_requests:
195 unsafe extern "C" fn(context: *mut c_void, requests: *mut Request, request_count: usize),
196 pub log: unsafe extern "C" fn(context: *mut c_void, message: *const c_char, message_len: usize),
199}
200
201impl Callbacks {
202 #[allow(dead_code)]
203 fn log(&self, msg: &str) {
204 let msg = msg.as_bytes();
205 unsafe {
207 (self.log)(self.context, msg.as_ptr() as *const c_char, msg.len());
208 }
209 }
210}
211
212#[allow(dead_code)]
214pub struct UnownedVmo(zx::sys::zx_handle_t);
215
216#[repr(C)]
217pub struct Request {
218 pub request_id: RequestId,
219 pub operation: Operation,
220 pub trace_flow_id: TraceFlowId,
221 pub vmo: UnownedVmo,
222}
223
224unsafe impl Send for Callbacks {}
225unsafe impl Sync for Callbacks {}
226
227pub struct Session {
228 manager: Arc<SessionManager>,
229 helper: SessionHelper<SessionManager>,
230 fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
231 queue: Mutex<SessionQueue>,
232 abort_handle: AbortHandle,
233}
234
235#[derive(Default)]
236struct SessionQueue {
237 responses: VecDeque<BlockFifoResponse>,
238}
239
240pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
241
242impl Session {
243 fn run(self: &Arc<Self>) {
244 self.fifo_loop();
245 self.abort_handle.abort();
246 self.helper.drop_active_requests(|s| Arc::ptr_eq(s, self));
247 }
248
249 fn fifo_loop(self: &Arc<Self>) {
250 let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];
251
252 loop {
253 let is_queue_empty = {
255 let mut queue = self.queue.lock();
256 while !queue.responses.is_empty() {
257 let (front, _) = queue.responses.as_slices();
258 match self.fifo.write(front) {
259 Ok(count) => {
260 let full = count < front.len();
261 queue.responses.drain(..count);
262 if full {
263 break;
264 }
265 }
266 Err(zx::Status::SHOULD_WAIT) => break,
267 Err(_) => return,
268 }
269 }
270 queue.responses.is_empty()
271 };
272
273 match self.fifo.read_uninit(&mut requests) {
275 Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
276 Err(zx::Status::SHOULD_WAIT) => {
277 let mut signals =
278 zx::Signals::OBJECT_READABLE | zx::Signals::USER_0 | zx::Signals::USER_1;
279 if !is_queue_empty {
280 signals |= zx::Signals::OBJECT_WRITABLE;
281 }
282 let Ok(signals) =
283 self.fifo.wait_handle(signals, zx::MonotonicInstant::INFINITE).to_result()
284 else {
285 return;
286 };
287 if signals.contains(zx::Signals::USER_0) {
288 return;
289 }
290 if signals.contains(zx::Signals::USER_1) {
292 let _ = self.fifo.signal_handle(zx::Signals::USER_1, zx::Signals::empty());
293 }
294 }
295 Err(_) => return,
296 }
297 }
298 }
299
300 fn pre_flush(self: &Arc<Self>, request_id: RequestId) -> Result<(), zx::Status> {
302 let trace_flow_id = {
303 let mut request = self.manager.active_requests.request(request_id);
304 if let Some(id) = request.trace_flow_id {
305 fuchsia_trace::async_instant!(
306 fuchsia_trace::Id::from(id.get()),
307 c"storage",
308 c"block_server::SimulatedBarrier",
309 "request_id" => request_id.0
310 );
311 }
312 request.count += 1;
313 request.trace_flow_id
314 };
315 self.manager.submit_requests(&mut [Request {
316 request_id,
317 operation: Operation::Flush,
318 trace_flow_id,
319 vmo: UnownedVmo(zx::sys::ZX_HANDLE_INVALID),
320 }]);
321 self.manager.wait_for_no_inflight_requests();
322 let status = self.manager.active_requests.request(request_id).status;
323 match status {
324 zx::Status::OK => Ok(()),
325 status => {
326 self.manager.complete_unsubmitted_request(request_id, status);
328 Err(status)
329 }
330 }
331 }
332
333 fn post_flush(self: &Arc<Self>, request_id: RequestId, decoded_requests: &mut DecodedRequests) {
336 if decoded_requests.len() > 0 {
337 self.manager.submit_requests(decoded_requests);
338 decoded_requests.clear();
339 }
340 self.manager.wait_for_no_inflight_requests();
341 let request = self.manager.active_requests.request(request_id);
342 match request.status {
343 zx::Status::OK => decoded_requests.push(Request {
344 request_id,
345 operation: Operation::Flush,
346 trace_flow_id: request.trace_flow_id,
347 vmo: UnownedVmo(zx::sys::ZX_HANDLE_INVALID),
348 }),
349 status => {
350 drop(request);
351 self.manager.complete_unsubmitted_request(request_id, status)
352 }
353 }
354 }
355
356 fn handle_requests<'a>(
357 self: &Arc<Self>,
358 requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
359 ) {
360 let mut decoded_requests = DecodedRequests::default();
361 for request in requests {
362 match self.helper.decode_fifo_request(self.clone(), request) {
363 Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
364 self.manager.complete_unsubmitted_request(request_id, zx::Status::OK);
365 }
366 Ok(mut request) => {
367 let request_id = request.request_id;
368 if !self.manager.info.device_flags().contains(fblock::Flag::BARRIER_SUPPORT)
371 && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
372 && self.pre_flush(request_id).is_err()
373 {
374 continue;
375 }
376 let simulate_fua =
379 !self.manager.info.device_flags().contains(fblock::Flag::FUA_SUPPORT)
380 && request.operation.take_write_flag(WriteFlags::FORCE_ACCESS);
381 if simulate_fua {
382 self.manager.active_requests.request(request_id).count += 1;
384 }
385
386 loop {
387 let result = self.helper.map_request(
388 request,
389 &mut self.manager.active_requests.request(request_id),
390 );
391 match result {
392 Ok((
393 DecodedRequest { request_id, operation, vmo, trace_flow_id },
394 remainder,
395 )) => {
396 decoded_requests.push(Request {
400 request_id,
401 operation,
402 trace_flow_id,
403 vmo: UnownedVmo(
404 vmo.as_ref()
405 .map(|vmo| vmo.raw_handle())
406 .unwrap_or(zx::sys::ZX_HANDLE_INVALID),
407 ),
408 });
409
410 if decoded_requests.is_full() {
411 self.manager.submit_requests(&mut decoded_requests);
412 decoded_requests.clear();
413 }
414 if let Some(r) = remainder {
415 request = r;
416 } else {
417 break;
418 }
419 }
420 Err(status) => {
421 self.manager.complete_unsubmitted_request(request_id, status);
422 break;
423 }
424 }
425 }
426
427 if simulate_fua {
428 self.post_flush(request_id, &mut decoded_requests);
429 }
430 }
431 Err(None) => {}
432 Err(Some(response)) => self.send_response(response),
433 }
434 }
435 if !decoded_requests.is_empty() {
436 self.manager.submit_requests(&mut decoded_requests);
437 }
438 }
439
440 fn send_response(&self, response: BlockFifoResponse) {
441 let mut queue = self.queue.lock();
442 if queue.responses.is_empty() {
443 match self.fifo.write_one(&response) {
444 Ok(()) => {
445 return;
446 }
447 Err(_) => {
448 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_1);
450 }
451 }
452 }
453 queue.responses.push_back(response);
454 }
455
456 fn terminate(&self) {
457 let _ = self.fifo.signal_handle(zx::Signals::empty(), zx::Signals::USER_0);
458 self.abort_handle.abort();
459 }
460}
461
462impl Drop for Session {
463 fn drop(&mut self) {
464 let notify = {
465 let mut open_sessions = self.manager.open_sessions.lock();
466 open_sessions.remove(&(self as *const _ as usize));
467 open_sessions.is_empty()
468 };
469 if notify {
470 self.manager.open_sessions_condvar.notify_all();
471 }
472 }
473}
474
475pub struct BlockServer {
476 server: super::BlockServer<SessionManager>,
477 ehandle: EHandle,
478 abort_handle: AbortHandle,
479}
480
481struct ExecutorMailbox(Mutex<Mail>, Condvar);
482
483impl ExecutorMailbox {
484 fn post(&self, mail: Mail) -> Mail {
486 let old = std::mem::replace(&mut *self.0.lock(), mail);
487 self.1.notify_all();
488 old
489 }
490
491 fn new() -> Self {
492 Self(Mutex::default(), Condvar::new())
493 }
494}
495
496type ShutdownCallback = unsafe extern "C" fn(*mut c_void);
497
498#[derive(Default)]
499enum Mail {
500 #[default]
501 None,
502 Initialized(EHandle, AbortHandle),
503 AsyncShutdown(Box<BlockServer>, ShutdownCallback, *mut c_void),
504 Finished,
505}
506
507impl Drop for BlockServer {
508 fn drop(&mut self) {
509 self.abort_handle.abort();
510 let manager = &self.server.session_manager;
511 let mut mbox = manager.mbox.0.lock();
512 manager.mbox.1.wait_while(&mut mbox, |mbox| !matches!(mbox, Mail::Finished));
513 manager.terminate();
514 debug_assert!(Arc::strong_count(manager) > 0);
515 }
516}
517
518#[repr(C)]
519pub struct PartitionInfo {
520 pub device_flags: u32,
521 pub start_block: u64,
522 pub block_count: u64,
523 pub block_size: u32,
524 pub type_guid: [u8; 16],
525 pub instance_guid: [u8; 16],
526 pub name: *const c_char,
527 pub flags: u64,
528 pub max_transfer_size: u32,
529}
530
531#[allow(non_camel_case_types)]
533type zx_handle_t = zx::sys::zx_handle_t;
534
535#[allow(non_camel_case_types)]
537type zx_status_t = zx::sys::zx_status_t;
538
539impl PartitionInfo {
540 unsafe fn to_rust(&self) -> super::DeviceInfo {
544 super::DeviceInfo::Partition(super::PartitionInfo {
545 device_flags: fblock::Flag::from_bits_truncate(self.device_flags),
546 block_range: Some(self.start_block..self.start_block + self.block_count),
547 type_guid: self.type_guid,
548 instance_guid: self.instance_guid,
549 name: if self.name.is_null() {
550 "".to_string()
551 } else {
552 String::from_utf8_lossy(unsafe { CStr::from_ptr(self.name).to_bytes() }).to_string()
553 },
554 flags: self.flags,
555 max_transfer_blocks: if self.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
556 NonZero::new(self.max_transfer_size / self.block_size)
557 } else {
558 None
559 },
560 })
561 }
562}
563
564struct DecodedRequests {
565 requests: [MaybeUninit<Request>; MAX_REQUESTS],
566 count: usize,
567}
568
569impl Default for DecodedRequests {
570 fn default() -> Self {
571 Self { requests: unsafe { MaybeUninit::uninit().assume_init() }, count: 0 }
572 }
573}
574
575impl DecodedRequests {
576 fn push(&mut self, request: Request) {
577 assert!(self.count < MAX_REQUESTS);
578 self.requests[self.count].write(request);
579 self.count += 1;
580 }
581
582 fn is_full(&self) -> bool {
583 self.count == MAX_REQUESTS
584 }
585
586 fn clear(&mut self) {
587 self.count = 0;
588 }
589}
590
591impl Deref for DecodedRequests {
592 type Target = [Request];
593
594 fn deref(&self) -> &Self::Target {
595 unsafe { std::slice::from_raw_parts(self.requests[0].as_ptr(), self.count) }
597 }
598}
599
600impl DerefMut for DecodedRequests {
601 fn deref_mut(&mut self) -> &mut Self::Target {
602 unsafe { std::slice::from_raw_parts_mut(self.requests[0].as_mut_ptr(), self.count) }
604 }
605}
606
607#[unsafe(no_mangle)]
611pub unsafe extern "C" fn block_server_new(
612 partition_info: &PartitionInfo,
613 callbacks: Callbacks,
614) -> *mut BlockServer {
615 let session_manager = Arc::new(SessionManager {
616 callbacks,
617 open_sessions: Mutex::default(),
618 active_requests: ActiveRequests::default(),
619 open_sessions_condvar: Condvar::new(),
620 inflight_requests: Mutex::default(),
621 no_inflight_requests_condvar: Condvar::new(),
622 mbox: ExecutorMailbox::new(),
623 info: unsafe { partition_info.to_rust() },
624 });
625
626 unsafe {
627 (session_manager.callbacks.start_thread)(
628 session_manager.callbacks.context,
629 Arc::into_raw(session_manager.clone()) as *const c_void,
630 );
631 }
632
633 let mbox = &session_manager.mbox;
634 let mail = {
635 let mut mail = mbox.0.lock();
636 mbox.1.wait_while(&mut mail, |mail| matches!(mail, Mail::None));
637 std::mem::replace(&mut *mail, Mail::None)
638 };
639
640 let block_size = partition_info.block_size;
641 match mail {
642 Mail::Initialized(ehandle, abort_handle) => Box::into_raw(Box::new(BlockServer {
643 server: super::BlockServer::new(block_size, session_manager),
644 ehandle,
645 abort_handle,
646 })),
647 Mail::Finished => std::ptr::null_mut(),
648 _ => unreachable!(),
649 }
650}
651
652#[unsafe(no_mangle)]
656pub unsafe extern "C" fn block_server_thread(arg: *const c_void) {
657 let session_manager = unsafe { &*(arg as *const SessionManager) };
658
659 let mut executor = fasync::LocalExecutor::default();
660 let (abort_handle, registration) = AbortHandle::new_pair();
661
662 session_manager.mbox.post(Mail::Initialized(EHandle::local(), abort_handle));
663
664 let _ = executor.run_singlethreaded(Abortable::new(std::future::pending::<()>(), registration));
665}
666
667#[unsafe(no_mangle)]
674pub unsafe extern "C" fn block_server_thread_delete(arg: *const c_void) {
675 let mail = {
676 let session_manager = unsafe { Arc::from_raw(arg as *const SessionManager) };
677 debug_assert!(Arc::strong_count(&session_manager) > 0);
678 session_manager.mbox.post(Mail::Finished)
679 };
680
681 if let Mail::AsyncShutdown(server, callback, arg) = mail {
682 std::mem::drop(server);
683 unsafe {
685 callback(arg);
686 }
687 }
688}
689
690#[unsafe(no_mangle)]
694pub unsafe extern "C" fn block_server_delete(block_server: *mut BlockServer) {
695 let _ = unsafe { Box::from_raw(block_server) };
696}
697
698#[unsafe(no_mangle)]
702pub unsafe extern "C" fn block_server_delete_async(
703 block_server: *mut BlockServer,
704 callback: ShutdownCallback,
705 arg: *mut c_void,
706) {
707 let block_server = unsafe { Box::from_raw(block_server) };
708 let session_manager = block_server.server.session_manager.clone();
709 let abort_handle = block_server.abort_handle.clone();
710 session_manager.mbox.post(Mail::AsyncShutdown(block_server, callback, arg));
711 abort_handle.abort();
712}
713
714#[unsafe(no_mangle)]
720pub unsafe extern "C" fn block_server_serve(block_server: *const BlockServer, handle: zx_handle_t) {
721 let block_server = unsafe { &*block_server };
722 let ehandle = &block_server.ehandle;
723 let handle = unsafe { zx::NullableHandle::from_raw(handle) };
724 ehandle.global_scope().spawn(async move {
725 let _ = block_server
726 .server
727 .handle_requests(fvolume::VolumeRequestStream::from_channel(
728 fasync::Channel::from_channel(handle.into()),
729 ))
730 .await;
731 });
732}
733
734#[unsafe(no_mangle)]
738pub unsafe extern "C" fn block_server_session_run(session: &Session) {
739 let session = unsafe { Arc::from_raw(session) };
740 session.run();
741 let _ = Arc::into_raw(session);
742}
743
744#[unsafe(no_mangle)]
748pub unsafe extern "C" fn block_server_session_release(session: &Session) {
749 session.terminate();
750 unsafe { Arc::from_raw(session) };
751}
752
753#[unsafe(no_mangle)]
757pub unsafe extern "C" fn block_server_send_reply(
758 block_server: &BlockServer,
759 request_id: RequestId,
760 status: zx_status_t,
761) {
762 block_server.server.session_manager.complete_request(request_id, zx::Status::from_raw(status));
763}