Skip to main content

block_server/
callback_interface.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::{
6    ActiveRequests, DecodedRequest, DeviceInfo, IntoOrchestrator, OffsetMap, Operation, RequestId,
7    SessionHelper, TraceFlowId, WriteFlags,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse};
11use fidl_fuchsia_storage_block as fblock;
12use fuchsia_sync::{Condvar, Mutex};
13use futures::TryStreamExt as _;
14use futures::stream::{AbortHandle, Abortable};
15use std::borrow::{Borrow, Cow};
16use std::collections::{HashMap, VecDeque};
17use std::mem::MaybeUninit;
18use std::sync::{Arc, Weak};
19
/// An in-flight request.
///
/// Batches of these are handed to [`Interface::on_requests`].
#[derive(Clone, Debug)]
pub struct Request {
    /// The ID that a request is associated with, for later completion in
    /// [`SessionManager::complete_request`].  Note that this is not necessarily a unique
    /// identifier, and multiple Requests may have the same request_id (e.g. due to request
    /// splitting).  The library internally reference-counts requests which use this ID.
    pub request_id: RequestId,
    /// The operation to perform.
    pub operation: Operation,
    /// The trace flow ID associated with this request.
    pub trace_flow_id: TraceFlowId,
    /// `vmo` is always Some for Operation::Read or Operation::Write, and None otherwise.
    pub vmo: Option<Arc<zx::Vmo>>,
}
33
/// The callbacks a block device implementation provides to a [`SessionManager`].
pub trait Interface: Send + Sync + Unpin + 'static {
    /// The orchestrator type used with this interface; it must be borrowable as the
    /// [`SessionManager`] that drives this interface.
    type Orchestrator: Borrow<SessionManager<Self>> + Send + Sync;

    /// Called to get block/partition information.
    fn get_info(&self) -> Cow<'_, DeviceInfo>;

    /// Start running a new session.  The interface must ensure that [`Session::run`] is called
    /// on a different thread.
    fn spawn_session(&self, session: Arc<Session<Self>>);

    /// Starts a batch of requests.  The implementation may block if there are too many in-flight
    /// requests, providing pushback.  The interface is responsible for eventually calling
    /// [`SessionManager::complete_request`] for each request.
    fn on_requests(&self, requests: &[Request]);
}
49
/// The state protected by `SessionManager::inner`.
struct SessionManagerInner<I: Interface + ?Sized> {
    /// Every session that has been opened and not yet dropped, keyed by the address of the
    /// session (`Arc::as_ptr(..) as usize`).  Entries are removed by `Session::drop`.
    open_sessions: HashMap<usize, Weak<Session<I>>>,
}
53
// The user signals raised on `Session::fifo` to control the FIFO loop.

/// Signalled on `Session::fifo` to wake up the FIFO loop.
const FIFO_WAKE_SIGNAL: zx::Signals = zx::Signals::USER_0;

