1use crate::{
6 ActiveRequests, DecodedRequest, DeviceInfo, HandleRequestResult, IntoOrchestrator, OffsetMap,
7 Operation, RequestId, SessionHelper, TraceFlowId, WriteFlags,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse};
11use fidl_fuchsia_storage_block as fblock;
12use fuchsia_sync::{Condvar, Mutex};
13use futures::TryStreamExt as _;
14use futures::stream::{AbortHandle, Abortable};
15use std::borrow::{Borrow, Cow};
16use std::collections::{HashMap, VecDeque};
17use std::mem::MaybeUninit;
18use std::sync::{Arc, Weak};
19
/// A single request handed to an [`Interface`] through [`Interface::on_requests`].
#[derive(Clone, Debug)]
pub struct Request {
    /// Identifier to pass back to [`SessionManager::complete_request`] when the request is done.
    pub request_id: RequestId,
    /// The operation to perform.
    pub operation: Operation,
    /// Trace flow identifier carried along with the request.
    pub trace_flow_id: TraceFlowId,
    /// The VMO associated with the operation, if any. Flush requests synthesised in this file
    /// always set `None`.
    pub vmo: Option<Arc<zx::Vmo>>,
}
33
/// Device-side callbacks used by [`SessionManager`] to run sessions and execute requests.
pub trait Interface: Send + Sync + Unpin + 'static {
    /// Orchestrator type; it must be able to lend out the [`SessionManager`] through `Borrow`.
    type Orchestrator: Borrow<SessionManager<Self>> + Send + Sync;

    /// Returns the device information. The request path in this file reads the device flags
    /// from it for every decoded request.
    fn get_info(&self) -> Cow<'_, DeviceInfo>;

    /// Called once for every session created by `open_session`. The implementation is expected
    /// to arrange for [`Session::run`] to be called (the test mock in this file does so on a
    /// dedicated thread).
    fn spawn_session(&self, session: Arc<Session<Self>>);

    /// Called with a batch of requests to execute. Each request is counted as in flight until
    /// it is reported back through [`SessionManager::complete_request`].
    fn on_requests(&self, requests: &[Request]);
}
49
/// State of a [`SessionManager`] that is guarded by its `inner` mutex.
struct SessionManagerInner<I: Interface + ?Sized> {
    /// Sessions that have been opened and not yet dropped, keyed by the address of the
    /// `Session` allocation (`Arc::as_ptr` cast to `usize`).
    open_sessions: HashMap<usize, Weak<Session<I>>>,
}
53
/// User signal raised on the FIFO by `Session::send_response` when a response could not be
/// written directly, so that `Session::fifo_loop` wakes up; the loop clears it again.
const FIFO_WAKE_SIGNAL: zx::Signals = zx::Signals::USER_0;
58
/// User signal raised on the FIFO when the session should stop: set when the FIDL request loop
/// in `open_session` finishes and by `Session::terminate_async`. `Session::fifo_loop` returns
/// when it observes it.
const SHUTDOWN_SIGNAL: zx::Signals = zx::Signals::USER_1;
61
/// Session manager that forwards decoded requests to an [`Interface`] implementation and
/// relays completions back to the originating session.
pub struct SessionManager<I: Interface + ?Sized> {
    /// The device implementation that sessions and requests are forwarded to.
    interface: Arc<I>,
    /// Bookkeeping for requests that have been decoded; each entry refers back to the session
    /// that should receive the response.
    active_requests: ActiveRequests<Arc<Session<I>>>,
    /// Number of requests passed to `Interface::on_requests` that have not yet been reported
    /// back via `complete_request`.
    inflight_requests: Mutex<usize>,
    /// Notified when `inflight_requests` reaches zero.
    no_inflight_requests_condvar: Condvar,
    /// Tracks the sessions that are currently open.
    inner: Mutex<SessionManagerInner<I>>,
    /// Notified when the last open session has been dropped.
    no_open_sessions_condvar: Condvar,
}
71
72impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
73 const SUPPORTS_DECOMPRESSION: bool = false;
74
75 type Orchestrator = I::Orchestrator;
76 type Session = Arc<Session<I>>;
77
78 async fn on_attach_vmo(
79 _orchestrator: Arc<Self::Orchestrator>,
80 _vmo: &Arc<zx::Vmo>,
81 ) -> Result<(), zx::Status> {
82 Ok(())
83 }
84
85 async fn open_session(
86 orchestrator: Arc<Self::Orchestrator>,
87 mut stream: fblock::SessionRequestStream,
88 offset_map: OffsetMap,
89 block_size: u32,
90 ) -> Result<(), Error> {
91 let (helper, fifo) = SessionHelper::new(orchestrator.clone(), offset_map, block_size)?;
92 let (abort_handle, registration) = AbortHandle::new_pair();
93 let session = Arc::new(Session {
94 helper,
95 fifo,
96 queue: Mutex::default(),
97 abort_handle,
98 close_callback: Mutex::new(None),
99 });
100 let sm = orchestrator.as_ref().borrow();
101 sm.inner
102 .lock()
103 .open_sessions
104 .insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
105
106 sm.interface.spawn_session(session.clone());
107
108 let result = Abortable::new(
109 async {
110 while let Some(request) = stream.try_next().await? {
111 match session.helper.handle_request(request).await? {
112 HandleRequestResult::Ok => {}
113 HandleRequestResult::Closed(callback) => {
114 *session.close_callback.lock() = Some(callback);
115 break;
116 }
117 }
118 }
119 Ok(())
120 },
121 registration,
122 )
123 .await
124 .unwrap_or_else(|e| Err(e.into()));
125
126 let _ = session.fifo.signal(zx::Signals::empty(), SHUTDOWN_SIGNAL);
127
128 result
129 }
130
131 fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
132 self.interface.get_info()
133 }
134
135 fn active_requests(&self) -> &ActiveRequests<Arc<Session<I>>> {
136 &self.active_requests
137 }
138}
139
140impl<I: Interface + ?Sized> SessionManager<I> {
141 pub fn new(interface: Arc<I>) -> Self {
142 Self {
143 interface,
144 active_requests: ActiveRequests::default(),
145 inflight_requests: Mutex::new(0),
146 no_inflight_requests_condvar: Condvar::new(),
147 inner: Mutex::new(SessionManagerInner { open_sessions: HashMap::new() }),
148 no_open_sessions_condvar: Condvar::new(),
149 }
150 }
151
152 pub fn complete_request(&self, request_id: RequestId, status: zx::Status) {
154 let notify = {
155 let mut inflight_requests = self.inflight_requests.lock();
156 *inflight_requests -= 1;
157 *inflight_requests == 0
158 };
159 self.complete_unsubmitted_request(request_id, status);
160 if notify {
161 self.no_inflight_requests_condvar.notify_all();
162 }
163 }
164
165 fn submit_requests(&self, requests: &[Request]) {
166 *self.inflight_requests.lock() += requests.len();
167 self.interface.on_requests(requests);
168 }
169
170 fn wait_for_no_inflight_requests(&self) {
175 let mut guard = self.inflight_requests.lock();
176 self.no_inflight_requests_condvar.wait_while(&mut guard, |count| *count > 0);
177 }
178
179 fn complete_unsubmitted_request(&self, request_id: RequestId, status: zx::Status) {
182 if let Some((session, response)) =
183 self.active_requests.complete_and_take_response(request_id, status)
184 {
185 session.send_response(response);
186 }
187 }
188
    /// Asks every open session to terminate and blocks until all of them have been dropped.
    pub fn terminate(&self) {
        {
            // Upgraded sessions are parked in this vector so that none of them is dropped
            // while the loop below still holds the `inner` lock: `Session::drop` takes that
            // same lock. The vector is dropped at the end of this block, after the lock.
            #[allow(clippy::collection_is_never_read)]
            let mut terminated_sessions = Vec::new();
            for (_, session) in &self.inner.lock().open_sessions {
                if let Some(session) = session.upgrade() {
                    session.terminate_async();
                    terminated_sessions.push(session);
                }
            }
        }
        // `Session::drop` notifies the condition variable when the map becomes empty.
        let mut guard = self.inner.lock();
        self.no_open_sessions_condvar.wait_while(&mut guard, |s| !s.open_sessions.is_empty());
    }
206}
207
/// A single client session: the FIFO it is served over plus the state needed to answer it.
pub struct Session<I: Interface + ?Sized> {
    /// Shared session logic (FIDL request handling, FIFO request decoding and mapping).
    helper: SessionHelper<SessionManager<I>>,
    /// Server end of the FIFO: requests are read from it and responses written to it.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    /// Responses that could not be written to the FIFO immediately.
    queue: Mutex<SessionQueue>,
    /// Aborts the FIDL request loop in `open_session`.
    abort_handle: AbortHandle,
    /// Callback stored when the session is closed over FIDL; invoked when the session is
    /// dropped.
    close_callback: Mutex<Option<Box<dyn FnOnce() + Send + 'static>>>,
}
215
/// Queue state of a [`Session`], guarded by the session's `queue` mutex.
#[derive(Default)]
struct SessionQueue {
    /// Responses waiting to be written to the FIFO, oldest first.
    responses: VecDeque<BlockFifoResponse>,
}
220
/// Size of the buffers used in this file for reading FIFO requests and for batching decoded
/// requests before they are submitted to the interface.
pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
222
/// Fixed-capacity, inline buffer of decoded requests waiting to be submitted.
struct DecodedRequests {
    /// Backing storage; only the first `count` slots are initialised.
    requests: [MaybeUninit<Request>; MAX_REQUESTS],
    /// Number of initialised slots at the front of `requests`.
    count: usize,
}
227
228impl Default for DecodedRequests {
229 fn default() -> Self {
230 Self { requests: unsafe { MaybeUninit::uninit().assume_init() }, count: 0 }
231 }
232}
233
234impl DecodedRequests {
235 fn push(&mut self, request: Request) {
236 assert!(self.count < MAX_REQUESTS);
237 self.requests[self.count].write(request);
240 self.count += 1;
241 }
242
243 fn is_empty(&self) -> bool {
244 self.count == 0
245 }
246
247 fn is_full(&self) -> bool {
248 self.count == MAX_REQUESTS
249 }
250
251 fn clear(&mut self) {
252 for i in 0..self.count {
253 unsafe { self.requests[i].assume_init_drop() };
255 }
256 self.count = 0;
257 }
258}
259
impl Drop for DecodedRequests {
    /// Drops any requests still held in the buffer.
    fn drop(&mut self) {
        self.clear();
    }
}
265
266impl std::ops::Deref for DecodedRequests {
267 type Target = [Request];
268
269 fn deref(&self) -> &Self::Target {
270 unsafe { std::slice::from_raw_parts(self.requests[0].as_ptr(), self.count) }
272 }
273}
274
275impl std::ops::DerefMut for DecodedRequests {
276 fn deref_mut(&mut self) -> &mut Self::Target {
277 unsafe { std::slice::from_raw_parts_mut(self.requests[0].as_mut_ptr(), self.count) }
279 }
280}
281
282impl<I: Interface + ?Sized> Session<I> {
    /// Runs the session on the calling thread and returns once the FIFO loop has exited.
    pub fn run(self: &Arc<Self>) {
        self.fifo_loop();
        // The FIFO loop is done, so stop the FIDL request loop in `open_session` too.
        self.abort_handle.abort();
        // Close the groups that belong to this session (identified by pointer equality).
        self.helper.close_active_groups(|s| Arc::ptr_eq(s, self));
    }
293
    /// Alternates between writing queued responses to the FIFO and reading and handling
    /// requests from it. Returns when the shutdown signal is observed or when a FIFO operation
    /// fails with anything other than `SHOULD_WAIT`.
    fn fifo_loop(self: &Arc<Self>) {
        let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];

        loop {
            // Write out as many queued responses as the FIFO will currently accept.
            let is_queue_empty = {
                let mut queue = self.queue.lock();
                while !queue.responses.is_empty() {
                    // Only the first contiguous slice of the deque is written per iteration.
                    let (front, _) = queue.responses.as_slices();
                    match self.fifo.write(front) {
                        Ok(count) => {
                            // A short write means the FIFO has no more room for now.
                            let full = count < front.len();
                            queue.responses.drain(..count);
                            if full {
                                break;
                            }
                        }
                        Err(zx::Status::SHOULD_WAIT) => break,
                        Err(_) => return,
                    }
                }
                queue.responses.is_empty()
            };

            match self.fifo.read_uninit(&mut requests) {
                Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
                Err(zx::Status::SHOULD_WAIT) => {
                    // Nothing to read: sleep until there is something to do.
                    let mut signals =
                        zx::Signals::OBJECT_READABLE | SHUTDOWN_SIGNAL | FIFO_WAKE_SIGNAL;
                    // Only wait for writability when there are responses left to write.
                    if !is_queue_empty {
                        signals |= zx::Signals::OBJECT_WRITABLE;
                    }
                    let Ok(signals) =
                        self.fifo.wait_one(signals, zx::MonotonicInstant::INFINITE).to_result()
                    else {
                        return;
                    };
                    if signals.contains(SHUTDOWN_SIGNAL) {
                        return;
                    }
                    if signals.contains(FIFO_WAKE_SIGNAL) {
                        // Clear the wake signal; the queue is re-examined on the next iteration.
                        let _ = self.fifo.signal(FIFO_WAKE_SIGNAL, zx::Signals::empty());
                    }
                }
                Err(_) => return,
            }
        }
    }
