1use crate::{
6 ActiveRequests, DecodedRequest, DeviceInfo, IntoOrchestrator, OffsetMap, Operation, RequestId,
7 SessionHelper, TraceFlowId, WriteFlags,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse};
11use fidl_fuchsia_storage_block as fblock;
12use fuchsia_sync::{Condvar, Mutex};
13use futures::TryStreamExt as _;
14use futures::stream::{AbortHandle, Abortable};
15use std::borrow::{Borrow, Cow};
16use std::collections::{HashMap, VecDeque};
17use std::mem::MaybeUninit;
18use std::sync::{Arc, Weak};
19
/// A single decoded request, as handed to [`Interface::on_requests`].
#[derive(Clone, Debug)]
pub struct Request {
    /// Identifier to hand back to [`SessionManager::complete_request`] when the operation is done.
    pub request_id: RequestId,
    /// The operation to perform.
    pub operation: Operation,
    /// Trace flow identifier carried along with the request.
    pub trace_flow_id: TraceFlowId,
    /// VMO associated with the operation, if any.  The flushes synthesized in this file use
    /// `None`.
    pub vmo: Option<Arc<zx::Vmo>>,
}
33
/// Callbacks that a block device implementation supplies to [`SessionManager`].
pub trait Interface: Send + Sync + Unpin + 'static {
    /// Orchestrator type from which the [`SessionManager`] can be borrowed.
    type Orchestrator: Borrow<SessionManager<Self>> + Send + Sync;

    /// Returns information about the device.
    fn get_info(&self) -> Cow<'_, DeviceInfo>;

    /// Called once for every newly opened session.  `SessionManager::open_session` only serves
    /// the FIDL stream, so the implementation is expected to arrange for [`Session::run`] to be
    /// called; `run` blocks, and the mock in this file's tests calls it on a dedicated thread.
    fn spawn_session(&self, session: Arc<Session<Self>>);

    /// Called with a batch of decoded requests.  Every entry is counted as in flight until
    /// [`SessionManager::complete_request`] is called for it.
    fn on_requests(&self, requests: &[Request]);
}
49
/// State of a [`SessionManager`] that is guarded by its `inner` mutex.
struct SessionManagerInner<I: Interface + ?Sized> {
    /// Open sessions, keyed by the address of the `Session` (inserted by `open_session`, removed
    /// by `Session::drop`).  Only weak references are stored, so the map itself does not keep a
    /// session alive.
    open_sessions: HashMap<usize, Weak<Session<I>>>,
}
53
/// User signal raised on the session's FIFO handle by `Session::send_response` when a response
/// could not be written directly.  `Session::fifo_loop` waits on it and clears it once observed.
const FIFO_WAKE_SIGNAL: zx::Signals = zx::Signals::USER_0;