/// Signalled on `Session::fifo` to terminate the FIFO loop.
const SHUTDOWN_SIGNAL: zx::Signals = zx::Signals::USER_1;
61
/// A session manager that forwards block requests to a callback-based [`Interface`].
pub struct SessionManager<I: Interface + ?Sized> {
    /// The interface that requests are forwarded to.
    interface: Arc<I>,
    // These represent active *client* requests, which correspond to one or more in-flight requests.
    active_requests: ActiveRequests<Arc<Session<I>>>,
    /// The number of requests passed to [`Interface::on_requests`] that have not yet been
    /// reported via [`SessionManager::complete_request`].
    inflight_requests: Mutex<usize>,
    /// Notified when `inflight_requests` drops to zero.
    no_inflight_requests_condvar: Condvar,
    /// Tracks the sessions that are currently open.
    inner: Mutex<SessionManagerInner<I>>,
    /// Notified (from `Session::drop`) when the last open session is removed.
    no_open_sessions_condvar: Condvar,
}
71
72impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
73    const SUPPORTS_DECOMPRESSION: bool = false;
74
75    type Orchestrator = I::Orchestrator;
76    type Session = Arc<Session<I>>;
77
78    async fn on_attach_vmo(
79        _orchestrator: Arc<Self::Orchestrator>,
80        _vmo: &Arc<zx::Vmo>,
81    ) -> Result<(), zx::Status> {
82        Ok(())
83    }
84
85    async fn open_session(
86        orchestrator: Arc<Self::Orchestrator>,
87        mut stream: fblock::SessionRequestStream,
88        offset_map: OffsetMap,
89        block_size: u32,
90    ) -> Result<(), Error> {
91        let (helper, fifo) = SessionHelper::new(orchestrator.clone(), offset_map, block_size)?;
92        let (abort_handle, registration) = AbortHandle::new_pair();
93        let session = Arc::new(Session { helper, fifo, queue: Mutex::default(), abort_handle });
94        let sm = orchestrator.as_ref().borrow();
95        sm.inner
96            .lock()
97            .open_sessions
98            .insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
99
100        sm.interface.spawn_session(session.clone());
101
102        let result = Abortable::new(
103            async {
104                while let Some(request) = stream.try_next().await? {
105                    session.helper.handle_request(request).await?;
106                }
107                Ok(())
108            },
109            registration,
110        )
111        .await
112        .unwrap_or_else(|e| Err(e.into()));
113
114        let _ = session.fifo.signal(zx::Signals::empty(), SHUTDOWN_SIGNAL);
115
116        result
117    }
118
119    fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
120        self.interface.get_info()
121    }
122
123    fn active_requests(&self) -> &ActiveRequests<Arc<Session<I>>> {
124        &self.active_requests
125    }
126}
127
128impl<I: Interface + ?Sized> SessionManager<I> {
129    pub fn new(interface: Arc<I>) -> Self {
130        Self {
131            interface,
132            active_requests: ActiveRequests::default(),
133            inflight_requests: Mutex::new(0),
134            no_inflight_requests_condvar: Condvar::new(),
135            inner: Mutex::new(SessionManagerInner { open_sessions: HashMap::new() }),
136            no_open_sessions_condvar: Condvar::new(),
137        }
138    }
139
140    /// Reports the given task as complete with a given status.
141    pub fn complete_request(&self, request_id: RequestId, status: zx::Status) {
142        let notify = {
143            let mut inflight_requests = self.inflight_requests.lock();
144            *inflight_requests -= 1;
145            *inflight_requests == 0
146        };
147        self.complete_unsubmitted_request(request_id, status);
148        if notify {
149            self.no_inflight_requests_condvar.notify_all();
150        }
151    }
152
153    fn submit_requests(&self, requests: &[Request]) {
154        *self.inflight_requests.lock() += requests.len();
155        self.interface.on_requests(requests);
156    }
157
158    /// Waits for there to be no requests in-flight.
159    ///
160    /// NOTE: To void TOCTOUs, this must be called on the same thread which calls
161    /// [`Self::submit_requests`].
162    fn wait_for_no_inflight_requests(&self) {
163        let mut guard = self.inflight_requests.lock();
164        self.no_inflight_requests_condvar.wait_while(&mut guard, |count| *count > 0);
165    }
166
167    /// Called instead of `[Self::complete_request]` when a request is completed before it was
168    /// actually submitted.
169    fn complete_unsubmitted_request(&self, request_id: RequestId, status: zx::Status) {
170        if let Some((session, response)) =
171            self.active_requests.complete_and_take_response(request_id, status)
172        {
173            session.send_response(response);
174        }
175    }
176
177    /// Terminates the session manager.  Blocks until all sessions have terminated.
178    pub fn terminate(&self) {
179        {
180            // We must drop references to sessions whilst we're not holding the lock for
181            // `open_sessions` because `Session::drop` needs to take that same lock.
182            #[allow(clippy::collection_is_never_read)]
183            let mut terminated_sessions = Vec::new();
184            for (_, session) in &self.inner.lock().open_sessions {
185                if let Some(session) = session.upgrade() {
186                    session.terminate_async();
187                    terminated_sessions.push(session);
188                }
189            }
190        }
191        let mut guard = self.inner.lock();
192        self.no_open_sessions_condvar.wait_while(&mut guard, |s| !s.open_sessions.is_empty());
193    }
194}
195
/// A single client session, served by [`Session::run`].
pub struct Session<I: Interface + ?Sized> {
    /// Shared session logic (request decoding and mapping, access to the session manager).
    helper: SessionHelper<SessionManager<I>>,
    /// The FIFO that requests are read from and responses are written to.  It is also the object
    /// on which `FIFO_WAKE_SIGNAL` and `SHUTDOWN_SIGNAL` are raised.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    /// Responses waiting to be written to the FIFO.
    queue: Mutex<SessionQueue>,
    /// Aborts the FIDL request loop that was started in `open_session`.
    abort_handle: AbortHandle,
}
202
/// Responses that could not be written to the FIFO straight away.
#[derive(Default)]
struct SessionQueue {
    /// Pending responses, oldest first; the FIFO loop writes them from the front.
    responses: VecDeque<BlockFifoResponse>,
}
207
/// The capacity of a request batch: both the FIFO read buffer and [`DecodedRequests`] hold at
/// most this many entries.
pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
209
/// A fixed-capacity, inline batch of decoded requests that dereferences to `[Request]`.
struct DecodedRequests {
    /// Storage for the batch; only the first `count` slots are initialized.
    requests: [MaybeUninit<Request>; MAX_REQUESTS],
    /// The number of initialized slots at the start of `requests`.
    count: usize,
}
214
215impl Default for DecodedRequests {
216    fn default() -> Self {
217        Self { requests: unsafe { MaybeUninit::uninit().assume_init() }, count: 0 }
218    }
219}
220
221impl DecodedRequests {
222    fn push(&mut self, request: Request) {
223        assert!(self.count < MAX_REQUESTS);
224        // To ensure drop runs, we must initialize each element at most once.
225        // As long as we only go via this function, that's satisfied.
226        self.requests[self.count].write(request);
227        self.count += 1;
228    }
229
230    fn is_empty(&self) -> bool {
231        self.count == 0
232    }
233
234    fn is_full(&self) -> bool {
235        self.count == MAX_REQUESTS
236    }
237
238    fn clear(&mut self) {
239        for i in 0..self.count {
240            // SAFETY: We initialized `count` elements via [`Self::push`].
241            unsafe { self.requests[i].assume_init_drop() };
242        }
243        self.count = 0;
244    }
245}
246
impl Drop for DecodedRequests {
    /// Drops any requests still held in the batch; `MaybeUninit` would otherwise leak them.
    fn drop(&mut self) {
        self.clear();
    }
}
252
253impl std::ops::Deref for DecodedRequests {
254    type Target = [Request];
255
256    fn deref(&self) -> &Self::Target {
257        // SAFETY: We wrote the request in [`Self::push`].
258        unsafe { std::slice::from_raw_parts(self.requests[0].as_ptr(), self.count) }
259    }
260}
261
262impl std::ops::DerefMut for DecodedRequests {
263    fn deref_mut(&mut self) -> &mut Self::Target {
264        // SAFETY: We wrote the request in [`Self::push`].
265        unsafe { std::slice::from_raw_parts_mut(self.requests[0].as_mut_ptr(), self.count) }
266    }
267}
268
269impl<I: Interface + ?Sized> Session<I> {
270    /// Begins processing the session's FIFO.  Blocks until completion (either because of
271    /// termination, or due to unrecoverable error).
272    pub fn run(self: &Arc<Self>) {
273        self.fifo_loop();
274        self.abort_handle.abort();
275        self.helper.drop_active_requests(|s| Arc::ptr_eq(s, self));
276    }
277
    /// Services the session FIFO until the shutdown signal is observed or a FIFO operation fails
    /// with anything other than `SHOULD_WAIT`.
    ///
    /// Each iteration first tries to write out queued responses and then reads and dispatches
    /// pending requests.  When there is nothing to read, it blocks until the FIFO is readable, a
    /// wake-up or shutdown signal is raised, or (if responses are still queued) the FIFO is
    /// writable.
    fn fifo_loop(self: &Arc<Self>) {
        // Scratch buffer that `read_uninit` fills with incoming requests.
        let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];

        loop {
            // Send queued responses.
            let is_queue_empty = {
                let mut queue = self.queue.lock();
                while !queue.responses.is_empty() {
                    // Only the first contiguous slice of the deque is written per attempt; if the
                    // deque has wrapped, the remainder is picked up by the next loop iteration.
                    let (front, _) = queue.responses.as_slices();
                    match self.fifo.write(front) {
                        Ok(count) => {
                            // A short write means the FIFO has no more room for now.
                            let full = count < front.len();
                            queue.responses.drain(..count);
                            if full {
                                break;
                            }
                        }
                        Err(zx::Status::SHOULD_WAIT) => break,
                        // Any other write error ends the loop.
                        Err(_) => return,
                    }
                }
                queue.responses.is_empty()
            };

            // Process pending reads.
            match self.fifo.read_uninit(&mut requests) {
                Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
                Err(zx::Status::SHOULD_WAIT) => {
                    // NOTE(review): the wait set has no peer-closed signal, so a closed peer is
                    // presumably only noticed through the shutdown signal or a later read/write
                    // error -- confirm this is intended.
                    let mut signals =
                        zx::Signals::OBJECT_READABLE | SHUTDOWN_SIGNAL | FIFO_WAKE_SIGNAL;
                    // Only wait for writability when there is something left to write.
                    if !is_queue_empty {
                        signals |= zx::Signals::OBJECT_WRITABLE;
                    }
                    let Ok(signals) =
                        self.fifo.wait_one(signals, zx::MonotonicInstant::INFINITE).to_result()
                    else {
                        return;
                    };
                    if signals.contains(SHUTDOWN_SIGNAL) {
                        return;
                    }
                    // Clear FIFO_WAKE_SIGNAL if it's set.
                    if signals.contains(FIFO_WAKE_SIGNAL) {
                        let _ = self.fifo.signal(FIFO_WAKE_SIGNAL, zx::Signals::empty());
                    }
                }
                // Any other read error ends the loop.
                Err(_) => return,
            }
        }
    }