344
345 fn pre_flush(self: &Arc<Self>, request_id: RequestId) -> Result<(), zx::Status> {
347 let trace_flow_id = {
348 let mut request = self.helper.session_manager().active_requests.request(request_id);
349 if let Some(id) = request.trace_flow_id {
350 fuchsia_trace::async_instant!(
351 fuchsia_trace::Id::from(id.get()),
352 c"storage",
353 c"block_server::SimulatedBarrier",
354 "request_id" => request_id.0
355 );
356 }
357 request.count += 1;
358 request.trace_flow_id
359 };
360 self.helper.session_manager().submit_requests(&[Request {
361 request_id,
362 operation: Operation::Flush,
363 trace_flow_id,
364 vmo: None,
365 }]);
366 self.helper.session_manager().wait_for_no_inflight_requests();
367 let status = self.helper.session_manager().active_requests.request(request_id).status;
368 match status {
369 zx::Status::OK => Ok(()),
370 status => {
371 self.helper.session_manager().complete_unsubmitted_request(request_id, status);
373 Err(status)
374 }
375 }
376 }
377
378 fn post_flush(self: &Arc<Self>, request_id: RequestId, decoded_requests: &mut DecodedRequests) {
381 if !decoded_requests.is_empty() {
382 self.helper.session_manager().submit_requests(decoded_requests);
383 decoded_requests.clear();
384 }
385 self.helper.session_manager().wait_for_no_inflight_requests();
386 let request = self.helper.session_manager().active_requests.request(request_id);
387 match request.status {
388 zx::Status::OK => decoded_requests.push(Request {
389 request_id,
390 operation: Operation::Flush,
391 trace_flow_id: request.trace_flow_id,
392 vmo: None,
393 }),
394 status => {
395 drop(request);
396 self.helper.session_manager().complete_unsubmitted_request(request_id, status)
397 }
398 }
399 }
400
    /// Decodes each FIFO request, maps it to one or more device requests and submits those to
    /// the interface in batches of at most `MAX_REQUESTS`.
    ///
    /// `CloseVmo` requests are completed here without reaching the interface. When the device
    /// does not advertise barrier or FUA support, the corresponding write flags are removed
    /// from the operation and simulated with `pre_flush` / `post_flush`.
    fn handle_requests<'a>(
        self: &Arc<Self>,
        requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
    ) {
        let manager = &self.helper.session_manager();
        let mut decoded_requests = DecodedRequests::default();

        for request in requests {
            match self.helper.decode_fifo_request(self.clone(), request) {
                Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
                    // Nothing is submitted for CloseVmo; it is completed straight away.
                    manager.complete_unsubmitted_request(request_id, zx::Status::OK);
                }
                Ok(mut request) => {
                    let request_id = request.request_id;

                    // Simulate the barrier if the device lacks support and the flag was set.
                    // If the simulated barrier fails, `pre_flush` has already completed the
                    // request, so move on to the next one.
                    if !manager
                        .interface
                        .get_info()
                        .device_flags()
                        .contains(fblock::DeviceFlag::BARRIER_SUPPORT)
                        && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
                        && self.pre_flush(request_id).is_err()
                    {
                        continue;
                    }
                    let simulate_fua = !manager
                        .interface
                        .get_info()
                        .device_flags()
                        .contains(fblock::DeviceFlag::FUA_SUPPORT)
                        && request.operation.take_write_flag(WriteFlags::FORCE_ACCESS);

                    if simulate_fua {
                        // Account for the extra completion that `post_flush` will produce.
                        manager.active_requests.request(request_id).count += 1;
                    }

                    // Mapping may split the request; keep going until there is no remainder.
                    loop {
                        let result = self
                            .helper
                            .map_request(request, &mut manager.active_requests.request(request_id));
                        match result {
                            Ok((
                                DecodedRequest { request_id, operation, vmo, trace_flow_id },
                                remainder,
                            )) => {
                                decoded_requests.push(Request {
                                    request_id,
                                    operation,
                                    trace_flow_id,
                                    vmo,
                                });

                                // Submit as soon as the batch is full so `push` never overflows.
                                if decoded_requests.is_full() {
                                    manager.submit_requests(&*decoded_requests);
                                    decoded_requests.clear();
                                }

                                if let Some(r) = remainder {
                                    request = r;
                                } else {
                                    break;
                                }
                            }
                            Err(status) => {
                                manager.complete_unsubmitted_request(request_id, status);
                                break;
                            }
                        }
                    }

                    if simulate_fua {
                        self.post_flush(request_id, &mut decoded_requests);
                    }
                }
                Err(None) => {}
                Err(Some(response)) => self.send_response(response),
            }
        }

        // Submit whatever is left in the final, partially filled batch.
        if !decoded_requests.is_empty() {
            manager.submit_requests(&decoded_requests);
        }
    }
