Skip to main content

block_server/
callback_interface.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::{
6    ActiveRequests, DecodedRequest, DeviceInfo, IntoOrchestrator, OffsetMap, Operation, RequestId,
7    SessionHelper, TraceFlowId, WriteFlags,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse};
11use fidl_fuchsia_storage_block as fblock;
12use fuchsia_sync::{Condvar, Mutex};
13use futures::TryStreamExt as _;
14use futures::stream::{AbortHandle, Abortable};
15use std::borrow::{Borrow, Cow};
16use std::collections::{HashMap, VecDeque};
17use std::mem::MaybeUninit;
18use std::sync::{Arc, Weak};
19
/// An in-flight request.
///
/// Slices of these are what the session manager hands to [`Interface::on_requests`].
#[derive(Clone, Debug)]
pub struct Request {
    /// The ID that a request is associated with, for later completion in
    /// [`SessionManager::complete_request`].  Note that this is not necessarily a unique
    /// identifier, and multiple Requests may have the same request_id (e.g. due to request
    /// splitting).  The library internally reference-counts requests which use this ID.
    pub request_id: RequestId,
    /// The operation the interface is being asked to perform.
    pub operation: Operation,
    /// Trace flow ID copied from the decoded request this `Request` was built from.
    pub trace_flow_id: TraceFlowId,
    /// `vmo` is always Some for Operation::Read or Operation::Write, and None otherwise.
    pub vmo: Option<Arc<zx::Vmo>>,
}
33
/// Callbacks that a [`SessionManager`] invokes on the device implementation it was created with.
pub trait Interface: Send + Sync + Unpin + 'static {
    /// The orchestrator type used with this interface; a [`SessionManager`] for this interface
    /// must be borrowable from it.
    type Orchestrator: Borrow<SessionManager<Self>> + Send + Sync;

    /// Called to get block/partition information.
    fn get_info(&self) -> Cow<'_, DeviceInfo>;

    /// Start running a new session.  The interface must ensure that [`Session::run`] is called
    /// on a different thread.
    fn spawn_session(&self, session: Arc<Session<Self>>);

    /// Starts a batch of requests.  The implementation may block if there are too many in-flight
    /// requests, providing pushback.  The interface is responsible for eventually calling
    /// [`SessionManager::complete_request`] for each request (even during server shutdown).
    fn on_requests(&self, requests: &[Request]);
}
49
/// Mutable state of a [`SessionManager`], kept behind `SessionManager::inner`.
struct SessionManagerInner<I: Interface + ?Sized> {
    /// Sessions that have been opened and not yet dropped, keyed by the address of the `Session`
    /// (inserted in `open_session`, removed in `Session::drop`).  Only weak references are held
    /// here, so this map does not keep sessions alive.
    open_sessions: HashMap<usize, Weak<Session<I>>>,
}
53
// The user signals raised on each session's FIFO (`Session::fifo`).
55
/// Signalled on a session's FIFO (`Session::fifo`) to wake up the FIFO loop, e.g. when a response
/// had to be queued because it could not be written immediately.
const FIFO_WAKE_SIGNAL: zx::Signals = zx::Signals::USER_0;
58
/// Signalled on a session's FIFO (`Session::fifo`) to terminate the FIFO loop.
const SHUTDOWN_SIGNAL: zx::Signals = zx::Signals::USER_1;
61
/// A session manager that forwards requests to an [`Interface`] and is told about their
/// completion through [`SessionManager::complete_request`].
pub struct SessionManager<I: Interface + ?Sized> {
    /// The implementation that requests are forwarded to.
    interface: Arc<I>,
    // These represent active *client* requests, which correspond to one or more in-flight requests.
    active_requests: ActiveRequests<Arc<Session<I>>>,
    /// Number of requests passed to `Interface::on_requests` that have not yet been reported via
    /// `complete_request`.
    inflight_requests: Mutex<usize>,
    /// Notified when `inflight_requests` reaches zero.
    no_inflight_requests_condvar: Condvar,
    /// Bookkeeping for open sessions.
    inner: Mutex<SessionManagerInner<I>>,
    /// Notified (from `Session::drop`) when the last open session is removed from `inner`.
    no_open_sessions_condvar: Condvar,
}
71
72impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
73    const SUPPORTS_DECOMPRESSION: bool = false;
74
75    type Orchestrator = I::Orchestrator;
76    type Session = Arc<Session<I>>;
77
78    async fn on_attach_vmo(
79        _orchestrator: Arc<Self::Orchestrator>,
80        _vmo: &Arc<zx::Vmo>,
81    ) -> Result<(), zx::Status> {
82        Ok(())
83    }
84
85    async fn open_session(
86        orchestrator: Arc<Self::Orchestrator>,
87        mut stream: fblock::SessionRequestStream,
88        offset_map: OffsetMap,
89        block_size: u32,
90    ) -> Result<(), Error> {
91        let (helper, fifo) = SessionHelper::new(orchestrator.clone(), offset_map, block_size)?;
92        let (abort_handle, registration) = AbortHandle::new_pair();
93        let session = Arc::new(Session { helper, fifo, queue: Mutex::default(), abort_handle });
94        let sm = orchestrator.as_ref().borrow();
95        sm.inner
96            .lock()
97            .open_sessions
98            .insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
99
100        sm.interface.spawn_session(session.clone());
101
102        let result = Abortable::new(
103            async {
104                while let Some(request) = stream.try_next().await? {
105                    session.helper.handle_request(request).await?;
106                }
107                Ok(())
108            },
109            registration,
110        )
111        .await
112        .unwrap_or_else(|e| Err(e.into()));
113
114        let _ = session.fifo.signal(zx::Signals::empty(), SHUTDOWN_SIGNAL);
115
116        result
117    }
118
119    fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
120        self.interface.get_info()
121    }
122
123    fn active_requests(&self) -> &ActiveRequests<Arc<Session<I>>> {
124        &self.active_requests
125    }
126}
127
128impl<I: Interface + ?Sized> SessionManager<I> {
129    pub fn new(interface: Arc<I>) -> Self {
130        Self {
131            interface,
132            active_requests: ActiveRequests::default(),
133            inflight_requests: Mutex::new(0),
134            no_inflight_requests_condvar: Condvar::new(),
135            inner: Mutex::new(SessionManagerInner { open_sessions: HashMap::new() }),
136            no_open_sessions_condvar: Condvar::new(),
137        }
138    }
139
140    /// Reports the given task as complete with a given status.
141    pub fn complete_request(&self, request_id: RequestId, status: zx::Status) {
142        let notify = {
143            let mut inflight_requests = self.inflight_requests.lock();
144            *inflight_requests -= 1;
145            *inflight_requests == 0
146        };
147        self.complete_unsubmitted_request(request_id, status);
148        if notify {
149            self.no_inflight_requests_condvar.notify_all();
150        }
151    }
152
153    fn submit_requests(&self, requests: &[Request]) {
154        *self.inflight_requests.lock() += requests.len();
155        self.interface.on_requests(requests);
156    }
157
158    /// Waits for there to be no requests in-flight.
159    ///
160    /// NOTE: To void TOCTOUs, this must be called on the same thread which calls
161    /// [`Self::submit_requests`].
162    fn wait_for_no_inflight_requests(&self) {
163        let mut guard = self.inflight_requests.lock();
164        self.no_inflight_requests_condvar.wait_while(&mut guard, |count| *count > 0);
165    }
166
167    /// Called instead of `[Self::complete_request]` when a request is completed before it was
168    /// actually submitted.
169    fn complete_unsubmitted_request(&self, request_id: RequestId, status: zx::Status) {
170        if let Some((session, response)) =
171            self.active_requests.complete_and_take_response(request_id, status)
172        {
173            session.send_response(response);
174        }
175    }
176
177    /// Terminates the session manager.  Blocks until all sessions have terminated.
178    pub fn terminate(&self) {
179        {
180            // We must drop references to sessions whilst we're not holding the lock for
181            // `open_sessions` because `Session::drop` needs to take that same lock.
182            #[allow(clippy::collection_is_never_read)]
183            let mut terminated_sessions = Vec::new();
184            for (_, session) in &self.inner.lock().open_sessions {
185                if let Some(session) = session.upgrade() {
186                    session.terminate_async();
187                    terminated_sessions.push(session);
188                }
189            }
190        }
191        let mut guard = self.inner.lock();
192        self.no_open_sessions_condvar.wait_while(&mut guard, |s| !s.open_sessions.is_empty());
193    }
194}
195
/// A single client session, driven by [`Session::run`].
pub struct Session<I: Interface + ?Sized> {
    /// Per-session helper; also provides access to the owning `SessionManager` through
    /// `session_manager()`.
    helper: SessionHelper<SessionManager<I>>,
    /// The FIFO that requests are read from and responses are written to.  `FIFO_WAKE_SIGNAL`
    /// and `SHUTDOWN_SIGNAL` are raised on this object.
    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
    /// Responses that could not be written to `fifo` immediately.
    queue: Mutex<SessionQueue>,
    /// Aborts the FIDL request loop started in `open_session`.
    abort_handle: AbortHandle,
}
202
/// Responses waiting for room in a session's FIFO.
#[derive(Default)]
struct SessionQueue {
    /// Pending responses, oldest first (pushed at the back, written out from the front).
    responses: VecDeque<BlockFifoResponse>,
}
207
/// Upper bound on the number of requests read from the FIFO in one go, and on the size of a batch
/// passed to [`Interface::on_requests`].  Same value as `FIFO_MAX_REQUESTS` in the parent module.
pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
209
/// Fixed-capacity, inline buffer of decoded requests that are waiting to be submitted.
struct DecodedRequests {
    /// Backing storage; only the first `count` elements are initialized.
    requests: [MaybeUninit<Request>; MAX_REQUESTS],
    /// Number of initialized elements at the start of `requests`.
    count: usize,
}
214
215impl Default for DecodedRequests {
216    fn default() -> Self {
217        Self { requests: unsafe { MaybeUninit::uninit().assume_init() }, count: 0 }
218    }
219}
220
221impl DecodedRequests {
222    fn push(&mut self, request: Request) {
223        assert!(self.count < MAX_REQUESTS);
224        // To ensure drop runs, we must initialize each element at most once.
225        // As long as we only go via this function, that's satisfied.
226        self.requests[self.count].write(request);
227        self.count += 1;
228    }
229
230    fn is_empty(&self) -> bool {
231        self.count == 0
232    }
233
234    fn is_full(&self) -> bool {
235        self.count == MAX_REQUESTS
236    }
237
238    fn clear(&mut self) {
239        for i in 0..self.count {
240            // SAFETY: We initialized `count` elements via [`Self::push`].
241            unsafe { self.requests[i].assume_init_drop() };
242        }
243        self.count = 0;
244    }
245}
246
impl Drop for DecodedRequests {
    /// Drops any requests still in the buffer.  The storage is `MaybeUninit`, so nothing would be
    /// dropped automatically.
    fn drop(&mut self) {
        self.clear();
    }
}
252
253impl std::ops::Deref for DecodedRequests {
254    type Target = [Request];
255
256    fn deref(&self) -> &Self::Target {
257        // SAFETY: We wrote the request in [`Self::push`].
258        unsafe { std::slice::from_raw_parts(self.requests[0].as_ptr(), self.count) }
259    }
260}
261
262impl std::ops::DerefMut for DecodedRequests {
263    fn deref_mut(&mut self) -> &mut Self::Target {
264        // SAFETY: We wrote the request in [`Self::push`].
265        unsafe { std::slice::from_raw_parts_mut(self.requests[0].as_mut_ptr(), self.count) }
266    }
267}
268
impl<I: Interface + ?Sized> Session<I> {
    /// Begins processing the session's FIFO.  Blocks until completion (either because of
    /// termination, or due to unrecoverable error).
    pub fn run(self: &Arc<Self>) {
        self.fifo_loop();
        // The FIFO loop is done, so stop the FIDL request loop in `open_session` as well.
        self.abort_handle.abort();
        // NB: We cannot call [`drop_active_requests`] here, because requests which have already
        // been submitted are no longer in control by this thread, and will be later completed by
        // another thread.  If we dropped them here, then they would be completed twice.
        self.helper.close_active_groups(|s| Arc::ptr_eq(s, self));
    }