328
329    /// Synchronously performs a device flush.
330    fn pre_flush(self: &Arc<Self>, request_id: RequestId) -> Result<(), zx::Status> {
331        let trace_flow_id = {
332            let mut request = self.helper.session_manager().active_requests.request(request_id);
333            if let Some(id) = request.trace_flow_id {
334                fuchsia_trace::async_instant!(
335                    fuchsia_trace::Id::from(id.get()),
336                    c"storage",
337                    c"block_server::SimulatedBarrier",
338                    "request_id" => request_id.0
339                );
340            }
341            request.count += 1;
342            request.trace_flow_id
343        };
344        self.helper.session_manager().submit_requests(&[Request {
345            request_id,
346            operation: Operation::Flush,
347            trace_flow_id,
348            vmo: None,
349        }]);
350        self.helper.session_manager().wait_for_no_inflight_requests();
351        let status = self.helper.session_manager().active_requests.request(request_id).status;
352        match status {
353            zx::Status::OK => Ok(()),
354            status => {
355                // Respond for the unsubmitted request too.
356                self.helper.session_manager().complete_unsubmitted_request(request_id, status);
357                Err(status)
358            }
359        }
360    }
361
362    /// Synchronously completes `decoded_requests`, and inserts a post-flush into `decoded_requests`
363    /// to be submitted later.
364    fn post_flush(self: &Arc<Self>, request_id: RequestId, decoded_requests: &mut DecodedRequests) {
365        if !decoded_requests.is_empty() {
366            self.helper.session_manager().submit_requests(decoded_requests);
367            decoded_requests.clear();
368        }
369        self.helper.session_manager().wait_for_no_inflight_requests();
370        let request = self.helper.session_manager().active_requests.request(request_id);
371        match request.status {
372            zx::Status::OK => decoded_requests.push(Request {
373                request_id,
374                operation: Operation::Flush,
375                trace_flow_id: request.trace_flow_id,
376                vmo: None,
377            }),
378            status => {
379                drop(request);
380                self.helper.session_manager().complete_unsubmitted_request(request_id, status)
381            }
382        }
383    }
384
    /// Decodes a batch of raw FIFO requests and submits the resulting requests to the interface.
    ///
    /// Decoded requests are accumulated in a [`DecodedRequests`] batch that is submitted whenever
    /// it fills up and once more at the end.  `CloseVmo` requests are completed here and never
    /// reach the interface.  When the device does not advertise barrier or FUA support, the
    /// corresponding write flags are simulated with a flush before or after the write.
    fn handle_requests<'a>(
        self: &Arc<Self>,
        requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
    ) {
        let manager = &self.helper.session_manager();
        let mut decoded_requests = DecodedRequests::default();

        for request in requests {
            match self.helper.decode_fifo_request(self.clone(), request) {
                // Nothing to forward for CloseVmo; complete it straight away.
                Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
                    manager.complete_unsubmitted_request(request_id, zx::Status::OK);
                }
                Ok(mut request) => {
                    let request_id = request.request_id;

                    // Strip the PRE_BARRIER flag if we don't support it, and simulate the barrier
                    // with a pre-flush.
                    if !manager
                        .interface
                        .get_info()
                        .device_flags()
                        .contains(fblock::DeviceFlag::BARRIER_SUPPORT)
                        && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
                        && self.pre_flush(request_id).is_err()
                    {
                        // `pre_flush` has already completed the request with the failure status.
                        continue;
                    }
                    // Strip the FORCE_ACCESS flag if we don't support it, and simulate the FUA with
                    // a post-flush.
                    let simulate_fua = !manager
                        .interface
                        .get_info()
                        .device_flags()
                        .contains(fblock::DeviceFlag::FUA_SUPPORT)
                        && request.operation.take_write_flag(WriteFlags::FORCE_ACCESS);

                    if simulate_fua {
                        // Account for the additional request we need at the end.
                        manager.active_requests.request(request_id).count += 1;
                    }

                    // `map_request` yields one request to submit plus an optional remainder, so
                    // keep mapping until nothing is left over.
                    loop {
                        let result = self
                            .helper
                            .map_request(request, &mut manager.active_requests.request(request_id));
                        match result {
                            Ok((
                                DecodedRequest { request_id, operation, vmo, trace_flow_id },
                                remainder,
                            )) => {
                                decoded_requests.push(Request {
                                    request_id,
                                    operation,
                                    trace_flow_id,
                                    vmo,
                                });

                                // Flush the batch as soon as it is full so the next push has room.
                                if decoded_requests.is_full() {
                                    manager.submit_requests(&*decoded_requests);
                                    decoded_requests.clear();
                                }

                                if let Some(r) = remainder {
                                    request = r;
                                } else {
                                    break;
                                }
                            }
                            Err(status) => {
                                manager.complete_unsubmitted_request(request_id, status);
                                break;
                            }
                        }
                    }

                    // This runs even if mapping failed above: the extra count added for the
                    // post-flush still has to be consumed, and `post_flush` completes the request
                    // itself when the recorded status is not OK.
                    if simulate_fua {
                        self.post_flush(request_id, &mut decoded_requests);
                    }
                }
                // Decoding failed and there is no response to send.
                Err(None) => {}
                // Decoding failed and produced a response to send back to the client.
                Err(Some(response)) => self.send_response(response),
            }
        }

        // Submit whatever is left in the final, partially filled batch.
        if !decoded_requests.is_empty() {
            manager.submit_requests(&decoded_requests);
        }
    }