489
490 fn send_response(&self, response: BlockFifoResponse) {
491 let mut queue = self.queue.lock();
492 if queue.responses.is_empty() {
493 match self.fifo.write_one(&response) {
494 Ok(()) => {
495 return;
496 }
497 Err(_) => {
498 let _ = self.fifo.signal(zx::Signals::empty(), FIFO_WAKE_SIGNAL);
500 }
501 }
502 }
503 queue.responses.push_back(response);
504 }
505
    /// Requests termination of the session without waiting for it: raises the shutdown signal
    /// for the FIFO loop and aborts the FIDL request loop in `open_session`.
    pub fn terminate_async(&self) {
        let _ = self.fifo.signal(zx::Signals::empty(), SHUTDOWN_SIGNAL);
        self.abort_handle.abort();
    }
512}
513
514impl<I: Interface + ?Sized> Drop for Session<I> {
515 fn drop(&mut self) {
516 let callback = std::mem::take(&mut *self.close_callback.lock());
517 if let Some(callback) = callback {
518 callback();
519 }
520 let notify = {
521 let mut inner = self.helper.session_manager().inner.lock();
522 inner.open_sessions.remove(&(self as *const _ as usize));
523 inner.open_sessions.is_empty()
524 };
525 if notify {
526 self.helper.session_manager().no_open_sessions_condvar.notify_all();
527 }
528 }
529}
530
impl<I: Interface + ?Sized> Drop for SessionManager<I> {
    /// Terminates all sessions and blocks until they have been dropped.
    fn drop(&mut self) {
        self.terminate();
    }
}
536
/// When an interface uses the session manager itself as its orchestrator, an
/// `Arc<SessionManager<I>>` already is the orchestrator and is returned unchanged.
impl<I: Interface<Orchestrator = SessionManager<I>>> IntoOrchestrator for Arc<SessionManager<I>> {
    type SM = SessionManager<I>;