/// User signal raised on the session's FIFO handle to make `Session::fifo_loop` return.
const SHUTDOWN_SIGNAL: zx::Signals = zx::Signals::USER_1;
61
/// Session manager that decodes FIFO requests and forwards them in batches to an [`Interface`]
/// implementation, which reports completion via [`SessionManager::complete_request`].
pub struct SessionManager<I: Interface + ?Sized> {
    /// The implementation that receives requests.
    interface: Arc<I>,
    /// Book-keeping for decoded requests, looked up by `RequestId`.
    active_requests: ActiveRequests<Arc<Session<I>>>,
    /// Number of requests passed to `Interface::on_requests` for which `complete_request` has not
    /// been called yet.
    inflight_requests: Mutex<usize>,
    /// Notified when `inflight_requests` reaches zero.
    no_inflight_requests_condvar: Condvar,
    /// Tracks the sessions that are currently open.
    inner: Mutex<SessionManagerInner<I>>,
    /// Notified when the last open session has been removed from `inner`.
    no_open_sessions_condvar: Condvar,
}
71
72impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
73 const SUPPORTS_DECOMPRESSION: bool = false;
74
75 type Orchestrator = I::Orchestrator;
76 type Session = Arc<Session<I>>;
77
78 async fn on_attach_vmo(
79 _orchestrator: Arc<Self::Orchestrator>,
80 _vmo: &Arc<zx::Vmo>,
81 ) -> Result<(), zx::Status> {
82 Ok(())
83 }
84
85 async fn open_session(
86 orchestrator: Arc<Self::Orchestrator>,
87 mut stream: fblock::SessionRequestStream,
88 offset_map: OffsetMap,
89 block_size: u32,
90 ) -> Result<(), Error> {
91 let (helper, fifo) = SessionHelper::new(orchestrator.clone(), offset_map, block_size)?;
92 let (abort_handle, registration) = AbortHandle::new_pair();
93 let session = Arc::new(Session { helper, fifo, queue: Mutex::default(), abort_handle });
94 let sm = orchestrator.as_ref().borrow();
95 sm.inner
96 .lock()
97 .open_sessions
98 .insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
99
100 sm.interface.spawn_session(session.clone());
101
102 let result = Abortable::new(
103 async {
104 while let Some(request) = stream.try_next().await? {
105 session.helper.handle_request(request).await?;
106 }
107 Ok(())
108 },
109 registration,
110 )
111 .await
112 .unwrap_or_else(|e| Err(e.into()));
113
114 let _ = session.fifo.signal(zx::Signals::empty(), SHUTDOWN_SIGNAL);
115
116 result
117 }
118
119 fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
120 self.interface.get_info()
121 }
122
123 fn active_requests(&self) -> &ActiveRequests<Arc<Session<I>>> {
124 &self.active_requests
125 }
126}
127
128impl<I: Interface + ?Sized> SessionManager<I> {
129 pub fn new(interface: Arc<I>) -> Self {
130 Self {
131 interface,
132 active_requests: ActiveRequests::default(),
133 inflight_requests: Mutex::new(0),
134 no_inflight_requests_condvar: Condvar::new(),
135 inner: Mutex::new(SessionManagerInner { open_sessions: HashMap::new() }),
136 no_open_sessions_condvar: Condvar::new(),
137 }
138 }
139
140 pub fn complete_request(&self, request_id: RequestId, status: zx::Status) {
142 let notify = {
143 let mut inflight_requests = self.inflight_requests.lock();
144 *inflight_requests -= 1;
145 *inflight_requests == 0
146 };
147 self.complete_unsubmitted_request(request_id, status);
148 if notify {
149 self.no_inflight_requests_condvar.notify_all();
150 }
151 }
152
153 fn submit_requests(&self, requests: &[Request]) {
154 *self.inflight_requests.lock() += requests.len();
155 self.interface.on_requests(requests);
156 }
157
158 fn wait_for_no_inflight_requests(&self) {
163 let mut guard = self.inflight_requests.lock();
164 self.no_inflight_requests_condvar.wait_while(&mut guard, |count| *count > 0);
165 }
166
167 fn complete_unsubmitted_request(&self, request_id: RequestId, status: zx::Status) {
170 if let Some((session, response)) =
171 self.active_requests.complete_and_take_response(request_id, status)
172 {
173 session.send_response(response);
174 }
175 }
176
177 pub fn terminate(&self) {
179 {
180 #[allow(clippy::collection_is_never_read)]
183 let mut terminated_sessions = Vec::new();
184 for (_, session) in &self.inner.lock().open_sessions {
185 if let Some(session) = session.upgrade() {
186 session.terminate_async();
187 terminated_sessions.push(session);
188 }
189 }
190 }
191 let mut guard = self.inner.lock();
192 self.no_open_sessions_condvar.wait_while(&mut guard, |s| !s.open_sessions.is_empty());
193 }
194}
195
/// A single open block session.
pub struct Session<I: Interface + ?Sized> {
    /// Shared session logic (request decoding and mapping, FIDL request handling).
    helper: SessionHelper<SessionManager<I>>,
    /// Server end of the FIFO: requests are read from it and responses are written to it.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    /// Responses that could not be written to the FIFO immediately.
    queue: Mutex<SessionQueue>,
    /// Aborts the future in `open_session` that serves the session's FIDL stream.
    abort_handle: AbortHandle,
}
202
/// Responses waiting to be written to a session's FIFO.
#[derive(Default)]
struct SessionQueue {
    /// Pending responses, oldest first (pushed at the back, drained from the front).
    responses: VecDeque<BlockFifoResponse>,
}
207
/// Maximum number of requests read from the FIFO, and buffered in `DecodedRequests`, at a time.
pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
209
/// Fixed-capacity, inline buffer of decoded requests awaiting submission.
struct DecodedRequests {
    /// Storage; only the first `count` slots are initialized.
    requests: [MaybeUninit<Request>; MAX_REQUESTS],
    /// Number of initialized slots at the front of `requests`.
    count: usize,
}
214
215impl Default for DecodedRequests {
216 fn default() -> Self {
217 Self { requests: unsafe { MaybeUninit::uninit().assume_init() }, count: 0 }
218 }
219}
220
221impl DecodedRequests {
222 fn push(&mut self, request: Request) {
223 assert!(self.count < MAX_REQUESTS);
224 self.requests[self.count].write(request);
227 self.count += 1;
228 }
229
230 fn is_empty(&self) -> bool {
231 self.count == 0
232 }
233
234 fn is_full(&self) -> bool {
235 self.count == MAX_REQUESTS
236 }
237
238 fn clear(&mut self) {
239 for i in 0..self.count {
240 unsafe { self.requests[i].assume_init_drop() };
242 }
243 self.count = 0;
244 }
245}
246
impl Drop for DecodedRequests {
    /// Drops any requests still held in the buffer; `MaybeUninit` slots are not dropped
    /// automatically.
    fn drop(&mut self) {
        self.clear();
    }
}
252
253impl std::ops::Deref for DecodedRequests {
254 type Target = [Request];
255
256 fn deref(&self) -> &Self::Target {
257 unsafe { std::slice::from_raw_parts(self.requests[0].as_ptr(), self.count) }
259 }
260}
261
262impl std::ops::DerefMut for DecodedRequests {
263 fn deref_mut(&mut self) -> &mut Self::Target {
264 unsafe { std::slice::from_raw_parts_mut(self.requests[0].as_mut_ptr(), self.count) }
266 }
267}
268
impl<I: Interface + ?Sized> Session<I> {
    /// Runs the session's FIFO loop on the calling thread until it finishes, then aborts the
    /// future serving the session's FIDL stream and drops the active requests that belong to
    /// this session.  This blocks, so it should be given its own thread.
    pub fn run(self: &Arc<Self>) {
        self.fifo_loop();
        self.abort_handle.abort();
        self.helper.drop_active_requests(|s| Arc::ptr_eq(s, self));
    }

