1use crate::{
6 ActiveRequests, DecodedRequest, DeviceInfo, IntoOrchestrator, OffsetMap, Operation, RequestId,
7 SessionHelper, TraceFlowId, WriteFlags,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse};
11use fidl_fuchsia_storage_block as fblock;
12use fuchsia_sync::{Condvar, Mutex};
13use futures::TryStreamExt as _;
14use futures::stream::{AbortHandle, Abortable};
15use std::borrow::{Borrow, Cow};
16use std::collections::{HashMap, VecDeque};
17use std::mem::MaybeUninit;
18use std::sync::{Arc, Weak};
19
/// A single decoded request, as handed to [`Interface::on_requests`].
#[derive(Clone, Debug)]
pub struct Request {
    /// Identifies the request. This is the ID to pass to [`SessionManager::complete_request`]
    /// once the request has been serviced.
    pub request_id: RequestId,
    /// The operation to perform.
    pub operation: Operation,
    /// Trace flow ID carried along with the request.
    // NOTE(review): presumably used to correlate trace events for this request -- confirm.
    pub trace_flow_id: TraceFlowId,
    /// The VMO associated with the request, if any. The flush requests synthesized in this file
    /// always carry `None`.
    pub vmo: Option<Arc<zx::Vmo>>,
}
33
/// The callback interface through which a [`SessionManager`] hands requests to a device
/// implementation.
pub trait Interface: Send + Sync + Unpin + 'static {
    /// The orchestrator type; the [`SessionManager`] for this interface can be borrowed from it.
    type Orchestrator: Borrow<SessionManager<Self>> + Send + Sync;

    /// Returns information about the device. The device flags in the returned info decide
    /// whether pre-barriers and FUA writes are simulated with flushes.
    fn get_info(&self) -> Cow<'_, DeviceInfo>;

    /// Called once for every session that is opened. [`Session::run`] blocks the calling thread
    /// while it services the session's FIFO, so the implementation chooses where it runs (the
    /// mock in this file's tests uses a dedicated thread).
    fn spawn_session(&self, session: Arc<Session<Self>>);

    /// Called with a batch of at most [`MAX_REQUESTS`] decoded requests. Each request is counted
    /// as in flight until [`SessionManager::complete_request`] is called for it.
    fn on_requests(&self, requests: &[Request]);
}
49
/// The part of a [`SessionManager`]'s state that is guarded by its `inner` mutex.
struct SessionManagerInner<I: Interface + ?Sized> {
    /// Sessions that have been opened and not yet dropped, keyed by the address of the
    /// `Session` (inserted in `open_session`, removed by `Session`'s `Drop` implementation).
    open_sessions: HashMap<usize, Weak<Session<I>>>,
}
53
/// User signal raised on a session's FIFO by `Session::send_response` when a response could not
/// be written directly; it wakes the session's FIFO loop, which then clears the signal.
const FIFO_WAKE_SIGNAL: zx::Signals = zx::Signals::USER_0;
58
/// User signal raised on a session's FIFO to make the session's FIFO loop return.
const SHUTDOWN_SIGNAL: zx::Signals = zx::Signals::USER_1;
61
/// Manages the sessions of a device whose requests are serviced through the callback-style
/// [`Interface`].
pub struct SessionManager<I: Interface + ?Sized> {
    /// The device implementation that requests are forwarded to.
    interface: Arc<I>,
    /// Bookkeeping for requests that have been decoded but not yet fully completed.
    active_requests: ActiveRequests<Arc<Session<I>>>,
    /// Number of requests handed to `interface` that have not yet been completed.
    inflight_requests: Mutex<usize>,
    /// Notified when `inflight_requests` reaches zero.
    no_inflight_requests_condvar: Condvar,
    /// Tracks the currently open sessions.
    inner: Mutex<SessionManagerInner<I>>,
    /// Notified when the last open session is dropped.
    no_open_sessions_condvar: Condvar,
}
71
72impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
73 const SUPPORTS_DECOMPRESSION: bool = false;
74
75 type Orchestrator = I::Orchestrator;
76 type Session = Arc<Session<I>>;
77
78 async fn on_attach_vmo(
79 _orchestrator: Arc<Self::Orchestrator>,
80 _vmo: &Arc<zx::Vmo>,
81 ) -> Result<(), zx::Status> {
82 Ok(())
83 }
84
85 async fn open_session(
86 orchestrator: Arc<Self::Orchestrator>,
87 mut stream: fblock::SessionRequestStream,
88 offset_map: OffsetMap,
89 block_size: u32,
90 ) -> Result<(), Error> {
91 let (helper, fifo) = SessionHelper::new(orchestrator.clone(), offset_map, block_size)?;
92 let (abort_handle, registration) = AbortHandle::new_pair();
93 let session = Arc::new(Session { helper, fifo, queue: Mutex::default(), abort_handle });
94 let sm = orchestrator.as_ref().borrow();
95 sm.inner
96 .lock()
97 .open_sessions
98 .insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
99
100 sm.interface.spawn_session(session.clone());
101
102 let result = Abortable::new(
103 async {
104 while let Some(request) = stream.try_next().await? {
105 session.helper.handle_request(request).await?;
106 }
107 Ok(())
108 },
109 registration,
110 )
111 .await
112 .unwrap_or_else(|e| Err(e.into()));
113
114 let _ = session.fifo.signal(zx::Signals::empty(), SHUTDOWN_SIGNAL);
115
116 result
117 }
118
119 fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
120 self.interface.get_info()
121 }
122
123 fn active_requests(&self) -> &ActiveRequests<Arc<Session<I>>> {
124 &self.active_requests
125 }
126}
127
128impl<I: Interface + ?Sized> SessionManager<I> {
129 pub fn new(interface: Arc<I>) -> Self {
130 Self {
131 interface,
132 active_requests: ActiveRequests::default(),
133 inflight_requests: Mutex::new(0),
134 no_inflight_requests_condvar: Condvar::new(),
135 inner: Mutex::new(SessionManagerInner { open_sessions: HashMap::new() }),
136 no_open_sessions_condvar: Condvar::new(),
137 }
138 }
139
140 pub fn complete_request(&self, request_id: RequestId, status: zx::Status) {
142 let notify = {
143 let mut inflight_requests = self.inflight_requests.lock();
144 *inflight_requests -= 1;
145 *inflight_requests == 0
146 };
147 self.complete_unsubmitted_request(request_id, status);
148 if notify {
149 self.no_inflight_requests_condvar.notify_all();
150 }
151 }
152
153 fn submit_requests(&self, requests: &[Request]) {
154 *self.inflight_requests.lock() += requests.len();
155 self.interface.on_requests(requests);
156 }
157
158 fn wait_for_no_inflight_requests(&self) {
163 let mut guard = self.inflight_requests.lock();
164 self.no_inflight_requests_condvar.wait_while(&mut guard, |count| *count > 0);
165 }
166
167 fn complete_unsubmitted_request(&self, request_id: RequestId, status: zx::Status) {
170 if let Some((session, response)) =
171 self.active_requests.complete_and_take_response(request_id, status)
172 {
173 session.send_response(response);
174 }
175 }
176
177 pub fn terminate(&self) {
179 {
180 #[allow(clippy::collection_is_never_read)]
183 let mut terminated_sessions = Vec::new();
184 for (_, session) in &self.inner.lock().open_sessions {
185 if let Some(session) = session.upgrade() {
186 session.terminate_async();
187 terminated_sessions.push(session);
188 }
189 }
190 }
191 let mut guard = self.inner.lock();
192 self.no_open_sessions_condvar.wait_while(&mut guard, |s| !s.open_sessions.is_empty());
193 }
194}
195
/// A single client session: its FIFO plus the state needed to service it.
pub struct Session<I: Interface + ?Sized> {
    /// Shared session logic: handles FIDL requests and decodes and maps FIFO requests.
    helper: SessionHelper<SessionManager<I>>,
    /// This side of the session FIFO: requests are read from it and responses written to it.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    /// Responses that could not be written to `fifo` immediately.
    queue: Mutex<SessionQueue>,
    /// Aborts the FIDL request loop that `open_session` runs for this session.
    abort_handle: AbortHandle,
}
202
/// Responses waiting to be written to a session's FIFO.
#[derive(Default)]
struct SessionQueue {
    /// Pending responses, oldest first.
    responses: VecDeque<BlockFifoResponse>,
}
207
/// The largest batch handed to [`Interface::on_requests`] in one call; also the number of FIFO
/// entries a session reads at a time.
pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
209
/// A fixed-capacity batch of decoded requests stored inline.
struct DecodedRequests {
    /// Storage for the batch; only the first `count` entries are initialized.
    requests: [MaybeUninit<Request>; MAX_REQUESTS],
    /// Number of initialized entries at the front of `requests`.
    count: usize,
}
214
215impl Default for DecodedRequests {
216 fn default() -> Self {
217 Self { requests: unsafe { MaybeUninit::uninit().assume_init() }, count: 0 }
218 }
219}
220
221impl DecodedRequests {
222 fn push(&mut self, request: Request) {
223 assert!(self.count < MAX_REQUESTS);
224 self.requests[self.count].write(request);
227 self.count += 1;
228 }
229
230 fn is_empty(&self) -> bool {
231 self.count == 0
232 }
233
234 fn is_full(&self) -> bool {
235 self.count == MAX_REQUESTS
236 }
237
238 fn clear(&mut self) {
239 for i in 0..self.count {
240 unsafe { self.requests[i].assume_init_drop() };
242 }
243 self.count = 0;
244 }
245}
246
impl Drop for DecodedRequests {
    /// Drops the requests still held in the batch; `MaybeUninit` would not do so on its own.
    fn drop(&mut self) {
        self.clear();
    }
}
252
253impl std::ops::Deref for DecodedRequests {
254 type Target = [Request];
255
256 fn deref(&self) -> &Self::Target {
257 unsafe { std::slice::from_raw_parts(self.requests[0].as_ptr(), self.count) }
259 }
260}
261
262impl std::ops::DerefMut for DecodedRequests {
263 fn deref_mut(&mut self) -> &mut Self::Target {
264 unsafe { std::slice::from_raw_parts_mut(self.requests[0].as_mut_ptr(), self.count) }
266 }
267}
268
impl<I: Interface + ?Sized> Session<I> {
    /// Runs the session on the calling thread.
    ///
    /// Services the FIFO until the loop exits, then aborts the FIDL request loop that
    /// `open_session` runs for this session and asks the helper to close the active groups for
    /// which the predicate matches this session.
    pub fn run(self: &Arc<Self>) {
        self.fifo_loop();
        self.abort_handle.abort();
        self.helper.close_active_groups(|s| Arc::ptr_eq(s, self));
    }