473
474    fn send_response(&self, response: BlockFifoResponse) {
475        let mut queue = self.queue.lock();
476        if queue.responses.is_empty() {
477            match self.fifo.write_one(&response) {
478                Ok(()) => {
479                    return;
480                }
481                Err(_) => {
482                    // Wake the FIFO loop up so that we can send the response later.
483                    let _ = self.fifo.signal(zx::Signals::empty(), FIFO_WAKE_SIGNAL);
484                }
485            }
486        }
487        queue.responses.push_back(response);
488    }
489
    /// Asynchronously request to terminate the Session.  The session's thread will eventually stop
    /// running.
    ///
    /// This raises the shutdown signal on the FIFO, which the FIFO loop checks for, and aborts
    /// the FIDL request loop in `open_session`.  It does not wait for either to stop.
    pub fn terminate_async(&self) {
        let _ = self.fifo.signal(zx::Signals::empty(), SHUTDOWN_SIGNAL);
        self.abort_handle.abort();
    }
496}
497
498impl<I: Interface + ?Sized> Drop for Session<I> {
499    fn drop(&mut self) {
500        let mut inner = self.helper.session_manager().inner.lock();
501        let notify = {
502            inner.open_sessions.remove(&(self as *const _ as usize));
503            inner.open_sessions.is_empty()
504        };
505        if notify {
506            self.helper.session_manager().no_open_sessions_condvar.notify_all();
507        }
508    }
509}
510
impl<I: Interface + ?Sized> Drop for SessionManager<I> {
    /// Terminates all sessions; this blocks until every open session has been dropped.
    fn drop(&mut self) {
        self.terminate();
    }
}
516
/// When the interface uses the session manager itself as its orchestrator, an
/// `Arc<SessionManager<I>>` already is the orchestrator.
impl<I: Interface<Orchestrator = SessionManager<I>>> IntoOrchestrator for Arc<SessionManager<I>> {
    type SM = SessionManager<I>;