    fn into_orchestrator(self) -> Arc<I::Orchestrator> {
        self
    }
}
544
545#[cfg(test)]
546mod tests {
547 use super::*;
548 use crate::BlockInfo;
549 use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse};
550 use fidl::endpoints::create_proxy_and_stream;
551 use fidl_fuchsia_storage_block as fblock;
552 use fuchsia_async as fasync;
553
    /// Test interface that forwards every submitted request to a channel so that the test body
    /// can inspect it and decide when to complete it.
    struct MockInterface {
        /// Receives a clone of each request passed to `on_requests`.
        request_sender: std::sync::mpsc::Sender<Request>,
    }
557
558 impl Interface for MockInterface {
559 type Orchestrator = SessionManager<Self>;
560
561 fn get_info(&self) -> Cow<'_, DeviceInfo> {
562 Cow::Owned(DeviceInfo::Block(BlockInfo { block_count: 1024, ..Default::default() }))
563 }
564
565 fn spawn_session(&self, session: Arc<Session<Self>>) {
566 std::thread::spawn(move || {
567 session.run();
568 });
569 }
570
571 fn on_requests(&self, requests: &[Request]) {
572 for request in requests {
573 self.request_sender.send(request.clone()).unwrap();
574 }
575 }
576 }
577
578 #[fasync::run_singlethreaded(test)]
579 async fn test_basic_request() {
580 let (tx, rx) = std::sync::mpsc::channel();
581 let interface = Arc::new(MockInterface { request_sender: tx });
582 let session_manager = Arc::new(SessionManager::new(interface.clone()));
583
584 let sm_clone = session_manager.clone();
585 let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
586 let _server_task = fasync::Task::spawn(async move {
587 let server = crate::BlockServer::new(512, sm_clone);
588 server.handle_requests(stream).await.unwrap();
589 })
590 .detach();
591
592 let (session_proxy, session_server_end) =
593 fidl::endpoints::create_proxy::<fblock::SessionMarker>();
594 proxy.open_session(session_server_end).unwrap();
595
596 let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
597 let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
598
599 let vmo = zx::Vmo::create(8192).unwrap();
600 let vmo_id = session_proxy
601 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
602 .await
603 .unwrap()
604 .unwrap();
605
606 let req = BlockFifoRequest {
607 command: BlockFifoCommand {
608 opcode: fblock::BlockOpcode::Read.into_primitive(),
609 ..Default::default()
610 },
611 reqid: 123,
612 group: 0,
613 vmoid: vmo_id.id,
614 length: 1,
615 vmo_offset: 0,
616 dev_offset: 0,
617 trace_flow_id: 0,
618 ..Default::default()
619 };
620 fifo.write(&[req]).unwrap();
621
622 let r = rx.recv().unwrap();
623 assert_eq!(r.request_id.0, 0);
624 assert!(matches!(r.operation, Operation::Read { .. }));
625
626 session_manager.complete_request(r.request_id, zx::Status::OK);
627
628 let signals =
629 fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
630 assert!(signals.contains(zx::Signals::FIFO_READABLE));
631
632 let mut resp = [BlockFifoResponse::default()];
633 fifo.read(&mut resp).unwrap();
634 assert_eq!(resp[0].reqid, 123);
635 assert_eq!(resp[0].status, zx::sys::ZX_OK);
636
637 std::mem::drop(proxy);
638 }
639 #[fasync::run_singlethreaded(test)]
640 async fn test_write_request() {
641 let (tx, rx) = std::sync::mpsc::channel();
642 let interface = Arc::new(MockInterface { request_sender: tx });
643 let session_manager = Arc::new(SessionManager::new(interface.clone()));
644
645 let sm_clone = session_manager.clone();
646 let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
647 let _server_task = fasync::Task::spawn(async move {
648 let server = crate::BlockServer::new(512, sm_clone);
649 server.handle_requests(stream).await.unwrap();
650 })
651 .detach();
652
653 let (session_proxy, session_server_end) =
654 fidl::endpoints::create_proxy::<fblock::SessionMarker>();
655 proxy.open_session(session_server_end).unwrap();
656
657 let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
658 let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
659
660 let vmo = zx::Vmo::create(8192).unwrap();
661 let vmo_id = session_proxy
662 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
663 .await
664 .unwrap()
665 .unwrap();
666
667 let req = BlockFifoRequest {
668 command: BlockFifoCommand {
669 opcode: fblock::BlockOpcode::Write.into_primitive(),
670 ..Default::default()
671 },
672 reqid: 124,
673 group: 0,
674 vmoid: vmo_id.id,
675 length: 1,
676 vmo_offset: 0,
677 dev_offset: 0,
678 trace_flow_id: 0,
679 ..Default::default()
680 };
681 fifo.write(&[req]).unwrap();
682
683 let r = rx.recv().unwrap();
684 assert_eq!(r.request_id.0, 0);
685 assert!(matches!(r.operation, Operation::Write { .. }));
686
687 session_manager.complete_request(r.request_id, zx::Status::OK);
688
689 let signals =
690 fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
691 assert!(signals.contains(zx::Signals::FIFO_READABLE));
692
693 let mut resp = [BlockFifoResponse::default()];
694 fifo.read(&mut resp).unwrap();
695 assert_eq!(resp[0].reqid, 124);
696 assert_eq!(resp[0].status, zx::sys::ZX_OK);
697 }
698
699 #[fasync::run_singlethreaded(test)]
700 async fn test_flush_request() {
701 let (tx, rx) = std::sync::mpsc::channel();
702 let interface = Arc::new(MockInterface { request_sender: tx });
703 let session_manager = Arc::new(SessionManager::new(interface.clone()));
704
705 let sm_clone = session_manager.clone();
706 let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
707 let _server_task = fasync::Task::spawn(async move {
708 let server = crate::BlockServer::new(512, sm_clone);
709 server.handle_requests(stream).await.unwrap();
710 })
711 .detach();
712
713 let (session_proxy, session_server_end) =
714 fidl::endpoints::create_proxy::<fblock::SessionMarker>();
715 proxy.open_session(session_server_end).unwrap();
716
717 let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
718 let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
719
720 let req = BlockFifoRequest {
721 command: BlockFifoCommand {
722 opcode: fblock::BlockOpcode::Flush.into_primitive(),
723 ..Default::default()
724 },
725 reqid: 125,
726 group: 0,
727 vmoid: fblock::VMOID_INVALID,
728 length: 0,
729 vmo_offset: 0,
730 dev_offset: 0,
731 trace_flow_id: 0,
732 ..Default::default()
733 };
734 fifo.write(&[req]).unwrap();
735
736 let r = rx.recv().unwrap();
737 assert_eq!(r.request_id.0, 0);
738 assert!(matches!(r.operation, Operation::Flush { .. }));
739
740 session_manager.complete_request(r.request_id, zx::Status::OK);
741
742 let signals =
743 fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
744 assert!(signals.contains(zx::Signals::FIFO_READABLE));
745
746 let mut resp = [BlockFifoResponse::default()];
747 fifo.read(&mut resp).unwrap();
748 assert_eq!(resp[0].reqid, 125);
749 assert_eq!(resp[0].status, zx::sys::ZX_OK);
750 }
751
752 #[fasync::run_singlethreaded(test)]
753 async fn test_trim_request() {
754 let (tx, rx) = std::sync::mpsc::channel();
755 let interface = Arc::new(MockInterface { request_sender: tx });
756 let session_manager = Arc::new(SessionManager::new(interface.clone()));
757
758 let sm_clone = session_manager.clone();
759 let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
760 let _server_task = fasync::Task::spawn(async move {
761 let server = crate::BlockServer::new(512, sm_clone);
762 server.handle_requests(stream).await.unwrap();
763 })
764 .detach();
765
766 let (session_proxy, session_server_end) =
767 fidl::endpoints::create_proxy::<fblock::SessionMarker>();
768 proxy.open_session(session_server_end).unwrap();
769
770 let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
771 let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
772
773 let req = BlockFifoRequest {
774 command: BlockFifoCommand {
775 opcode: fblock::BlockOpcode::Trim.into_primitive(),
776 ..Default::default()
777 },
778 reqid: 126,
779 group: 0,
780 vmoid: fblock::VMOID_INVALID,
781 length: 1,
782 vmo_offset: 0,
783 dev_offset: 0,
784 trace_flow_id: 0,
785 ..Default::default()
786 };
787 fifo.write(&[req]).unwrap();
788
789 let r = rx.recv().unwrap();
790 assert_eq!(r.request_id.0, 0);
791 assert!(matches!(r.operation, Operation::Trim { .. }));
792
793 session_manager.complete_request(r.request_id, zx::Status::OK);
794
795 let signals =
796 fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
797 assert!(signals.contains(zx::Signals::FIFO_READABLE));
798
799 let mut resp = [BlockFifoResponse::default()];
800 fifo.read(&mut resp).unwrap();
801 assert_eq!(resp[0].reqid, 126);
802 assert_eq!(resp[0].status, zx::sys::ZX_OK);
803 }
804
805 #[fasync::run_singlethreaded(test)]
806 async fn test_close_vmo() {
807 let (tx, rx) = std::sync::mpsc::channel();
808 let interface = Arc::new(MockInterface { request_sender: tx });
809 let session_manager = Arc::new(SessionManager::new(interface.clone()));
810
811 let sm_clone = session_manager.clone();
812 let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
813 let _server_task = fasync::Task::spawn(async move {
814 let server = crate::BlockServer::new(512, sm_clone);
815 server.handle_requests(stream).await.unwrap();
816 })
817 .detach();
818
819 let (session_proxy, session_server_end) =
820 fidl::endpoints::create_proxy::<fblock::SessionMarker>();
821 proxy.open_session(session_server_end).unwrap();
822
823 let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
824 let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
825
826 let vmo = zx::Vmo::create(8192).unwrap();
827 let vmo_id = session_proxy
828 .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
829 .await
830 .unwrap()
831 .unwrap();
832
833 let req = BlockFifoRequest {
834 command: BlockFifoCommand {
835 opcode: fblock::BlockOpcode::CloseVmo.into_primitive(),
836 ..Default::default()
837 },
838 reqid: 127,
839 group: 0,
840 vmoid: vmo_id.id,
841 length: 0,
842 vmo_offset: 0,
843 dev_offset: 0,
844 trace_flow_id: 0,
845 ..Default::default()
846 };
847 fifo.write(&[req]).unwrap();
848
849 let signals =
850 fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
851 assert!(signals.contains(zx::Signals::FIFO_READABLE));
852
853 let mut resp = [BlockFifoResponse::default()];
854 fifo.read(&mut resp).unwrap();
855 assert_eq!(resp[0].reqid, 127);
856 assert_eq!(resp[0].status, zx::sys::ZX_OK);
857
858 assert!(rx.try_recv().is_err());
860 }
861
862 #[fasync::run_singlethreaded(test)]
863 async fn test_error() {
864 let (tx, rx) = std::sync::mpsc::channel();
865 let interface = Arc::new(MockInterface { request_sender: tx });
866 let session_manager = Arc::new(SessionManager::new(interface.clone()));
867
868 let sm_clone = session_manager.clone();
869 let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
870 let _server_task = fasync::Task::spawn(async move {
871 let server = crate::BlockServer::new(512, sm_clone);
872 server.handle_requests(stream).await.unwrap();
873 })
874 .detach();
875
876 let (session_proxy, session_server_end) =
877 fidl::endpoints::create_proxy::<fblock::SessionMarker>();
878 proxy.open_session(session_server_end).unwrap();
879
880 let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
881 let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
882
883 let req = BlockFifoRequest {
884 command: BlockFifoCommand {
885 opcode: fblock::BlockOpcode::Flush.into_primitive(),
886 ..Default::default()
887 },
888 reqid: 128,
889 group: 0,
890 vmoid: fblock::VMOID_INVALID,
891 length: 0,
892 vmo_offset: 0,
893 dev_offset: 0,
894 trace_flow_id: 0,
895 ..Default::default()
896 };
897 fifo.write(&[req]).unwrap();
898
899 let r = rx.recv().unwrap();
900 session_manager.complete_request(r.request_id, zx::Status::IO);
901
902 let signals =
903 fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
904 assert!(signals.contains(zx::Signals::FIFO_READABLE));
905
906 let mut resp = [BlockFifoResponse::default()];
907 fifo.read(&mut resp).unwrap();
908 assert_eq!(resp[0].reqid, 128);
909 assert_eq!(resp[0].status, zx::sys::ZX_ERR_IO);
910 }
911
    /// Closes the session channel while a request is still outstanding, completes the request
    /// afterwards, and checks that `terminate` and the server task both finish.
    #[fasync::run_singlethreaded(test)]
    async fn test_teardown_with_active_requests() {
        let (tx, rx) = std::sync::mpsc::channel();
        let interface = Arc::new(MockInterface { request_sender: tx });
        let session_manager = Arc::new(SessionManager::new(interface.clone()));

        let sm_clone = session_manager.clone();
        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
        let server_task = fasync::Task::spawn(async move {
            let server = crate::BlockServer::new(512, sm_clone);
            server.handle_requests(stream).await.unwrap();
        });

        let (session_proxy, session_server_end) =
            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
        proxy.open_session(session_server_end).unwrap();

        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);

        let vmo = zx::Vmo::create(8192).unwrap();
        let vmo_id = session_proxy
            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
            .await
            .unwrap()
            .unwrap();

        let req = BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: fblock::BlockOpcode::Read.into_primitive(),
                ..Default::default()
            },
            reqid: 129,
            group: 0,
            vmoid: vmo_id.id,
            length: 1,
            vmo_offset: 0,
            dev_offset: 0,
            trace_flow_id: 0,
            ..Default::default()
        };
        fifo.write(&[req]).unwrap();
        // Wait until the interface has the request, so that it is outstanding during teardown.
        let r = rx.recv().unwrap();

        // Close the session channel, then yield for a while so the server side can react.
        drop(session_proxy);
        fasync::Timer::new(std::time::Duration::from_millis(50)).await;

        // Only now complete the request that was outstanding when the session went away.
        session_manager.complete_request(r.request_id, zx::Status::OK);

        drop(proxy);
        // `terminate` blocks, so it is run off the executor thread.
        fasync::unblock(move || session_manager.terminate()).await;
        server_task.await;
    }