    /// Alternates between flushing queued responses to the FIFO and reading requests from it,
    /// blocking on the FIFO's signals when there is nothing to read.  Returns on shutdown or on
    /// any FIFO error other than `SHOULD_WAIT`.
    fn fifo_loop(self: &Arc<Self>) {
        let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];

        loop {
            // Write out as many queued responses as the FIFO will take.  The queue lock is
            // released at the end of this block, before any request is read or handled.
            let is_queue_empty = {
                let mut queue = self.queue.lock();
                while !queue.responses.is_empty() {
                    // Only the first contiguous part of the deque is written per iteration; the
                    // loop comes back for the rest.
                    let (front, _) = queue.responses.as_slices();
                    match self.fifo.write(front) {
                        Ok(count) => {
                            // A short write means the FIFO has no more room.
                            let full = count < front.len();
                            queue.responses.drain(..count);
                            if full {
                                break;
                            }
                        }
                        Err(zx::Status::SHOULD_WAIT) => break,
                        Err(_) => return,
                    }
                }
                queue.responses.is_empty()
            };

            match self.fifo.read_uninit(&mut requests) {
                Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
                Err(zx::Status::SHOULD_WAIT) => {
                    let mut signals =
                        zx::Signals::OBJECT_READABLE | SHUTDOWN_SIGNAL | FIFO_WAKE_SIGNAL;
                    // Only wait for writability when there are responses left to write.
                    if !is_queue_empty {
                        signals |= zx::Signals::OBJECT_WRITABLE;
                    }
                    let Ok(signals) =
                        self.fifo.wait_one(signals, zx::MonotonicInstant::INFINITE).to_result()
                    else {
                        return;
                    };
                    if signals.contains(SHUTDOWN_SIGNAL) {
                        return;
                    }
                    if signals.contains(FIFO_WAKE_SIGNAL) {
                        // Clear the wake signal; the next iteration re-checks the queue under
                        // its lock, so a response queued in the meantime is still picked up.
                        let _ = self.fifo.signal(FIFO_WAKE_SIGNAL, zx::Signals::empty());
                    }
                }
                Err(_) => return,
            }
        }
    }

    /// Issues a flush ahead of a request (used by `handle_requests` for `PRE_BARRIER` writes on
    /// devices without `BARRIER_SUPPORT`) and waits for it to finish.
    ///
    /// Returns `Err` with the request's status if it is not `OK` afterwards; in that case the
    /// request has already been completed with that status and must not be processed further.
    fn pre_flush(self: &Arc<Self>, request_id: RequestId) -> Result<(), zx::Status> {
        // The guard returned by `request()` is confined to this block so that it is released
        // before anything is submitted.
        let trace_flow_id = {
            let mut request = self.helper.session_manager().active_requests.request(request_id);
            if let Some(id) = request.trace_flow_id {
                fuchsia_trace::async_instant!(
                    fuchsia_trace::Id::from(id.get()),
                    c"storage",
                    c"block_server::SimulatedBarrier",
                    "request_id" => request_id.0
                );
            }
            // NOTE(review): presumably accounts for the extra completion the flush below will
            // produce under the same request id -- confirm against `ActiveRequests`.
            request.count += 1;
            request.trace_flow_id
        };
        self.helper.session_manager().submit_requests(&[Request {
            request_id,
            operation: Operation::Flush,
            trace_flow_id,
            vmo: None,
        }]);
        // Waits for every in-flight request (the counter is manager-wide), including the flush.
        self.helper.session_manager().wait_for_no_inflight_requests();
        let status = self.helper.session_manager().active_requests.request(request_id).status;
        match status {
            zx::Status::OK => Ok(()),
            status => {
                self.helper.session_manager().complete_unsubmitted_request(request_id, status);
                Err(status)
            }
        }
    }

    /// Second half of simulating `FORCE_ACCESS`: submits whatever is pending in
    /// `decoded_requests`, waits for all in-flight requests to finish, and then either queues a
    /// flush under the same request id (status `OK`) or completes the request with the failing
    /// status.  A queued flush is left in `decoded_requests` for the caller to submit.
    fn post_flush(self: &Arc<Self>, request_id: RequestId, decoded_requests: &mut DecodedRequests) {
        if !decoded_requests.is_empty() {
            self.helper.session_manager().submit_requests(decoded_requests);
            decoded_requests.clear();
        }
        self.helper.session_manager().wait_for_no_inflight_requests();
        let request = self.helper.session_manager().active_requests.request(request_id);
        match request.status {
            // `decoded_requests` is empty at this point, so this push cannot overflow.
            zx::Status::OK => decoded_requests.push(Request {
                request_id,
                operation: Operation::Flush,
                trace_flow_id: request.trace_flow_id,
                vmo: None,
            }),
            status => {
                // Release the guard before completing the request.
                drop(request);
                self.helper.session_manager().complete_unsubmitted_request(request_id, status)
            }
        }
    }

    /// Decodes a batch of FIFO requests and submits the result to the interface in batches of at
    /// most `MAX_REQUESTS`, simulating barriers and FUA where the device does not support them.
    fn handle_requests<'a>(
        self: &Arc<Self>,
        requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
    ) {
        let manager = &self.helper.session_manager();
        let mut decoded_requests = DecodedRequests::default();

        for request in requests {
            match self.helper.decode_fifo_request(self.clone(), request) {
                // Closing a VMO is not forwarded to the interface; it is completed right here.
                Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
                    manager.complete_unsubmitted_request(request_id, zx::Status::OK);
                }
                Ok(mut request) => {
                    let request_id = request.request_id;

                    // Without barrier support, strip PRE_BARRIER from the request and flush
                    // first.  If that flush fails the request has already been completed, so
                    // move on to the next one.
                    if !manager
                        .interface
                        .get_info()
                        .device_flags()
                        .contains(fblock::DeviceFlag::BARRIER_SUPPORT)
                        && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
                        && self.pre_flush(request_id).is_err()
                    {
                        continue;
                    }
                    // Without FUA support, strip FORCE_ACCESS from the request and follow it
                    // with a flush instead (see `post_flush`).
                    let simulate_fua = !manager
                        .interface
                        .get_info()
                        .device_flags()
                        .contains(fblock::DeviceFlag::FUA_SUPPORT)
                        && request.operation.take_write_flag(WriteFlags::FORCE_ACCESS);

                    if simulate_fua {
                        // NOTE(review): presumably accounts for the completion that
                        // `post_flush` adds under this request id -- confirm against
                        // `ActiveRequests`.
                        manager.active_requests.request(request_id).count += 1;
                    }

                    // `map_request` can hand back a remainder, in which case one FIFO request
                    // turns into several requests to the interface under the same id.
                    loop {
                        let result = self
                            .helper
                            .map_request(request, &mut manager.active_requests.request(request_id));
                        match result {
                            Ok((
                                DecodedRequest { request_id, operation, vmo, trace_flow_id },
                                remainder,
                            )) => {
                                decoded_requests.push(Request {
                                    request_id,
                                    operation,
                                    trace_flow_id,
                                    vmo,
                                });

                                // Submit a full batch right away to make room for more.
                                if decoded_requests.is_full() {
                                    manager.submit_requests(&*decoded_requests);
                                    decoded_requests.clear();
                                }

                                if let Some(r) = remainder {
                                    request = r;
                                } else {
                                    break;
                                }
                            }
                            Err(status) => {
                                manager.complete_unsubmitted_request(request_id, status);
                                break;
                            }
                        }
                    }

                    if simulate_fua {
                        self.post_flush(request_id, &mut decoded_requests);
                    }
                }
                // Decoding produced neither a request nor a response to send.
                Err(None) => {}
                Err(Some(response)) => self.send_response(response),
            }
        }

        // Submit whatever is left over from the final, partial batch.
        if !decoded_requests.is_empty() {
            manager.submit_requests(&decoded_requests);
        }
    }

    /// Sends `response` to the client.  It is written straight to the FIFO when nothing is queued
    /// ahead of it; otherwise (or if that write fails) it is queued for `fifo_loop` to write.
    fn send_response(&self, response: BlockFifoResponse) {
        let mut queue = self.queue.lock();
        if queue.responses.is_empty() {
            match self.fifo.write_one(&response) {
                Ok(()) => {
                    return;
                }
                Err(_) => {
                    // Wake `fifo_loop` so that it notices the response queued below.  The queue
                    // lock is still held, so the loop cannot inspect the queue before the push.
                    let _ = self.fifo.signal(zx::Signals::empty(), FIFO_WAKE_SIGNAL);
                }
            }
        }
        queue.responses.push_back(response);
    }

    /// Requests termination without waiting for it: raises the shutdown signal for `fifo_loop`
    /// and aborts the future serving the session's FIDL stream.
    pub fn terminate_async(&self) {
        let _ = self.fifo.signal(zx::Signals::empty(), SHUTDOWN_SIGNAL);
        self.abort_handle.abort();
    }
}
497
498impl<I: Interface + ?Sized> Drop for Session<I> {
499 fn drop(&mut self) {
500 let mut inner = self.helper.session_manager().inner.lock();
501 let notify = {
502 inner.open_sessions.remove(&(self as *const _ as usize));
503 inner.open_sessions.is_empty()
504 };
505 if notify {
506 self.helper.session_manager().no_open_sessions_condvar.notify_all();
507 }
508 }
509}
510
impl<I: Interface + ?Sized> Drop for SessionManager<I> {
    /// Terminates all open sessions and blocks until they have been dropped.
    fn drop(&mut self) {
        self.terminate();
    }
}
516
/// Lets an `Arc<SessionManager<I>>` be used directly as the orchestrator when the interface
/// names `SessionManager<I>` as its `Orchestrator`.
impl<I: Interface<Orchestrator = SessionManager<I>>> IntoOrchestrator for Arc<SessionManager<I>> {
    type SM = SessionManager<I>;