    /// Returns `self` unchanged.
    fn into_orchestrator(self) -> Arc<I::Orchestrator> {
        self
    }
}
524
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BlockInfo;
    use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse};
    use fidl::endpoints::create_proxy_and_stream;
    use zx::HandleBased;
    use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};

    /// An interface that forwards every request it receives to a channel so that the test body
    /// can inspect the request and complete it.
    struct MockInterface {
        request_sender: std::sync::mpsc::Sender<Request>,
    }

    impl Interface for MockInterface {
        type Orchestrator = SessionManager<Self>;

        fn get_info(&self) -> Cow<'_, DeviceInfo> {
            Cow::Owned(DeviceInfo::Block(BlockInfo { block_count: 1024, ..Default::default() }))
        }

        fn spawn_session(&self, session: Arc<Session<Self>>) {
            std::thread::spawn(move || {
                session.run();
            });
        }

        fn on_requests(&self, requests: &[Request]) {
            for request in requests {
                self.request_sender.send(request.clone()).unwrap();
            }
        }
    }

    /// The client side of a block server backed by [`MockInterface`], with one session open.
    ///
    /// Fields are dropped in declaration order, which mirrors the order in which the equivalent
    /// locals of a hand-written test would go out of scope.
    struct Fixture {
        /// The client end of the session FIFO.
        fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
        session_proxy: fblock::SessionProxy,
        /// Held only to keep the connection to the block server open.
        _proxy: fblock::BlockProxy,
        session_manager: Arc<SessionManager<MockInterface>>,
        /// Receives every request that reaches [`MockInterface::on_requests`].
        rx: std::sync::mpsc::Receiver<Request>,
    }

    impl Fixture {
        /// Starts a block server with a 512-byte block size, opens a session, and fetches the
        /// session FIFO.
        async fn new() -> Self {
            let (tx, rx) = std::sync::mpsc::channel();
            let interface = Arc::new(MockInterface { request_sender: tx });
            let session_manager = Arc::new(SessionManager::new(interface));

            let sm_clone = session_manager.clone();
            let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
            fasync::Task::spawn(async move {
                let server = crate::BlockServer::new(512, sm_clone);
                server.handle_requests(stream).await.unwrap();
            })
            .detach();

            let (session_proxy, session_server_end) =
                fidl::endpoints::create_proxy::<fblock::SessionMarker>();
            proxy.open_session(session_server_end).unwrap();

            let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
            let fifo = zx::Fifo::from(fifo_handle);

            Self { fifo, session_proxy, _proxy: proxy, session_manager, rx }
        }

        /// Attaches a duplicate of `vmo` to the session and returns the ID assigned to it.
        async fn attach_vmo(&self, vmo: &zx::Vmo) -> fblock::VmoId {
            self.session_proxy
                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                .await
                .unwrap()
                .unwrap()
        }

        /// Writes a single request to the session FIFO.
        fn send(&self, request: BlockFifoRequest) {
            self.fifo.write(&[request]).unwrap();
        }

        /// Blocks until a response is available on the session FIFO and returns it.
        fn read_response(&self) -> BlockFifoResponse {
            let signals = self
                .fifo
                .wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE)
                .unwrap();
            assert!(signals.contains(zx::Signals::FIFO_READABLE));

            let mut resp = [BlockFifoResponse::default()];
            self.fifo.read(&mut resp).unwrap();
            let [response] = resp;
            response
        }
    }

    /// Returns a request template for `opcode` with the group, offsets and trace flow ID zeroed.
    /// Callers fill in `reqid`, `vmoid` and `length` with struct-update syntax.
    fn request(opcode: fblock::BlockOpcode) -> BlockFifoRequest {
        BlockFifoRequest {
            command: BlockFifoCommand { opcode: opcode.into_primitive(), ..Default::default() },
            group: 0,
            vmo_offset: 0,
            dev_offset: 0,
            trace_flow_id: 0,
            ..Default::default()
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_basic_request() {
        let fixture = Fixture::new().await;
        let vmo = zx::Vmo::create(8192).unwrap();
        let vmo_id = fixture.attach_vmo(&vmo).await;

        fixture.send(BlockFifoRequest {
            reqid: 123,
            vmoid: vmo_id.id,
            length: 1,
            ..request(fblock::BlockOpcode::Read)
        });

        let r = fixture.rx.recv().unwrap();
        assert_eq!(r.request_id.0, 0);
        assert!(matches!(r.operation, Operation::Read { .. }));

        fixture.session_manager.complete_request(r.request_id, zx::Status::OK);

        let resp = fixture.read_response();
        assert_eq!(resp.reqid, 123);
        assert_eq!(resp.status, zx::sys::ZX_OK);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_write_request() {
        let fixture = Fixture::new().await;
        let vmo = zx::Vmo::create(8192).unwrap();
        let vmo_id = fixture.attach_vmo(&vmo).await;

        fixture.send(BlockFifoRequest {
            reqid: 124,
            vmoid: vmo_id.id,
            length: 1,
            ..request(fblock::BlockOpcode::Write)
        });

        let r = fixture.rx.recv().unwrap();
        assert_eq!(r.request_id.0, 0);
        assert!(matches!(r.operation, Operation::Write { .. }));

        fixture.session_manager.complete_request(r.request_id, zx::Status::OK);

        let resp = fixture.read_response();
        assert_eq!(resp.reqid, 124);
        assert_eq!(resp.status, zx::sys::ZX_OK);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_flush_request() {
        let fixture = Fixture::new().await;

        fixture.send(BlockFifoRequest {
            reqid: 125,
            vmoid: fblock::VMOID_INVALID,
            length: 0,
            ..request(fblock::BlockOpcode::Flush)
        });

        let r = fixture.rx.recv().unwrap();
        assert_eq!(r.request_id.0, 0);
        assert!(matches!(r.operation, Operation::Flush { .. }));

        fixture.session_manager.complete_request(r.request_id, zx::Status::OK);

        let resp = fixture.read_response();
        assert_eq!(resp.reqid, 125);
        assert_eq!(resp.status, zx::sys::ZX_OK);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_trim_request() {
        let fixture = Fixture::new().await;

        fixture.send(BlockFifoRequest {
            reqid: 126,
            vmoid: fblock::VMOID_INVALID,
            length: 1,
            ..request(fblock::BlockOpcode::Trim)
        });

        let r = fixture.rx.recv().unwrap();
        assert_eq!(r.request_id.0, 0);
        assert!(matches!(r.operation, Operation::Trim { .. }));

        fixture.session_manager.complete_request(r.request_id, zx::Status::OK);

        let resp = fixture.read_response();
        assert_eq!(resp.reqid, 126);
        assert_eq!(resp.status, zx::sys::ZX_OK);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_close_vmo() {
        let fixture = Fixture::new().await;
        let vmo = zx::Vmo::create(8192).unwrap();
        let vmo_id = fixture.attach_vmo(&vmo).await;

        fixture.send(BlockFifoRequest {
            reqid: 127,
            vmoid: vmo_id.id,
            length: 0,
            ..request(fblock::BlockOpcode::CloseVmo)
        });

        let resp = fixture.read_response();
        assert_eq!(resp.reqid, 127);
        assert_eq!(resp.status, zx::sys::ZX_OK);

        // CloseVmo is handled automatically. Interface should not receive anything.
        assert!(fixture.rx.try_recv().is_err());
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_error() {
        let fixture = Fixture::new().await;

        fixture.send(BlockFifoRequest {
            reqid: 128,
            vmoid: fblock::VMOID_INVALID,
            length: 0,
            ..request(fblock::BlockOpcode::Flush)
        });

        // Fail the request; the failure status must be reported back to the client.
        let r = fixture.rx.recv().unwrap();
        fixture.session_manager.complete_request(r.request_id, zx::Status::IO);

        let resp = fixture.read_response();
        assert_eq!(resp.reqid, 128);
        assert_eq!(resp.status, zx::sys::ZX_ERR_IO);
    }
}