    /// Alternates between writing out queued responses and reading requests from the FIFO.
    ///
    /// Returns when `SHUTDOWN_SIGNAL` is observed while waiting, or when a FIFO write, read or
    /// wait fails with anything other than `SHOULD_WAIT`.
    fn fifo_loop(self: &Arc<Self>) {
        let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];

        loop {
            // Send queued responses.
            let is_queue_empty = {
                let mut queue = self.queue.lock();
                while !queue.responses.is_empty() {
                    // Only the first contiguous part of the deque is written per iteration; the
                    // enclosing `while` comes back for the rest.
                    let (front, _) = queue.responses.as_slices();
                    match self.fifo.write(front) {
                        Ok(count) => {
                            // A partial write: stop trying to send for now.
                            let full = count < front.len();
                            queue.responses.drain(..count);
                            if full {
                                break;
                            }
                        }
                        Err(zx::Status::SHOULD_WAIT) => break,
                        Err(_) => return,
                    }
                }
                queue.responses.is_empty()
            };

            // Process pending reads.
            match self.fifo.read_uninit(&mut requests) {
                Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
                Err(zx::Status::SHOULD_WAIT) => {
                    let mut signals =
                        zx::Signals::OBJECT_READABLE | SHUTDOWN_SIGNAL | FIFO_WAKE_SIGNAL;
                    // Only wait for writability when there are responses left to send.
                    if !is_queue_empty {
                        signals |= zx::Signals::OBJECT_WRITABLE;
                    }
                    let Ok(signals) =
                        self.fifo.wait_one(signals, zx::MonotonicInstant::INFINITE).to_result()
                    else {
                        return;
                    };
                    if signals.contains(SHUTDOWN_SIGNAL) {
                        return;
                    }
                    // Clear FIFO_WAKE_SIGNAL if it's set.
                    if signals.contains(FIFO_WAKE_SIGNAL) {
                        let _ = self.fifo.signal(FIFO_WAKE_SIGNAL, zx::Signals::empty());
                    }
                }
                Err(_) => return,
            }
        }
    }

    /// Synchronously performs a device flush.
    ///
    /// Submits a `Flush` under `request_id` and blocks until no requests are in flight.  If the
    /// request's recorded status is then not `OK`, the request is completed with that status and
    /// the status is returned as the error.
    fn pre_flush(self: &Arc<Self>, request_id: RequestId) -> Result<(), zx::Status> {
        let trace_flow_id = {
            let mut request = self.helper.session_manager().active_requests.request(request_id);
            if let Some(id) = request.trace_flow_id {
                fuchsia_trace::async_instant!(
                    fuchsia_trace::Id::from(id.get()),
                    c"storage",
                    c"block_server::SimulatedBarrier",
                    "request_id" => request_id.0
                );
            }
            // Account for the extra flush that is submitted under this request ID below.
            request.count += 1;
            request.trace_flow_id
        };
        self.helper.session_manager().submit_requests(&[Request {
            request_id,
            operation: Operation::Flush,
            trace_flow_id,
            vmo: None,
        }]);
        self.helper.session_manager().wait_for_no_inflight_requests();
        let status = self.helper.session_manager().active_requests.request(request_id).status;
        match status {
            zx::Status::OK => Ok(()),
            status => {
                // Respond for the unsubmitted request too.
                self.helper.session_manager().complete_unsubmitted_request(request_id, status);
                Err(status)
            }
        }
    }

    /// Synchronously completes `decoded_requests`, and inserts a post-flush into `decoded_requests`
    /// to be submitted later.
    ///
    /// If the request's recorded status is not `OK` once nothing is in flight, no flush is queued
    /// and the request is completed with that status instead.
    fn post_flush(self: &Arc<Self>, request_id: RequestId, decoded_requests: &mut DecodedRequests) {
        if !decoded_requests.is_empty() {
            self.helper.session_manager().submit_requests(decoded_requests);
            decoded_requests.clear();
        }
        self.helper.session_manager().wait_for_no_inflight_requests();
        let request = self.helper.session_manager().active_requests.request(request_id);
        match request.status {
            zx::Status::OK => decoded_requests.push(Request {
                request_id,
                operation: Operation::Flush,
                trace_flow_id: request.trace_flow_id,
                vmo: None,
            }),
            status => {
                // Release the handle on the active request before completing it.
                drop(request);
                self.helper.session_manager().complete_unsubmitted_request(request_id, status)
            }
        }
    }

    /// Decodes a batch of FIFO requests and submits the resulting [`Request`]s to the interface,
    /// at most `MAX_REQUESTS` per call to `submit_requests`.
    ///
    /// `PRE_BARRIER` and `FORCE_ACCESS` write flags are stripped and simulated with flushes when
    /// the device flags do not advertise `BARRIER_SUPPORT` / `FUA_SUPPORT` respectively.
    fn handle_requests<'a>(
        self: &Arc<Self>,
        requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
    ) {
        let manager = &self.helper.session_manager();
        let mut decoded_requests = DecodedRequests::default();

        for request in requests {
            match self.helper.decode_fifo_request(self.clone(), request) {
                // CloseVmo is completed right away and is never passed to the interface.
                Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
                    manager.complete_unsubmitted_request(request_id, zx::Status::OK);
                }
                Ok(mut request) => {
                    let request_id = request.request_id;

                    // Strip the PRE_BARRIER flag if we don't support it, and simulate the barrier
                    // with a pre-flush.
                    if !manager
                        .interface
                        .get_info()
                        .device_flags()
                        .contains(fblock::DeviceFlag::BARRIER_SUPPORT)
                        && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
                        && self.pre_flush(request_id).is_err()
                    {
                        // `pre_flush` has already completed the request with the error.
                        continue;
                    }
                    // Strip the FORCE_ACCESS flag if we don't support it, and simulate the FUA with
                    // a post-flush.
                    let simulate_fua = !manager
                        .interface
                        .get_info()
                        .device_flags()
                        .contains(fblock::DeviceFlag::FUA_SUPPORT)
                        && request.operation.take_write_flag(WriteFlags::FORCE_ACCESS);

                    if simulate_fua {
                        // Account for the additional request we need at the end.
                        manager.active_requests.request(request_id).count += 1;
                    }

                    // `map_request` yields one request to submit plus an optional remainder, which
                    // is fed back in until nothing is left.
                    loop {
                        let result = self
                            .helper
                            .map_request(request, &mut manager.active_requests.request(request_id));
                        match result {
                            Ok((
                                DecodedRequest { request_id, operation, vmo, trace_flow_id },
                                remainder,
                            )) => {
                                decoded_requests.push(Request {
                                    request_id,
                                    operation,
                                    trace_flow_id,
                                    vmo,
                                });

                                // Submit as soon as the buffer is full so the next `push` has room.
                                if decoded_requests.is_full() {
                                    manager.submit_requests(&*decoded_requests);
                                    decoded_requests.clear();
                                }

                                if let Some(r) = remainder {
                                    request = r;
                                } else {
                                    break;
                                }
                            }
                            Err(status) => {
                                manager.complete_unsubmitted_request(request_id, status);
                                break;
                            }
                        }
                    }

                    if simulate_fua {
                        self.post_flush(request_id, &mut decoded_requests);
                    }
                }
                // NOTE(review): presumably decoding failed in a way that calls for no response
                // at this point; confirm against `decode_fifo_request`.
                Err(None) => {}
                Err(Some(response)) => self.send_response(response),
            }
        }

        // Submit whatever is left over from the batch.
        if !decoded_requests.is_empty() {
            manager.submit_requests(&decoded_requests);
        }
    }

    /// Sends `response` to the client.
    ///
    /// The response is written straight to the FIFO when nothing is queued ahead of it.
    /// Otherwise, or if that write fails, it is appended to the queue for the FIFO loop to send.
    fn send_response(&self, response: BlockFifoResponse) {
        let mut queue = self.queue.lock();
        if queue.responses.is_empty() {
            match self.fifo.write_one(&response) {
                Ok(()) => {
                    return;
                }
                Err(_) => {
                    // Wake the FIFO loop up so that we can send the response later.
                    let _ = self.fifo.signal(zx::Signals::empty(), FIFO_WAKE_SIGNAL);
                }
            }
        }
        queue.responses.push_back(response);
    }

    /// Asynchronously request to terminate the Session.  The session's thread will eventually stop
    /// running.
    pub fn terminate_async(&self) {
        let _ = self.fifo.signal(zx::Signals::empty(), SHUTDOWN_SIGNAL);
        self.abort_handle.abort();
    }
}
500
501impl<I: Interface + ?Sized> Drop for Session<I> {
502    fn drop(&mut self) {
503        let mut inner = self.helper.session_manager().inner.lock();
504        let notify = {
505            inner.open_sessions.remove(&(self as *const _ as usize));
506            inner.open_sessions.is_empty()
507        };
508        if notify {
509            self.helper.session_manager().no_open_sessions_condvar.notify_all();
510        }
511    }
512}
513
impl<I: Interface + ?Sized> Drop for SessionManager<I> {
    /// Asks every open session to terminate and blocks until none remain (see
    /// [`SessionManager::terminate`]).
    fn drop(&mut self) {
        self.terminate();
    }
}
519
// Applies when the interface uses the `SessionManager` itself as its orchestrator.
impl<I: Interface<Orchestrator = SessionManager<I>>> IntoOrchestrator for Arc<SessionManager<I>> {
    type SM = SessionManager<I>;