    /// Services the FIFO: flushes queued responses, reads and handles requests, and otherwise
    /// blocks until the FIFO needs attention. Returns on the shutdown signal or on any FIFO
    /// error other than `SHOULD_WAIT`.
    fn fifo_loop(self: &Arc<Self>) {
        let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];

        loop {
            // First write out as many queued responses as the FIFO will take.
            let is_queue_empty = {
                let mut queue = self.queue.lock();
                while !queue.responses.is_empty() {
                    // Only the first contiguous part of the deque is written per iteration.
                    let (front, _) = queue.responses.as_slices();
                    match self.fifo.write(front) {
                        Ok(count) => {
                            // A partial write means the FIFO has no more room.
                            let full = count < front.len();
                            queue.responses.drain(..count);
                            if full {
                                break;
                            }
                        }
                        Err(zx::Status::SHOULD_WAIT) => break,
                        Err(_) => return,
                    }
                }
                queue.responses.is_empty()
            };

            match self.fifo.read_uninit(&mut requests) {
                Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
                Err(zx::Status::SHOULD_WAIT) => {
                    // Nothing to read: wait for requests, a shutdown, or a wake-up from
                    // `send_response`.
                    let mut signals =
                        zx::Signals::OBJECT_READABLE | SHUTDOWN_SIGNAL | FIFO_WAKE_SIGNAL;
                    // Only wait for writability if there are responses left to write.
                    if !is_queue_empty {
                        signals |= zx::Signals::OBJECT_WRITABLE;
                    }
                    let Ok(signals) =
                        self.fifo.wait_one(signals, zx::MonotonicInstant::INFINITE).to_result()
                    else {
                        return;
                    };
                    if signals.contains(SHUTDOWN_SIGNAL) {
                        return;
                    }
                    if signals.contains(FIFO_WAKE_SIGNAL) {
                        // Clear the wake signal; the queue is drained on the next iteration.
                        let _ = self.fifo.signal(FIFO_WAKE_SIGNAL, zx::Signals::empty());
                    }
                }
                Err(_) => return,
            }
        }
    }

    /// Simulates a pre-barrier: submits a flush on behalf of `request_id` and blocks until no
    /// request is in flight.
    ///
    /// Returns `Err` with the recorded status, after completing the request with it, if the
    /// request's status is not `OK` once the wait is over.
    fn pre_flush(self: &Arc<Self>, request_id: RequestId) -> Result<(), zx::Status> {
        let trace_flow_id = {
            let mut request = self.helper.session_manager().active_requests.request(request_id);
            if let Some(id) = request.trace_flow_id {
                fuchsia_trace::async_instant!(
                    fuchsia_trace::Id::from(id.get()),
                    c"storage",
                    c"block_server::SimulatedBarrier",
                    "request_id" => request_id.0
                );
            }
            // NOTE(review): presumably `count` is the number of outstanding parts of the
            // request, so this accounts for the flush submitted below -- confirm.
            request.count += 1;
            request.trace_flow_id
        };
        self.helper.session_manager().submit_requests(&[Request {
            request_id,
            operation: Operation::Flush,
            trace_flow_id,
            vmo: None,
        }]);
        self.helper.session_manager().wait_for_no_inflight_requests();
        let status = self.helper.session_manager().active_requests.request(request_id).status;
        match status {
            zx::Status::OK => Ok(()),
            status => {
                self.helper.session_manager().complete_unsubmitted_request(request_id, status);
                Err(status)
            }
        }
    }

    /// Simulates the flush half of an FUA write: submits whatever is pending in
    /// `decoded_requests`, blocks until no request is in flight, and then either queues a
    /// flush for `request_id` (status `OK`) or completes the request with its recorded status.
    fn post_flush(self: &Arc<Self>, request_id: RequestId, decoded_requests: &mut DecodedRequests) {
        if !decoded_requests.is_empty() {
            self.helper.session_manager().submit_requests(decoded_requests);
            decoded_requests.clear();
        }
        self.helper.session_manager().wait_for_no_inflight_requests();
        let request = self.helper.session_manager().active_requests.request(request_id);
        match request.status {
            // The batch was emptied above (or was already empty), so there is room to push.
            zx::Status::OK => decoded_requests.push(Request {
                request_id,
                operation: Operation::Flush,
                trace_flow_id: request.trace_flow_id,
                vmo: None,
            }),
            status => {
                // Release `request` before completing, which accesses the active requests
                // again.
                drop(request);
                self.helper.session_manager().complete_unsubmitted_request(request_id, status)
            }
        }
    }

    /// Decodes the FIFO `requests`, maps them into [`Request`]s and submits them to the
    /// interface in batches of at most [`MAX_REQUESTS`].
    ///
    /// Pre-barriers and FUA writes are simulated with flushes when the device does not report
    /// support for them.
    fn handle_requests<'a>(
        self: &Arc<Self>,
        requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
    ) {
        let manager = &self.helper.session_manager();
        let mut decoded_requests = DecodedRequests::default();

        for request in requests {
            match self.helper.decode_fifo_request(self.clone(), request) {
                // Closing a VMO needs nothing from the interface; complete it right away.
                Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
                    manager.complete_unsubmitted_request(request_id, zx::Status::OK);
                }
                Ok(mut request) => {
                    let request_id = request.request_id;

                    // Simulate the pre-barrier with a flush if the device has no barrier
                    // support. On failure `pre_flush` has already completed the request.
                    // NOTE(review): requests decoded earlier in this batch that are still
                    // waiting in `decoded_requests` have not been submitted at this point, so
                    // the simulated barrier does not cover them -- confirm this is intended.
                    if !manager
                        .interface
                        .get_info()
                        .device_flags()
                        .contains(fblock::DeviceFlag::BARRIER_SUPPORT)
                        && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
                        && self.pre_flush(request_id).is_err()
                    {
                        continue;
                    }
                    // Likewise, FUA is simulated with a flush after the write if the device
                    // has no FUA support.
                    let simulate_fua = !manager
                        .interface
                        .get_info()
                        .device_flags()
                        .contains(fblock::DeviceFlag::FUA_SUPPORT)
                        && request.operation.take_write_flag(WriteFlags::FORCE_ACCESS);

                    if simulate_fua {
                        // NOTE(review): presumably accounts for the flush that `post_flush`
                        // adds to this request -- confirm.
                        manager.active_requests.request(request_id).count += 1;
                    }

                    // Mapping can leave a remainder; keep going until the whole request has
                    // been mapped or mapping fails.
                    loop {
                        let result = self
                            .helper
                            .map_request(request, &mut manager.active_requests.request(request_id));
                        match result {
                            Ok((
                                DecodedRequest { request_id, operation, vmo, trace_flow_id },
                                remainder,
                            )) => {
                                decoded_requests.push(Request {
                                    request_id,
                                    operation,
                                    trace_flow_id,
                                    vmo,
                                });

                                // Submit as soon as the batch is full so that the next push
                                // has room.
                                if decoded_requests.is_full() {
                                    manager.submit_requests(&*decoded_requests);
                                    decoded_requests.clear();
                                }

                                if let Some(r) = remainder {
                                    request = r;
                                } else {
                                    break;
                                }
                            }
                            Err(status) => {
                                manager.complete_unsubmitted_request(request_id, status);
                                break;
                            }
                        }
                    }

                    if simulate_fua {
                        self.post_flush(request_id, &mut decoded_requests);
                    }
                }
                // Decoding produced neither a request nor a response.
                Err(None) => {}
                // Decoding produced a response to send straight back.
                Err(Some(response)) => self.send_response(response),
            }
        }

        // Submit whatever is left of the final, partially filled batch.
        if !decoded_requests.is_empty() {
            manager.submit_requests(&decoded_requests);
        }
    }

    /// Sends `response` to the client: written straight to the FIFO when nothing is queued and
    /// the write succeeds, otherwise appended to the queue, which the FIFO loop drains.
    fn send_response(&self, response: BlockFifoResponse) {
        let mut queue = self.queue.lock();
        // Write directly only if nothing is queued, so that responses stay in order.
        if queue.responses.is_empty() {
            match self.fifo.write_one(&response) {
                Ok(()) => {
                    return;
                }
                Err(_) => {
                    // The write failed; wake the FIFO loop so that it picks up the queue.
                    let _ = self.fifo.signal(zx::Signals::empty(), FIFO_WAKE_SIGNAL);
                }
            }
        }
        queue.responses.push_back(response);
    }

    /// Requests that the session shut down, without waiting for it to do so: raises the
    /// shutdown signal for the FIFO loop and aborts the FIDL request loop.
    pub fn terminate_async(&self) {
        let _ = self.fifo.signal(zx::Signals::empty(), SHUTDOWN_SIGNAL);
        self.abort_handle.abort();
    }
}
500
501impl<I: Interface + ?Sized> Drop for Session<I> {
502 fn drop(&mut self) {
503 let mut inner = self.helper.session_manager().inner.lock();
504 let notify = {
505 inner.open_sessions.remove(&(self as *const _ as usize));
506 inner.open_sessions.is_empty()
507 };
508 if notify {
509 self.helper.session_manager().no_open_sessions_condvar.notify_all();
510 }
511 }
512}
513
impl<I: Interface + ?Sized> Drop for SessionManager<I> {
    /// Terminates all open sessions; this blocks until every session has been dropped.
    fn drop(&mut self) {
        self.terminate();
    }
}
519
/// When the interface uses the session manager itself as its orchestrator, a shared session
/// manager already is the orchestrator.
impl<I: Interface<Orchestrator = SessionManager<I>>> IntoOrchestrator for Arc<SessionManager<I>> {
    type SM = SessionManager<I>;