    /// Returns `self` unchanged: the manager is its own orchestrator.
    fn into_orchestrator(self) -> Arc<I::Orchestrator> {
        self
    }
}
524
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BlockInfo;
    use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse};
    use fidl::endpoints::create_proxy_and_stream;
    use zx::HandleBased;
    use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};

    /// Client end of a session FIFO: requests are written to it, responses are read from it.
    type ClientFifo = zx::Fifo<BlockFifoResponse, BlockFifoRequest>;

    /// Interface that forwards every request it receives to a channel for the test to inspect.
    struct MockInterface {
        request_sender: std::sync::mpsc::Sender<Request>,
    }

    impl Interface for MockInterface {
        type Orchestrator = SessionManager<Self>;

        fn get_info(&self) -> Cow<'_, DeviceInfo> {
            Cow::Owned(DeviceInfo::Block(BlockInfo { block_count: 1024, ..Default::default() }))
        }

        fn spawn_session(&self, session: Arc<Session<Self>>) {
            // `Session::run` blocks, so give it a thread of its own.
            std::thread::spawn(move || {
                session.run();
            });
        }

        fn on_requests(&self, requests: &[Request]) {
            for request in requests {
                self.request_sender.send(request.clone()).unwrap();
            }
        }
    }

    /// A block server backed by `MockInterface`, with one open session.
    ///
    /// Fields are dropped in declaration order, which matches the order in which the
    /// corresponding locals went out of scope when each test built this state inline.
    struct Fixture {
        fifo: ClientFifo,
        session_proxy: fblock::SessionProxy,
        _block_proxy: fblock::BlockProxy,
        session_manager: Arc<SessionManager<MockInterface>>,
        requests: std::sync::mpsc::Receiver<Request>,
    }

    impl Fixture {
        /// Starts the server on the current executor and opens a session.
        async fn new() -> Self {
            let (tx, rx) = std::sync::mpsc::channel();
            let interface = Arc::new(MockInterface { request_sender: tx });
            let session_manager = Arc::new(SessionManager::new(interface));

            let sm_clone = session_manager.clone();
            let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
            fasync::Task::spawn(async move {
                let server = crate::BlockServer::new(512, sm_clone);
                server.handle_requests(stream).await.unwrap();
            })
            .detach();

            let (session_proxy, session_server_end) =
                fidl::endpoints::create_proxy::<fblock::SessionMarker>();
            proxy.open_session(session_server_end).unwrap();

            let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
            let fifo: ClientFifo = zx::Fifo::from(fifo_handle);

            Self { fifo, session_proxy, _block_proxy: proxy, session_manager, requests: rx }
        }

        /// Creates a VMO and attaches it to the session.  The VMO is returned as well so that
        /// the caller can keep it alive for the duration of the test.
        async fn attach_vmo(&self) -> (zx::Vmo, fblock::VmoId) {
            let vmo = zx::Vmo::create(8192).unwrap();
            let vmo_id = self
                .session_proxy
                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                .await
                .unwrap()
                .unwrap();
            (vmo, vmo_id)
        }

        /// Writes a single request to the session FIFO.
        fn send(&self, request: BlockFifoRequest) {
            self.fifo.write(&[request]).unwrap();
        }

        /// Blocks until the mock interface has received a request and returns it.
        fn next_request(&self) -> Request {
            self.requests.recv().unwrap()
        }

        /// Blocks until a response is available on the session FIFO and returns it.
        fn read_response(&self) -> BlockFifoResponse {
            let signals = self
                .fifo
                .wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE)
                .unwrap();
            assert!(signals.contains(zx::Signals::FIFO_READABLE));

            let mut resp = [BlockFifoResponse::default()];
            self.fifo.read(&mut resp).unwrap();
            let [response] = resp;
            response
        }
    }

    /// Returns a request with the given opcode and every other field left at its default.
    fn request_with_opcode(opcode: fblock::BlockOpcode) -> BlockFifoRequest {
        BlockFifoRequest {
            command: BlockFifoCommand { opcode: opcode.into_primitive(), ..Default::default() },
            ..Default::default()
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_basic_request() {
        let fixture = Fixture::new().await;
        let (_vmo, vmo_id) = fixture.attach_vmo().await;

        fixture.send(BlockFifoRequest {
            reqid: 123,
            vmoid: vmo_id.id,
            length: 1,
            ..request_with_opcode(fblock::BlockOpcode::Read)
        });

        let r = fixture.next_request();
        assert_eq!(r.request_id.0, 0);
        assert!(matches!(r.operation, Operation::Read { .. }));

        fixture.session_manager.complete_request(r.request_id, zx::Status::OK);

        let response = fixture.read_response();
        assert_eq!(response.reqid, 123);
        assert_eq!(response.status, zx::sys::ZX_OK);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_write_request() {
        let fixture = Fixture::new().await;
        let (_vmo, vmo_id) = fixture.attach_vmo().await;

        fixture.send(BlockFifoRequest {
            reqid: 124,
            vmoid: vmo_id.id,
            length: 1,
            ..request_with_opcode(fblock::BlockOpcode::Write)
        });

        let r = fixture.next_request();
        assert_eq!(r.request_id.0, 0);
        assert!(matches!(r.operation, Operation::Write { .. }));

        fixture.session_manager.complete_request(r.request_id, zx::Status::OK);

        let response = fixture.read_response();
        assert_eq!(response.reqid, 124);
        assert_eq!(response.status, zx::sys::ZX_OK);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_flush_request() {
        let fixture = Fixture::new().await;

        fixture.send(BlockFifoRequest {
            reqid: 125,
            vmoid: fblock::VMOID_INVALID,
            length: 0,
            ..request_with_opcode(fblock::BlockOpcode::Flush)
        });

        let r = fixture.next_request();
        assert_eq!(r.request_id.0, 0);
        assert!(matches!(r.operation, Operation::Flush { .. }));

        fixture.session_manager.complete_request(r.request_id, zx::Status::OK);

        let response = fixture.read_response();
        assert_eq!(response.reqid, 125);
        assert_eq!(response.status, zx::sys::ZX_OK);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_trim_request() {
        let fixture = Fixture::new().await;

        fixture.send(BlockFifoRequest {
            reqid: 126,
            vmoid: fblock::VMOID_INVALID,
            length: 1,
            ..request_with_opcode(fblock::BlockOpcode::Trim)
        });

        let r = fixture.next_request();
        assert_eq!(r.request_id.0, 0);
        assert!(matches!(r.operation, Operation::Trim { .. }));

        fixture.session_manager.complete_request(r.request_id, zx::Status::OK);

        let response = fixture.read_response();
        assert_eq!(response.reqid, 126);
        assert_eq!(response.status, zx::sys::ZX_OK);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_close_vmo() {
        let fixture = Fixture::new().await;
        let (_vmo, vmo_id) = fixture.attach_vmo().await;

        fixture.send(BlockFifoRequest {
            reqid: 127,
            vmoid: vmo_id.id,
            length: 0,
            ..request_with_opcode(fblock::BlockOpcode::CloseVmo)
        });

        let response = fixture.read_response();
        assert_eq!(response.reqid, 127);
        assert_eq!(response.status, zx::sys::ZX_OK);

        // Closing a VMO is handled by the session itself and never reaches the interface.
        assert!(fixture.requests.try_recv().is_err());
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_error() {
        let fixture = Fixture::new().await;

        fixture.send(BlockFifoRequest {
            reqid: 128,
            vmoid: fblock::VMOID_INVALID,
            length: 0,
            ..request_with_opcode(fblock::BlockOpcode::Flush)
        });

        let r = fixture.next_request();
        fixture.session_manager.complete_request(r.request_id, zx::Status::IO);

        let response = fixture.read_response();
        assert_eq!(response.reqid, 128);
        assert_eq!(response.status, zx::sys::ZX_ERR_IO);
    }
}