968
    /// Same teardown scenario as `test_teardown_with_active_requests`, but with requests that
    /// carry the `GROUP_ITEM` flag, one of which is still outstanding when the session channel
    /// is closed.
    #[fasync::run_singlethreaded(test)]
    async fn test_teardown_with_active_grouped_requests() {
        let (tx, rx) = std::sync::mpsc::channel();
        let interface = Arc::new(MockInterface { request_sender: tx });
        let session_manager = Arc::new(SessionManager::new(interface.clone()));

        let sm_clone = session_manager.clone();
        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
        let server_task = fasync::Task::spawn(async move {
            let server = crate::BlockServer::new(512, sm_clone);
            server.handle_requests(stream).await.unwrap();
        });

        let (session_proxy, session_server_end) =
            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
        proxy.open_session(session_server_end).unwrap();

        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);

        let vmo = zx::Vmo::create(8192).unwrap();
        let vmo_id = session_proxy
            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
            .await
            .unwrap()
            .unwrap();

        let mut req = BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: fblock::BlockOpcode::Read.into_primitive(),
                flags: fblock::BlockIoFlag::GROUP_ITEM.bits(),
                ..Default::default()
            },
            reqid: 1,
            group: 1,
            vmoid: vmo_id.id,
            length: 1,
            vmo_offset: 0,
            dev_offset: 0,
            trace_flow_id: 0,
            ..Default::default()
        };

        // One request in group 1 ...
        fifo.write(&[req]).unwrap();
        // ... and two requests in group 2.
        req.reqid = 2;
        req.group = 2;
        fifo.write(&[req]).unwrap();
        let r1 = rx.recv().unwrap();
        let r2 = rx.recv().unwrap();
        req.reqid = 3;
        req.group = 2;
        fifo.write(&[req]).unwrap();
        let r3 = rx.recv().unwrap();

        // Complete the first two requests while the session is still open.
        session_manager.complete_request(r1.request_id, zx::Status::OK);
        session_manager.complete_request(r2.request_id, zx::Status::OK);

        // Close the session channel, then yield for a while so the server side can react.
        drop(session_proxy);
        fasync::Timer::new(std::time::Duration::from_millis(50)).await;

        // Complete the last request only after the session channel has gone away.
        session_manager.complete_request(r3.request_id, zx::Status::OK);

        drop(proxy);

        // `terminate` blocks, so it is run off the executor thread.
        fasync::unblock(move || session_manager.terminate()).await;
        server_task.await;
    }