    /// The `Arc<SessionManager<I>>` already is the orchestrator, so it is returned unchanged.
    fn into_orchestrator(self) -> Arc<I::Orchestrator> {
        self
    }
}
527
528#[cfg(test)]
529mod tests {
530    use super::*;
531    use crate::BlockInfo;
532    use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse};
533    use fidl::endpoints::create_proxy_and_stream;
534    use zx::HandleBased;
535    use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
536
    /// Test double for [`Interface`] that forwards every request it receives over a channel.
    struct MockInterface {
        /// Sending half of the channel; the test body holds the receiver.
        request_sender: std::sync::mpsc::Sender<Request>,
    }
540
541    impl Interface for MockInterface {
542        type Orchestrator = SessionManager<Self>;
543
544        fn get_info(&self) -> Cow<'_, DeviceInfo> {
545            Cow::Owned(DeviceInfo::Block(BlockInfo { block_count: 1024, ..Default::default() }))
546        }
547
548        fn spawn_session(&self, session: Arc<Session<Self>>) {
549            std::thread::spawn(move || {
550                session.run();
551            });
552        }
553
554        fn on_requests(&self, requests: &[Request]) {
555            for request in requests {
556                self.request_sender.send(request.clone()).unwrap();
557            }
558        }
559    }
560
561    #[fasync::run_singlethreaded(test)]
562    async fn test_basic_request() {
563        let (tx, rx) = std::sync::mpsc::channel();
564        let interface = Arc::new(MockInterface { request_sender: tx });
565        let session_manager = Arc::new(SessionManager::new(interface.clone()));
566
567        let sm_clone = session_manager.clone();
568        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
569        let _server_task = fasync::Task::spawn(async move {
570            let server = crate::BlockServer::new(512, sm_clone);
571            server.handle_requests(stream).await.unwrap();
572        })
573        .detach();
574
575        let (session_proxy, session_server_end) =
576            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
577        proxy.open_session(session_server_end).unwrap();
578
579        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
580        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
581
582        let vmo = zx::Vmo::create(8192).unwrap();
583        let vmo_id = session_proxy
584            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
585            .await
586            .unwrap()
587            .unwrap();
588
589        let req = BlockFifoRequest {
590            command: BlockFifoCommand {
591                opcode: fblock::BlockOpcode::Read.into_primitive(),
592                ..Default::default()
593            },
594            reqid: 123,
595            group: 0,
596            vmoid: vmo_id.id,
597            length: 1,
598            vmo_offset: 0,
599            dev_offset: 0,
600            trace_flow_id: 0,
601            ..Default::default()
602        };
603        fifo.write(&[req]).unwrap();
604
605        let r = rx.recv().unwrap();
606        assert_eq!(r.request_id.0, 0);
607        assert!(matches!(r.operation, Operation::Read { .. }));
608
609        session_manager.complete_request(r.request_id, zx::Status::OK);
610
611        let signals =
612            fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
613        assert!(signals.contains(zx::Signals::FIFO_READABLE));
614
615        let mut resp = [BlockFifoResponse::default()];
616        fifo.read(&mut resp).unwrap();
617        assert_eq!(resp[0].reqid, 123);
618        assert_eq!(resp[0].status, zx::sys::ZX_OK);
619
620        std::mem::drop(proxy);
621    }
622    #[fasync::run_singlethreaded(test)]
623    async fn test_write_request() {
624        let (tx, rx) = std::sync::mpsc::channel();
625        let interface = Arc::new(MockInterface { request_sender: tx });
626        let session_manager = Arc::new(SessionManager::new(interface.clone()));
627
628        let sm_clone = session_manager.clone();
629        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
630        let _server_task = fasync::Task::spawn(async move {
631            let server = crate::BlockServer::new(512, sm_clone);
632            server.handle_requests(stream).await.unwrap();
633        })
634        .detach();
635
636        let (session_proxy, session_server_end) =
637            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
638        proxy.open_session(session_server_end).unwrap();
639
640        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
641        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
642
643        let vmo = zx::Vmo::create(8192).unwrap();
644        let vmo_id = session_proxy
645            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
646            .await
647            .unwrap()
648            .unwrap();
649
650        let req = BlockFifoRequest {
651            command: BlockFifoCommand {
652                opcode: fblock::BlockOpcode::Write.into_primitive(),
653                ..Default::default()
654            },
655            reqid: 124,
656            group: 0,
657            vmoid: vmo_id.id,
658            length: 1,
659            vmo_offset: 0,
660            dev_offset: 0,
661            trace_flow_id: 0,
662            ..Default::default()
663        };
664        fifo.write(&[req]).unwrap();
665
666        let r = rx.recv().unwrap();
667        assert_eq!(r.request_id.0, 0);
668        assert!(matches!(r.operation, Operation::Write { .. }));
669
670        session_manager.complete_request(r.request_id, zx::Status::OK);
671
672        let signals =
673            fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
674        assert!(signals.contains(zx::Signals::FIFO_READABLE));
675
676        let mut resp = [BlockFifoResponse::default()];
677        fifo.read(&mut resp).unwrap();
678        assert_eq!(resp[0].reqid, 124);
679        assert_eq!(resp[0].status, zx::sys::ZX_OK);
680    }
681
682    #[fasync::run_singlethreaded(test)]
683    async fn test_flush_request() {
684        let (tx, rx) = std::sync::mpsc::channel();
685        let interface = Arc::new(MockInterface { request_sender: tx });
686        let session_manager = Arc::new(SessionManager::new(interface.clone()));
687
688        let sm_clone = session_manager.clone();
689        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
690        let _server_task = fasync::Task::spawn(async move {
691            let server = crate::BlockServer::new(512, sm_clone);
692            server.handle_requests(stream).await.unwrap();
693        })
694        .detach();
695
696        let (session_proxy, session_server_end) =
697            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
698        proxy.open_session(session_server_end).unwrap();
699
700        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
701        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
702
703        let req = BlockFifoRequest {
704            command: BlockFifoCommand {
705                opcode: fblock::BlockOpcode::Flush.into_primitive(),
706                ..Default::default()
707            },
708            reqid: 125,
709            group: 0,
710            vmoid: fblock::VMOID_INVALID,
711            length: 0,
712            vmo_offset: 0,
713            dev_offset: 0,
714            trace_flow_id: 0,
715            ..Default::default()
716        };
717        fifo.write(&[req]).unwrap();
718
719        let r = rx.recv().unwrap();
720        assert_eq!(r.request_id.0, 0);
721        assert!(matches!(r.operation, Operation::Flush { .. }));
722
723        session_manager.complete_request(r.request_id, zx::Status::OK);
724
725        let signals =
726            fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
727        assert!(signals.contains(zx::Signals::FIFO_READABLE));
728
729        let mut resp = [BlockFifoResponse::default()];
730        fifo.read(&mut resp).unwrap();
731        assert_eq!(resp[0].reqid, 125);
732        assert_eq!(resp[0].status, zx::sys::ZX_OK);
733    }
734
735    #[fasync::run_singlethreaded(test)]
736    async fn test_trim_request() {
737        let (tx, rx) = std::sync::mpsc::channel();
738        let interface = Arc::new(MockInterface { request_sender: tx });
739        let session_manager = Arc::new(SessionManager::new(interface.clone()));
740
741        let sm_clone = session_manager.clone();
742        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
743        let _server_task = fasync::Task::spawn(async move {
744            let server = crate::BlockServer::new(512, sm_clone);
745            server.handle_requests(stream).await.unwrap();
746        })
747        .detach();
748
749        let (session_proxy, session_server_end) =
750            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
751        proxy.open_session(session_server_end).unwrap();
752
753        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
754        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
755
756        let req = BlockFifoRequest {
757            command: BlockFifoCommand {
758                opcode: fblock::BlockOpcode::Trim.into_primitive(),
759                ..Default::default()
760            },
761            reqid: 126,
762            group: 0,
763            vmoid: fblock::VMOID_INVALID,
764            length: 1,
765            vmo_offset: 0,
766            dev_offset: 0,
767            trace_flow_id: 0,
768            ..Default::default()
769        };
770        fifo.write(&[req]).unwrap();
771
772        let r = rx.recv().unwrap();
773        assert_eq!(r.request_id.0, 0);
774        assert!(matches!(r.operation, Operation::Trim { .. }));
775
776        session_manager.complete_request(r.request_id, zx::Status::OK);
777
778        let signals =
779            fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
780        assert!(signals.contains(zx::Signals::FIFO_READABLE));
781
782        let mut resp = [BlockFifoResponse::default()];
783        fifo.read(&mut resp).unwrap();
784        assert_eq!(resp[0].reqid, 126);
785        assert_eq!(resp[0].status, zx::sys::ZX_OK);
786    }
787
788    #[fasync::run_singlethreaded(test)]
789    async fn test_close_vmo() {
790        let (tx, rx) = std::sync::mpsc::channel();
791        let interface = Arc::new(MockInterface { request_sender: tx });
792        let session_manager = Arc::new(SessionManager::new(interface.clone()));
793
794        let sm_clone = session_manager.clone();
795        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
796        let _server_task = fasync::Task::spawn(async move {
797            let server = crate::BlockServer::new(512, sm_clone);
798            server.handle_requests(stream).await.unwrap();
799        })
800        .detach();
801
802        let (session_proxy, session_server_end) =
803            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
804        proxy.open_session(session_server_end).unwrap();
805
806        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
807        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
808
809        let vmo = zx::Vmo::create(8192).unwrap();
810        let vmo_id = session_proxy
811            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
812            .await
813            .unwrap()
814            .unwrap();
815
816        let req = BlockFifoRequest {
817            command: BlockFifoCommand {
818                opcode: fblock::BlockOpcode::CloseVmo.into_primitive(),
819                ..Default::default()
820            },
821            reqid: 127,
822            group: 0,
823            vmoid: vmo_id.id,
824            length: 0,
825            vmo_offset: 0,
826            dev_offset: 0,
827            trace_flow_id: 0,
828            ..Default::default()
829        };
830        fifo.write(&[req]).unwrap();
831
832        let signals =
833            fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
834        assert!(signals.contains(zx::Signals::FIFO_READABLE));
835
836        let mut resp = [BlockFifoResponse::default()];
837        fifo.read(&mut resp).unwrap();
838        assert_eq!(resp[0].reqid, 127);
839        assert_eq!(resp[0].status, zx::sys::ZX_OK);
840
841        // CloseVmo is handled automatically. Interface should not receive anything.
842        assert!(rx.try_recv().is_err());
843    }
844
845    #[fasync::run_singlethreaded(test)]
846    async fn test_error() {
847        let (tx, rx) = std::sync::mpsc::channel();
848        let interface = Arc::new(MockInterface { request_sender: tx });
849        let session_manager = Arc::new(SessionManager::new(interface.clone()));
850
851        let sm_clone = session_manager.clone();
852        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
853        let _server_task = fasync::Task::spawn(async move {
854            let server = crate::BlockServer::new(512, sm_clone);
855            server.handle_requests(stream).await.unwrap();
856        })
857        .detach();
858
859        let (session_proxy, session_server_end) =
860            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
861        proxy.open_session(session_server_end).unwrap();
862
863        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
864        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
865
866        let req = BlockFifoRequest {
867            command: BlockFifoCommand {
868                opcode: fblock::BlockOpcode::Flush.into_primitive(),
869                ..Default::default()
870            },
871            reqid: 128,
872            group: 0,
873            vmoid: fblock::VMOID_INVALID,
874            length: 0,
875            vmo_offset: 0,
876            dev_offset: 0,
877            trace_flow_id: 0,
878            ..Default::default()
879        };
880        fifo.write(&[req]).unwrap();
881
882        let r = rx.recv().unwrap();
883        session_manager.complete_request(r.request_id, zx::Status::IO);
884
885        let signals =
886            fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
887        assert!(signals.contains(zx::Signals::FIFO_READABLE));
888
889        let mut resp = [BlockFifoResponse::default()];
890        fifo.read(&mut resp).unwrap();
891        assert_eq!(resp[0].reqid, 128);
892        assert_eq!(resp[0].status, zx::sys::ZX_ERR_IO);
893    }
894
895    #[fasync::run_singlethreaded(test)]
896    async fn test_teardown_with_active_requests() {
897        let (tx, rx) = std::sync::mpsc::channel();
898        let interface = Arc::new(MockInterface { request_sender: tx });
899        let session_manager = Arc::new(SessionManager::new(interface.clone()));
900
901        let sm_clone = session_manager.clone();
902        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
903        let server_task = fasync::Task::spawn(async move {
904            let server = crate::BlockServer::new(512, sm_clone);
905            server.handle_requests(stream).await.unwrap();
906        });
907
908        let (session_proxy, session_server_end) =
909            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
910        proxy.open_session(session_server_end).unwrap();
911
912        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
913        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
914
915        let vmo = zx::Vmo::create(8192).unwrap();
916        let vmo_id = session_proxy
917            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
918            .await
919            .unwrap()
920            .unwrap();
921
922        let req = BlockFifoRequest {
923            command: BlockFifoCommand {
924                opcode: fblock::BlockOpcode::Read.into_primitive(),
925                ..Default::default()
926            },
927            reqid: 129,
928            group: 0,
929            vmoid: vmo_id.id,
930            length: 1,
931            vmo_offset: 0,
932            dev_offset: 0,
933            trace_flow_id: 0,
934            ..Default::default()
935        };
936        // Start a request and wait until it's been submitted.
937        fifo.write(&[req]).unwrap();
938        let r = rx.recv().unwrap();
939
940        // Close the client, which will eventually cause the FIFO loop to exit.
941        drop(session_proxy);
942        fasync::Timer::new(std::time::Duration::from_millis(50)).await;
943
944        // Complete the request, simulating a completion after the FIFO loop has exited.
945        session_manager.complete_request(r.request_id, zx::Status::OK);
946
947        drop(proxy);
948        fasync::unblock(move || session_manager.terminate()).await;
949        server_task.await;
950    }
951
952    #[fasync::run_singlethreaded(test)]
953    async fn test_teardown_with_active_grouped_requests() {
954        let (tx, rx) = std::sync::mpsc::channel();
955        let interface = Arc::new(MockInterface { request_sender: tx });
956        let session_manager = Arc::new(SessionManager::new(interface.clone()));
957
958        let sm_clone = session_manager.clone();
959        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
960        let server_task = fasync::Task::spawn(async move {
961            let server = crate::BlockServer::new(512, sm_clone);
962            server.handle_requests(stream).await.unwrap();
963        });
964
965        let (session_proxy, session_server_end) =
966            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
967        proxy.open_session(session_server_end).unwrap();
968
969        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
970        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
971
972        let vmo = zx::Vmo::create(8192).unwrap();
973        let vmo_id = session_proxy
974            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
975            .await
976            .unwrap()
977            .unwrap();
978
979        let mut req = BlockFifoRequest {
980            command: BlockFifoCommand {
981                opcode: fblock::BlockOpcode::Read.into_primitive(),
982                flags: fblock::BlockIoFlag::GROUP_ITEM.bits(),
983                ..Default::default()
984            },
985            reqid: 1,
986            group: 1,
987            vmoid: vmo_id.id,
988            length: 1,
989            vmo_offset: 0,
990            dev_offset: 0,
991            trace_flow_id: 0,
992            ..Default::default()
993        };
994
995        // Start two groups of requests and wait until they're been submitted.
996        fifo.write(&[req]).unwrap();
997        req.reqid = 2;
998        req.group = 2;
999        fifo.write(&[req]).unwrap();
1000        let r1 = rx.recv().unwrap();
1001        let r2 = rx.recv().unwrap();
1002        req.reqid = 3;
1003        req.group = 2;
1004        fifo.write(&[req]).unwrap();
1005        let r3 = rx.recv().unwrap();
1006
1007        // Complete requests 1,2.  This leaves two groups in the following states:
1008        // - Group 1: 0 active requests, still waiting for END
1009        // - Group 2: 1 active request, still waiting for END
1010        // Neither group will be able to complete yet.
1011        session_manager.complete_request(r1.request_id, zx::Status::OK);
1012        session_manager.complete_request(r2.request_id, zx::Status::OK);
1013
1014        // Close the client, which will eventually cause the FIFO loop to exit.
1015        // Group 1 should complete now.  Group 2 can't yet.
1016        drop(session_proxy);
1017        fasync::Timer::new(std::time::Duration::from_millis(50)).await;
1018
1019        // At some later time, complete request 3, which should complete group 2.
1020        session_manager.complete_request(r3.request_id, zx::Status::OK);
1021
1022        drop(proxy);
1023
1024        fasync::unblock(move || session_manager.terminate()).await;
1025        server_task.await;
1026    }
1027}