    /// Returns `self` unchanged.
    fn into_orchestrator(self) -> Arc<I::Orchestrator> {
        self
    }
}
527
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BlockInfo;
    use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse};
    use fidl::endpoints::create_proxy_and_stream;
    use zx::HandleBased;
    use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};

    /// An [`Interface`] that forwards every request to a channel, so that the test body can
    /// inspect the request and complete it.
    struct MockInterface {
        request_sender: std::sync::mpsc::Sender<Request>,
    }

    impl Interface for MockInterface {
        type Orchestrator = SessionManager<Self>;

        fn get_info(&self) -> Cow<'_, DeviceInfo> {
            Cow::Owned(DeviceInfo::Block(BlockInfo { block_count: 1024, ..Default::default() }))
        }

        fn spawn_session(&self, session: Arc<Session<Self>>) {
            // `Session::run` blocks, so give every session its own thread.
            std::thread::spawn(move || {
                session.run();
            });
        }

        fn on_requests(&self, requests: &[Request]) {
            for request in requests {
                self.request_sender.send(request.clone()).unwrap();
            }
        }
    }

    /// The client end of a session FIFO: responses are read from it, requests written to it.
    type ClientFifo = zx::Fifo<BlockFifoResponse, BlockFifoRequest>;

    /// A block server backed by [`MockInterface`] with one open session.
    ///
    /// Fields are dropped in declaration order: the FIFO first, the request receiver last.
    struct Fixture {
        fifo: ClientFifo,
        session_proxy: fblock::SessionProxy,
        proxy: fblock::BlockProxy,
        session_manager: Arc<SessionManager<MockInterface>>,
        rx: std::sync::mpsc::Receiver<Request>,
    }

    impl Fixture {
        /// Starts a server with a block size of 512, opens a session and fetches its FIFO.
        /// Returns the fixture together with the task that runs the server.
        async fn start() -> (Self, fasync::Task<()>) {
            let (tx, rx) = std::sync::mpsc::channel();
            let interface = Arc::new(MockInterface { request_sender: tx });
            let session_manager = Arc::new(SessionManager::new(interface));

            let sm_clone = session_manager.clone();
            let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
            let server_task = fasync::Task::spawn(async move {
                let server = crate::BlockServer::new(512, sm_clone);
                server.handle_requests(stream).await.unwrap();
            });

            let (session_proxy, session_server_end) =
                fidl::endpoints::create_proxy::<fblock::SessionMarker>();
            proxy.open_session(session_server_end).unwrap();

            let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
            let fifo: ClientFifo = zx::Fifo::from(fifo_handle);

            (Self { fifo, session_proxy, proxy, session_manager, rx }, server_task)
        }

        /// Like [`Fixture::start`], but leaves the server task running detached.
        async fn start_detached() -> Self {
            let (fixture, server_task) = Self::start().await;
            server_task.detach();
            fixture
        }
    }

    /// Blocks until a response can be read from `fifo` and returns it.
    fn read_response(fifo: &ClientFifo) -> BlockFifoResponse {
        let signals =
            fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
        assert!(signals.contains(zx::Signals::FIFO_READABLE));

        let mut resp = [BlockFifoResponse::default()];
        fifo.read(&mut resp).unwrap();
        let [response] = resp;
        response
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_basic_request() {
        let fixture = Fixture::start_detached().await;

        let vmo = zx::Vmo::create(8192).unwrap();
        let vmo_id = fixture
            .session_proxy
            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
            .await
            .unwrap()
            .unwrap();

        let req = BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: fblock::BlockOpcode::Read.into_primitive(),
                ..Default::default()
            },
            reqid: 123,
            group: 0,
            vmoid: vmo_id.id,
            length: 1,
            vmo_offset: 0,
            dev_offset: 0,
            trace_flow_id: 0,
            ..Default::default()
        };
        fixture.fifo.write(&[req]).unwrap();

        let r = fixture.rx.recv().unwrap();
        assert_eq!(r.request_id.0, 0);
        assert!(matches!(r.operation, Operation::Read { .. }));

        fixture.session_manager.complete_request(r.request_id, zx::Status::OK);

        let resp = read_response(&fixture.fifo);
        assert_eq!(resp.reqid, 123);
        assert_eq!(resp.status, zx::sys::ZX_OK);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_write_request() {
        let fixture = Fixture::start_detached().await;

        let vmo = zx::Vmo::create(8192).unwrap();
        let vmo_id = fixture
            .session_proxy
            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
            .await
            .unwrap()
            .unwrap();

        let req = BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: fblock::BlockOpcode::Write.into_primitive(),
                ..Default::default()
            },
            reqid: 124,
            group: 0,
            vmoid: vmo_id.id,
            length: 1,
            vmo_offset: 0,
            dev_offset: 0,
            trace_flow_id: 0,
            ..Default::default()
        };
        fixture.fifo.write(&[req]).unwrap();

        let r = fixture.rx.recv().unwrap();
        assert_eq!(r.request_id.0, 0);
        assert!(matches!(r.operation, Operation::Write { .. }));

        fixture.session_manager.complete_request(r.request_id, zx::Status::OK);

        let resp = read_response(&fixture.fifo);
        assert_eq!(resp.reqid, 124);
        assert_eq!(resp.status, zx::sys::ZX_OK);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_flush_request() {
        let fixture = Fixture::start_detached().await;

        let req = BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: fblock::BlockOpcode::Flush.into_primitive(),
                ..Default::default()
            },
            reqid: 125,
            group: 0,
            vmoid: fblock::VMOID_INVALID,
            length: 0,
            vmo_offset: 0,
            dev_offset: 0,
            trace_flow_id: 0,
            ..Default::default()
        };
        fixture.fifo.write(&[req]).unwrap();

        let r = fixture.rx.recv().unwrap();
        assert_eq!(r.request_id.0, 0);
        assert!(matches!(r.operation, Operation::Flush { .. }));

        fixture.session_manager.complete_request(r.request_id, zx::Status::OK);

        let resp = read_response(&fixture.fifo);
        assert_eq!(resp.reqid, 125);
        assert_eq!(resp.status, zx::sys::ZX_OK);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_trim_request() {
        let fixture = Fixture::start_detached().await;

        let req = BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: fblock::BlockOpcode::Trim.into_primitive(),
                ..Default::default()
            },
            reqid: 126,
            group: 0,
            vmoid: fblock::VMOID_INVALID,
            length: 1,
            vmo_offset: 0,
            dev_offset: 0,
            trace_flow_id: 0,
            ..Default::default()
        };
        fixture.fifo.write(&[req]).unwrap();

        let r = fixture.rx.recv().unwrap();
        assert_eq!(r.request_id.0, 0);
        assert!(matches!(r.operation, Operation::Trim { .. }));

        fixture.session_manager.complete_request(r.request_id, zx::Status::OK);

        let resp = read_response(&fixture.fifo);
        assert_eq!(resp.reqid, 126);
        assert_eq!(resp.status, zx::sys::ZX_OK);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_close_vmo() {
        let fixture = Fixture::start_detached().await;

        let vmo = zx::Vmo::create(8192).unwrap();
        let vmo_id = fixture
            .session_proxy
            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
            .await
            .unwrap()
            .unwrap();

        let req = BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: fblock::BlockOpcode::CloseVmo.into_primitive(),
                ..Default::default()
            },
            reqid: 127,
            group: 0,
            vmoid: vmo_id.id,
            length: 0,
            vmo_offset: 0,
            dev_offset: 0,
            trace_flow_id: 0,
            ..Default::default()
        };
        fixture.fifo.write(&[req]).unwrap();

        let resp = read_response(&fixture.fifo);
        assert_eq!(resp.reqid, 127);
        assert_eq!(resp.status, zx::sys::ZX_OK);

        // Closing a VMO is handled by the session itself and never reaches the interface.
        assert!(fixture.rx.try_recv().is_err());
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_error() {
        let fixture = Fixture::start_detached().await;

        let req = BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: fblock::BlockOpcode::Flush.into_primitive(),
                ..Default::default()
            },
            reqid: 128,
            group: 0,
            vmoid: fblock::VMOID_INVALID,
            length: 0,
            vmo_offset: 0,
            dev_offset: 0,
            trace_flow_id: 0,
            ..Default::default()
        };
        fixture.fifo.write(&[req]).unwrap();

        let r = fixture.rx.recv().unwrap();
        fixture.session_manager.complete_request(r.request_id, zx::Status::IO);

        // The status passed to `complete_request` is what the client sees.
        let resp = read_response(&fixture.fifo);
        assert_eq!(resp.reqid, 128);
        assert_eq!(resp.status, zx::sys::ZX_ERR_IO);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_teardown_with_active_requests() {
        let (Fixture { fifo, session_proxy, proxy, session_manager, rx }, server_task) =
            Fixture::start().await;

        let vmo = zx::Vmo::create(8192).unwrap();
        let vmo_id = session_proxy
            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
            .await
            .unwrap()
            .unwrap();

        let req = BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: fblock::BlockOpcode::Read.into_primitive(),
                ..Default::default()
            },
            reqid: 129,
            group: 0,
            vmoid: vmo_id.id,
            length: 1,
            vmo_offset: 0,
            dev_offset: 0,
            trace_flow_id: 0,
            ..Default::default()
        };
        fifo.write(&[req]).unwrap();
        let r = rx.recv().unwrap();

        // Close the session while the request is still outstanding.
        drop(session_proxy);
        // NOTE(review): presumably gives the server time to observe the closed session before
        // the request is completed -- confirm.
        fasync::Timer::new(std::time::Duration::from_millis(50)).await;

        session_manager.complete_request(r.request_id, zx::Status::OK);

        drop(proxy);
        // `terminate` blocks until all sessions are gone, so run it off the executor thread.
        fasync::unblock(move || session_manager.terminate()).await;
        server_task.await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_teardown_with_active_grouped_requests() {
        let (Fixture { fifo, session_proxy, proxy, session_manager, rx }, server_task) =
            Fixture::start().await;

        let vmo = zx::Vmo::create(8192).unwrap();
        let vmo_id = session_proxy
            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
            .await
            .unwrap()
            .unwrap();

        let mut req = BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: fblock::BlockOpcode::Read.into_primitive(),
                flags: fblock::BlockIoFlag::GROUP_ITEM.bits(),
                ..Default::default()
            },
            reqid: 1,
            group: 1,
            vmoid: vmo_id.id,
            length: 1,
            vmo_offset: 0,
            dev_offset: 0,
            trace_flow_id: 0,
            ..Default::default()
        };

        // One request in group 1, followed by two requests in group 2.
        fifo.write(&[req]).unwrap();
        req.reqid = 2;
        req.group = 2;
        fifo.write(&[req]).unwrap();
        let r1 = rx.recv().unwrap();
        let r2 = rx.recv().unwrap();
        req.reqid = 3;
        req.group = 2;
        fifo.write(&[req]).unwrap();
        let r3 = rx.recv().unwrap();

        // Complete the first two requests while the session is open; the third is still
        // outstanding when the session is closed below.
        session_manager.complete_request(r1.request_id, zx::Status::OK);
        session_manager.complete_request(r2.request_id, zx::Status::OK);

        drop(session_proxy);
        // NOTE(review): presumably gives the server time to observe the closed session before
        // the last request is completed -- confirm.
        fasync::Timer::new(std::time::Duration::from_millis(50)).await;

        session_manager.complete_request(r3.request_id, zx::Status::OK);

        drop(proxy);

        // `terminate` blocks until all sessions are gone, so run it off the executor thread.
        fasync::unblock(move || session_manager.terminate()).await;
        server_task.await;
    }
}