1044
    /// Checks that `Session.Close` does not return while a request is still outstanding, and
    /// that it does return once that request has been completed.
    #[fasync::run_singlethreaded(test)]
    async fn test_session_close_is_synchronous() {
        use futures::FutureExt as _;

        let (tx, rx) = std::sync::mpsc::channel();
        let interface = Arc::new(MockInterface { request_sender: tx });
        let session_manager = Arc::new(SessionManager::new(interface.clone()));

        let sm_clone = session_manager.clone();
        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
        let server_task = fasync::Task::spawn(async move {
            let server = crate::BlockServer::new(512, sm_clone);
            server.handle_requests(stream).await.unwrap();
        });

        let (session_proxy, session_server_end) =
            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
        proxy.open_session(session_server_end).unwrap();

        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);

        let vmo = zx::Vmo::create(8192).unwrap();
        let vmo_id = session_proxy
            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
            .await
            .unwrap()
            .unwrap();

        let req = BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: fblock::BlockOpcode::Read.into_primitive(),
                ..Default::default()
            },
            reqid: 123,
            group: 0,
            vmoid: vmo_id.id,
            length: 1,
            vmo_offset: 0,
            dev_offset: 0,
            trace_flow_id: 0,
            ..Default::default()
        };
        fifo.write(&[req]).unwrap();

        // Wait until the interface has the request, so that it is outstanding during close.
        let r = rx.recv().unwrap();

        let mut close_fut = std::pin::pin!(session_proxy.close().fuse());
        // The close call must still be pending when the 100ms timer fires.
        let mut timer_fut =
            std::pin::pin!(fasync::Timer::new(std::time::Duration::from_millis(100)).fuse());
        futures::select! {
            res = close_fut => panic!("close completed too early: {:?}", res),
            _ = timer_fut => {}
        }

        session_manager.complete_request(r.request_id, zx::Status::OK);

        // With the request completed, the close call should now finish successfully.
        close_fut.await.unwrap().unwrap();

        std::mem::drop(proxy);
        server_task.await;
    }
1109}