Skip to main content

block_server/
callback_interface.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::{
6    ActiveRequests, DecodedRequest, DeviceInfo, HandleRequestResult, IntoOrchestrator, OffsetMap,
7    Operation, RequestId, SessionHelper, TraceFlowId, WriteFlags,
8};
9use anyhow::Error;
10use block_protocol::{BlockFifoRequest, BlockFifoResponse};
11use fidl_fuchsia_storage_block as fblock;
12use fuchsia_sync::{Condvar, Mutex};
13use futures::TryStreamExt as _;
14use futures::stream::{AbortHandle, Abortable};
15use std::borrow::{Borrow, Cow};
16use std::collections::{HashMap, VecDeque};
17use std::mem::MaybeUninit;
18use std::sync::{Arc, Weak};
19
20/// An in-flight request.
21#[derive(Clone, Debug)]
22pub struct Request {
23    /// The ID that a request is associated with, for later completion in
24    /// [`SessionManager::complete_request`].  Note that this is not necessarily a unique
25    /// identifier, and multiple Requests may have the same request_id (e.g. due to request
26    /// splitting).  The library internally reference-counts requests which use this ID.
27    pub request_id: RequestId,
28    pub operation: Operation,
29    pub trace_flow_id: TraceFlowId,
30    /// `vmo` is always Some for Operation::Read or Operation::Write, and None otherwise.
31    pub vmo: Option<Arc<zx::Vmo>>,
32}
33
34pub trait Interface: Send + Sync + Unpin + 'static {
35    type Orchestrator: Borrow<SessionManager<Self>> + Send + Sync;
36
37    /// Called to get block/partition information.
38    fn get_info(&self) -> Cow<'_, DeviceInfo>;
39
40    /// Start running a new session.  The interface must ensure that [`Session::run`] is called
41    /// on a different thread.
42    fn spawn_session(&self, session: Arc<Session<Self>>);
43
44    /// Starts a batch of requests.  The implementation may block if there are too many in-flight
45    /// requests, providing pushback.  The interface is responsible for eventually calling
46    /// [`SessionManager::complete_request`] for each request (even during server shutdown).
47    fn on_requests(&self, requests: &[Request]);
48}
49
50struct SessionManagerInner<I: Interface + ?Sized> {
51    open_sessions: HashMap<usize, Weak<Session<I>>>,
52}
53
54// The signals used on the session's FIFO.
55
56/// Signalled on the session's FIFO to wake up the FIFO loop.
57const FIFO_WAKE_SIGNAL: zx::Signals = zx::Signals::USER_0;
58
59/// Signalled on the session's FIFO to terminate the FIFO loop.
60const SHUTDOWN_SIGNAL: zx::Signals = zx::Signals::USER_1;
61
62pub struct SessionManager<I: Interface + ?Sized> {
63    interface: Arc<I>,
64    // These represent active *client* requests, which correspond to one or more in-flight requests.
65    active_requests: ActiveRequests<Arc<Session<I>>>,
66    inflight_requests: Mutex<usize>,
67    no_inflight_requests_condvar: Condvar,
68    inner: Mutex<SessionManagerInner<I>>,
69    no_open_sessions_condvar: Condvar,
70}
71
72impl<I: Interface + ?Sized> super::SessionManager for SessionManager<I> {
73    const SUPPORTS_DECOMPRESSION: bool = false;
74
75    type Orchestrator = I::Orchestrator;
76    type Session = Arc<Session<I>>;
77
78    async fn on_attach_vmo(
79        _orchestrator: Arc<Self::Orchestrator>,
80        _vmo: &Arc<zx::Vmo>,
81    ) -> Result<(), zx::Status> {
82        Ok(())
83    }
84
85    async fn open_session(
86        orchestrator: Arc<Self::Orchestrator>,
87        mut stream: fblock::SessionRequestStream,
88        offset_map: OffsetMap,
89        block_size: u32,
90    ) -> Result<(), Error> {
91        let (helper, fifo) = SessionHelper::new(orchestrator.clone(), offset_map, block_size)?;
92        let (abort_handle, registration) = AbortHandle::new_pair();
93        let session = Arc::new(Session {
94            helper,
95            fifo,
96            queue: Mutex::default(),
97            abort_handle,
98            close_callback: Mutex::new(None),
99        });
100        let sm = orchestrator.as_ref().borrow();
101        sm.inner
102            .lock()
103            .open_sessions
104            .insert(Arc::as_ptr(&session) as usize, Arc::downgrade(&session));
105
106        sm.interface.spawn_session(session.clone());
107
108        let result = Abortable::new(
109            async {
110                while let Some(request) = stream.try_next().await? {
111                    match session.helper.handle_request(request).await? {
112                        HandleRequestResult::Ok => {}
113                        HandleRequestResult::Closed(callback) => {
114                            *session.close_callback.lock() = Some(callback);
115                            break;
116                        }
117                    }
118                }
119                Ok(())
120            },
121            registration,
122        )
123        .await
124        .unwrap_or_else(|e| Err(e.into()));
125
126        let _ = session.fifo.signal(zx::Signals::empty(), SHUTDOWN_SIGNAL);
127
128        result
129    }
130
131    fn get_info(&self) -> Cow<'_, super::DeviceInfo> {
132        self.interface.get_info()
133    }
134
135    fn active_requests(&self) -> &ActiveRequests<Arc<Session<I>>> {
136        &self.active_requests
137    }
138}
139
140impl<I: Interface + ?Sized> SessionManager<I> {
141    pub fn new(interface: Arc<I>) -> Self {
142        Self {
143            interface,
144            active_requests: ActiveRequests::default(),
145            inflight_requests: Mutex::new(0),
146            no_inflight_requests_condvar: Condvar::new(),
147            inner: Mutex::new(SessionManagerInner { open_sessions: HashMap::new() }),
148            no_open_sessions_condvar: Condvar::new(),
149        }
150    }
151
152    /// Reports the given task as complete with a given status.
153    pub fn complete_request(&self, request_id: RequestId, status: zx::Status) {
154        let notify = {
155            let mut inflight_requests = self.inflight_requests.lock();
156            *inflight_requests -= 1;
157            *inflight_requests == 0
158        };
159        self.complete_unsubmitted_request(request_id, status);
160        if notify {
161            self.no_inflight_requests_condvar.notify_all();
162        }
163    }
164
165    fn submit_requests(&self, requests: &[Request]) {
166        *self.inflight_requests.lock() += requests.len();
167        self.interface.on_requests(requests);
168    }
169
170    /// Waits for there to be no requests in-flight.
171    ///
172    /// NOTE: To void TOCTOUs, this must be called on the same thread which calls
173    /// [`Self::submit_requests`].
174    fn wait_for_no_inflight_requests(&self) {
175        let mut guard = self.inflight_requests.lock();
176        self.no_inflight_requests_condvar.wait_while(&mut guard, |count| *count > 0);
177    }
178
179    /// Called instead of `[Self::complete_request]` when a request is completed before it was
180    /// actually submitted.
181    fn complete_unsubmitted_request(&self, request_id: RequestId, status: zx::Status) {
182        if let Some((session, response)) =
183            self.active_requests.complete_and_take_response(request_id, status)
184        {
185            session.send_response(response);
186        }
187    }
188
189    /// Terminates the session manager.  Blocks until all sessions have terminated.
190    pub fn terminate(&self) {
191        {
192            // We must drop references to sessions whilst we're not holding the lock for
193            // `open_sessions` because `Session::drop` needs to take that same lock.
194            #[allow(clippy::collection_is_never_read)]
195            let mut terminated_sessions = Vec::new();
196            for (_, session) in &self.inner.lock().open_sessions {
197                if let Some(session) = session.upgrade() {
198                    session.terminate_async();
199                    terminated_sessions.push(session);
200                }
201            }
202        }
203        let mut guard = self.inner.lock();
204        self.no_open_sessions_condvar.wait_while(&mut guard, |s| !s.open_sessions.is_empty());
205    }
206}
207
208pub struct Session<I: Interface + ?Sized> {
209    helper: SessionHelper<SessionManager<I>>,
210    fifo: zx::Fifo<BlockFifoRequest, BlockFifoResponse>,
211    queue: Mutex<SessionQueue>,
212    abort_handle: AbortHandle,
213    close_callback: Mutex<Option<Box<dyn FnOnce() + Send + 'static>>>,
214}
215
216#[derive(Default)]
217struct SessionQueue {
218    responses: VecDeque<BlockFifoResponse>,
219}
220
221pub const MAX_REQUESTS: usize = super::FIFO_MAX_REQUESTS;
222
223struct DecodedRequests {
224    requests: [MaybeUninit<Request>; MAX_REQUESTS],
225    count: usize,
226}
227
228impl Default for DecodedRequests {
229    fn default() -> Self {
230        Self { requests: unsafe { MaybeUninit::uninit().assume_init() }, count: 0 }
231    }
232}
233
234impl DecodedRequests {
235    fn push(&mut self, request: Request) {
236        assert!(self.count < MAX_REQUESTS);
237        // To ensure drop runs, we must initialize each element at most once.
238        // As long as we only go via this function, that's satisfied.
239        self.requests[self.count].write(request);
240        self.count += 1;
241    }
242
243    fn is_empty(&self) -> bool {
244        self.count == 0
245    }
246
247    fn is_full(&self) -> bool {
248        self.count == MAX_REQUESTS
249    }
250
251    fn clear(&mut self) {
252        for i in 0..self.count {
253            // SAFETY: We initialized `count` elements via [`Self::push`].
254            unsafe { self.requests[i].assume_init_drop() };
255        }
256        self.count = 0;
257    }
258}
259
260impl Drop for DecodedRequests {
261    fn drop(&mut self) {
262        self.clear();
263    }
264}
265
266impl std::ops::Deref for DecodedRequests {
267    type Target = [Request];
268
269    fn deref(&self) -> &Self::Target {
270        // SAFETY: We wrote the request in [`Self::push`].
271        unsafe { std::slice::from_raw_parts(self.requests[0].as_ptr(), self.count) }
272    }
273}
274
275impl std::ops::DerefMut for DecodedRequests {
276    fn deref_mut(&mut self) -> &mut Self::Target {
277        // SAFETY: We wrote the request in [`Self::push`].
278        unsafe { std::slice::from_raw_parts_mut(self.requests[0].as_mut_ptr(), self.count) }
279    }
280}
281
282impl<I: Interface + ?Sized> Session<I> {
283    /// Begins processing the session's FIFO.  Blocks until completion (either because of
284    /// termination, or due to unrecoverable error).
285    pub fn run(self: &Arc<Self>) {
286        self.fifo_loop();
287        self.abort_handle.abort();
288        // NB: We cannot call [`drop_active_requests`] here, because requests which have already
289        // been submitted are no longer in control by this thread, and will be later completed by
290        // another thread.  If we dropped them here, then they would be completed twice.
291        self.helper.close_active_groups(|s| Arc::ptr_eq(s, self));
292    }
293
294    fn fifo_loop(self: &Arc<Self>) {
295        let mut requests = [MaybeUninit::uninit(); MAX_REQUESTS];
296
297        loop {
298            // Send queued responses.
299            let is_queue_empty = {
300                let mut queue = self.queue.lock();
301                while !queue.responses.is_empty() {
302                    let (front, _) = queue.responses.as_slices();
303                    match self.fifo.write(front) {
304                        Ok(count) => {
305                            let full = count < front.len();
306                            queue.responses.drain(..count);
307                            if full {
308                                break;
309                            }
310                        }
311                        Err(zx::Status::SHOULD_WAIT) => break,
312                        Err(_) => return,
313                    }
314                }
315                queue.responses.is_empty()
316            };
317
318            // Process pending reads.
319            match self.fifo.read_uninit(&mut requests) {
320                Ok(valid_requests) => self.handle_requests(valid_requests.iter_mut()),
321                Err(zx::Status::SHOULD_WAIT) => {
322                    let mut signals =
323                        zx::Signals::OBJECT_READABLE | SHUTDOWN_SIGNAL | FIFO_WAKE_SIGNAL;
324                    if !is_queue_empty {
325                        signals |= zx::Signals::OBJECT_WRITABLE;
326                    }
327                    let Ok(signals) =
328                        self.fifo.wait_one(signals, zx::MonotonicInstant::INFINITE).to_result()
329                    else {
330                        return;
331                    };
332                    if signals.contains(SHUTDOWN_SIGNAL) {
333                        return;
334                    }
335                    // Clear FIFO_WAKE_SIGNAL if it's set.
336                    if signals.contains(FIFO_WAKE_SIGNAL) {
337                        let _ = self.fifo.signal(FIFO_WAKE_SIGNAL, zx::Signals::empty());
338                    }
339                }
340                Err(_) => return,
341            }
342        }
343    }
344
345    /// Synchronously performs a device flush.
346    fn pre_flush(self: &Arc<Self>, request_id: RequestId) -> Result<(), zx::Status> {
347        let trace_flow_id = {
348            let mut request = self.helper.session_manager().active_requests.request(request_id);
349            if let Some(id) = request.trace_flow_id {
350                fuchsia_trace::async_instant!(
351                    fuchsia_trace::Id::from(id.get()),
352                    c"storage",
353                    c"block_server::SimulatedBarrier",
354                    "request_id" => request_id.0
355                );
356            }
357            request.count += 1;
358            request.trace_flow_id
359        };
360        self.helper.session_manager().submit_requests(&[Request {
361            request_id,
362            operation: Operation::Flush,
363            trace_flow_id,
364            vmo: None,
365        }]);
366        self.helper.session_manager().wait_for_no_inflight_requests();
367        let status = self.helper.session_manager().active_requests.request(request_id).status;
368        match status {
369            zx::Status::OK => Ok(()),
370            status => {
371                // Respond for the unsubmitted request too.
372                self.helper.session_manager().complete_unsubmitted_request(request_id, status);
373                Err(status)
374            }
375        }
376    }
377
378    /// Synchronously completes `decoded_requests`, and inserts a post-flush into `decoded_requests`
379    /// to be submitted later.
380    fn post_flush(self: &Arc<Self>, request_id: RequestId, decoded_requests: &mut DecodedRequests) {
381        if !decoded_requests.is_empty() {
382            self.helper.session_manager().submit_requests(decoded_requests);
383            decoded_requests.clear();
384        }
385        self.helper.session_manager().wait_for_no_inflight_requests();
386        let request = self.helper.session_manager().active_requests.request(request_id);
387        match request.status {
388            zx::Status::OK => decoded_requests.push(Request {
389                request_id,
390                operation: Operation::Flush,
391                trace_flow_id: request.trace_flow_id,
392                vmo: None,
393            }),
394            status => {
395                drop(request);
396                self.helper.session_manager().complete_unsubmitted_request(request_id, status)
397            }
398        }
399    }
400
401    fn handle_requests<'a>(
402        self: &Arc<Self>,
403        requests: impl Iterator<Item = &'a mut BlockFifoRequest>,
404    ) {
405        let manager = &self.helper.session_manager();
406        let mut decoded_requests = DecodedRequests::default();
407
408        for request in requests {
409            match self.helper.decode_fifo_request(self.clone(), request) {
410                Ok(DecodedRequest { operation: Operation::CloseVmo, request_id, .. }) => {
411                    manager.complete_unsubmitted_request(request_id, zx::Status::OK);
412                }
413                Ok(mut request) => {
414                    let request_id = request.request_id;
415
416                    // Strip the PRE_BARRIER flag if we don't support it, and simulate the barrier
417                    // with a pre-flush.
418                    if !manager
419                        .interface
420                        .get_info()
421                        .device_flags()
422                        .contains(fblock::DeviceFlag::BARRIER_SUPPORT)
423                        && request.operation.take_write_flag(WriteFlags::PRE_BARRIER)
424                        && self.pre_flush(request_id).is_err()
425                    {
426                        continue;
427                    }
428                    // Strip the FORCE_ACCESS flag if we don't support it, and simulate the FUA with
429                    // a post-flush.
430                    let simulate_fua = !manager
431                        .interface
432                        .get_info()
433                        .device_flags()
434                        .contains(fblock::DeviceFlag::FUA_SUPPORT)
435                        && request.operation.take_write_flag(WriteFlags::FORCE_ACCESS);
436
437                    if simulate_fua {
438                        // Account for the additional request we need at the end.
439                        manager.active_requests.request(request_id).count += 1;
440                    }
441
442                    loop {
443                        let result = self
444                            .helper
445                            .map_request(request, &mut manager.active_requests.request(request_id));
446                        match result {
447                            Ok((
448                                DecodedRequest { request_id, operation, vmo, trace_flow_id },
449                                remainder,
450                            )) => {
451                                decoded_requests.push(Request {
452                                    request_id,
453                                    operation,
454                                    trace_flow_id,
455                                    vmo,
456                                });
457
458                                if decoded_requests.is_full() {
459                                    manager.submit_requests(&*decoded_requests);
460                                    decoded_requests.clear();
461                                }
462
463                                if let Some(r) = remainder {
464                                    request = r;
465                                } else {
466                                    break;
467                                }
468                            }
469                            Err(status) => {
470                                manager.complete_unsubmitted_request(request_id, status);
471                                break;
472                            }
473                        }
474                    }
475
476                    if simulate_fua {
477                        self.post_flush(request_id, &mut decoded_requests);
478                    }
479                }
480                Err(None) => {}
481                Err(Some(response)) => self.send_response(response),
482            }
483        }
484
485        if !decoded_requests.is_empty() {
486            manager.submit_requests(&decoded_requests);
487        }
488    }
489
490    fn send_response(&self, response: BlockFifoResponse) {
491        let mut queue = self.queue.lock();
492        if queue.responses.is_empty() {
493            match self.fifo.write_one(&response) {
494                Ok(()) => {
495                    return;
496                }
497                Err(_) => {
498                    // Wake the FIFO loop up so that we can send the response later.
499                    let _ = self.fifo.signal(zx::Signals::empty(), FIFO_WAKE_SIGNAL);
500                }
501            }
502        }
503        queue.responses.push_back(response);
504    }
505
506    /// Asynchronously request to terminate the Session.  The session's thread will eventually stop
507    /// running.
508    pub fn terminate_async(&self) {
509        let _ = self.fifo.signal(zx::Signals::empty(), SHUTDOWN_SIGNAL);
510        self.abort_handle.abort();
511    }
512}
513
514impl<I: Interface + ?Sized> Drop for Session<I> {
515    fn drop(&mut self) {
516        let callback = std::mem::take(&mut *self.close_callback.lock());
517        if let Some(callback) = callback {
518            callback();
519        }
520        let notify = {
521            let mut inner = self.helper.session_manager().inner.lock();
522            inner.open_sessions.remove(&(self as *const _ as usize));
523            inner.open_sessions.is_empty()
524        };
525        if notify {
526            self.helper.session_manager().no_open_sessions_condvar.notify_all();
527        }
528    }
529}
530
531impl<I: Interface + ?Sized> Drop for SessionManager<I> {
532    fn drop(&mut self) {
533        self.terminate();
534    }
535}
536
537impl<I: Interface<Orchestrator = SessionManager<I>>> IntoOrchestrator for Arc<SessionManager<I>> {
538    type SM = SessionManager<I>;
539
540    fn into_orchestrator(self) -> Arc<I::Orchestrator> {
541        self
542    }
543}
544
545#[cfg(test)]
546mod tests {
547    use super::*;
548    use crate::BlockInfo;
549    use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse};
550    use fidl::endpoints::create_proxy_and_stream;
551    use fidl_fuchsia_storage_block as fblock;
552    use fuchsia_async as fasync;
553
554    struct MockInterface {
555        request_sender: std::sync::mpsc::Sender<Request>,
556    }
557
558    impl Interface for MockInterface {
559        type Orchestrator = SessionManager<Self>;
560
561        fn get_info(&self) -> Cow<'_, DeviceInfo> {
562            Cow::Owned(DeviceInfo::Block(BlockInfo { block_count: 1024, ..Default::default() }))
563        }
564
565        fn spawn_session(&self, session: Arc<Session<Self>>) {
566            std::thread::spawn(move || {
567                session.run();
568            });
569        }
570
571        fn on_requests(&self, requests: &[Request]) {
572            for request in requests {
573                self.request_sender.send(request.clone()).unwrap();
574            }
575        }
576    }
577
578    #[fasync::run_singlethreaded(test)]
579    async fn test_basic_request() {
580        let (tx, rx) = std::sync::mpsc::channel();
581        let interface = Arc::new(MockInterface { request_sender: tx });
582        let session_manager = Arc::new(SessionManager::new(interface.clone()));
583
584        let sm_clone = session_manager.clone();
585        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
586        let _server_task = fasync::Task::spawn(async move {
587            let server = crate::BlockServer::new(512, sm_clone);
588            server.handle_requests(stream).await.unwrap();
589        })
590        .detach();
591
592        let (session_proxy, session_server_end) =
593            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
594        proxy.open_session(session_server_end).unwrap();
595
596        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
597        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
598
599        let vmo = zx::Vmo::create(8192).unwrap();
600        let vmo_id = session_proxy
601            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
602            .await
603            .unwrap()
604            .unwrap();
605
606        let req = BlockFifoRequest {
607            command: BlockFifoCommand {
608                opcode: fblock::BlockOpcode::Read.into_primitive(),
609                ..Default::default()
610            },
611            reqid: 123,
612            group: 0,
613            vmoid: vmo_id.id,
614            length: 1,
615            vmo_offset: 0,
616            dev_offset: 0,
617            trace_flow_id: 0,
618            ..Default::default()
619        };
620        fifo.write(&[req]).unwrap();
621
622        let r = rx.recv().unwrap();
623        assert_eq!(r.request_id.0, 0);
624        assert!(matches!(r.operation, Operation::Read { .. }));
625
626        session_manager.complete_request(r.request_id, zx::Status::OK);
627
628        let signals =
629            fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
630        assert!(signals.contains(zx::Signals::FIFO_READABLE));
631
632        let mut resp = [BlockFifoResponse::default()];
633        fifo.read(&mut resp).unwrap();
634        assert_eq!(resp[0].reqid, 123);
635        assert_eq!(resp[0].status, zx::sys::ZX_OK);
636
637        std::mem::drop(proxy);
638    }
639    #[fasync::run_singlethreaded(test)]
640    async fn test_write_request() {
641        let (tx, rx) = std::sync::mpsc::channel();
642        let interface = Arc::new(MockInterface { request_sender: tx });
643        let session_manager = Arc::new(SessionManager::new(interface.clone()));
644
645        let sm_clone = session_manager.clone();
646        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
647        let _server_task = fasync::Task::spawn(async move {
648            let server = crate::BlockServer::new(512, sm_clone);
649            server.handle_requests(stream).await.unwrap();
650        })
651        .detach();
652
653        let (session_proxy, session_server_end) =
654            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
655        proxy.open_session(session_server_end).unwrap();
656
657        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
658        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
659
660        let vmo = zx::Vmo::create(8192).unwrap();
661        let vmo_id = session_proxy
662            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
663            .await
664            .unwrap()
665            .unwrap();
666
667        let req = BlockFifoRequest {
668            command: BlockFifoCommand {
669                opcode: fblock::BlockOpcode::Write.into_primitive(),
670                ..Default::default()
671            },
672            reqid: 124,
673            group: 0,
674            vmoid: vmo_id.id,
675            length: 1,
676            vmo_offset: 0,
677            dev_offset: 0,
678            trace_flow_id: 0,
679            ..Default::default()
680        };
681        fifo.write(&[req]).unwrap();
682
683        let r = rx.recv().unwrap();
684        assert_eq!(r.request_id.0, 0);
685        assert!(matches!(r.operation, Operation::Write { .. }));
686
687        session_manager.complete_request(r.request_id, zx::Status::OK);
688
689        let signals =
690            fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
691        assert!(signals.contains(zx::Signals::FIFO_READABLE));
692
693        let mut resp = [BlockFifoResponse::default()];
694        fifo.read(&mut resp).unwrap();
695        assert_eq!(resp[0].reqid, 124);
696        assert_eq!(resp[0].status, zx::sys::ZX_OK);
697    }
698
699    #[fasync::run_singlethreaded(test)]
700    async fn test_flush_request() {
701        let (tx, rx) = std::sync::mpsc::channel();
702        let interface = Arc::new(MockInterface { request_sender: tx });
703        let session_manager = Arc::new(SessionManager::new(interface.clone()));
704
705        let sm_clone = session_manager.clone();
706        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
707        let _server_task = fasync::Task::spawn(async move {
708            let server = crate::BlockServer::new(512, sm_clone);
709            server.handle_requests(stream).await.unwrap();
710        })
711        .detach();
712
713        let (session_proxy, session_server_end) =
714            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
715        proxy.open_session(session_server_end).unwrap();
716
717        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
718        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
719
720        let req = BlockFifoRequest {
721            command: BlockFifoCommand {
722                opcode: fblock::BlockOpcode::Flush.into_primitive(),
723                ..Default::default()
724            },
725            reqid: 125,
726            group: 0,
727            vmoid: fblock::VMOID_INVALID,
728            length: 0,
729            vmo_offset: 0,
730            dev_offset: 0,
731            trace_flow_id: 0,
732            ..Default::default()
733        };
734        fifo.write(&[req]).unwrap();
735
736        let r = rx.recv().unwrap();
737        assert_eq!(r.request_id.0, 0);
738        assert!(matches!(r.operation, Operation::Flush { .. }));
739
740        session_manager.complete_request(r.request_id, zx::Status::OK);
741
742        let signals =
743            fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
744        assert!(signals.contains(zx::Signals::FIFO_READABLE));
745
746        let mut resp = [BlockFifoResponse::default()];
747        fifo.read(&mut resp).unwrap();
748        assert_eq!(resp[0].reqid, 125);
749        assert_eq!(resp[0].status, zx::sys::ZX_OK);
750    }
751
752    #[fasync::run_singlethreaded(test)]
753    async fn test_trim_request() {
754        let (tx, rx) = std::sync::mpsc::channel();
755        let interface = Arc::new(MockInterface { request_sender: tx });
756        let session_manager = Arc::new(SessionManager::new(interface.clone()));
757
758        let sm_clone = session_manager.clone();
759        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
760        let _server_task = fasync::Task::spawn(async move {
761            let server = crate::BlockServer::new(512, sm_clone);
762            server.handle_requests(stream).await.unwrap();
763        })
764        .detach();
765
766        let (session_proxy, session_server_end) =
767            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
768        proxy.open_session(session_server_end).unwrap();
769
770        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
771        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
772
773        let req = BlockFifoRequest {
774            command: BlockFifoCommand {
775                opcode: fblock::BlockOpcode::Trim.into_primitive(),
776                ..Default::default()
777            },
778            reqid: 126,
779            group: 0,
780            vmoid: fblock::VMOID_INVALID,
781            length: 1,
782            vmo_offset: 0,
783            dev_offset: 0,
784            trace_flow_id: 0,
785            ..Default::default()
786        };
787        fifo.write(&[req]).unwrap();
788
789        let r = rx.recv().unwrap();
790        assert_eq!(r.request_id.0, 0);
791        assert!(matches!(r.operation, Operation::Trim { .. }));
792
793        session_manager.complete_request(r.request_id, zx::Status::OK);
794
795        let signals =
796            fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
797        assert!(signals.contains(zx::Signals::FIFO_READABLE));
798
799        let mut resp = [BlockFifoResponse::default()];
800        fifo.read(&mut resp).unwrap();
801        assert_eq!(resp[0].reqid, 126);
802        assert_eq!(resp[0].status, zx::sys::ZX_OK);
803    }
804
805    #[fasync::run_singlethreaded(test)]
806    async fn test_close_vmo() {
807        let (tx, rx) = std::sync::mpsc::channel();
808        let interface = Arc::new(MockInterface { request_sender: tx });
809        let session_manager = Arc::new(SessionManager::new(interface.clone()));
810
811        let sm_clone = session_manager.clone();
812        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
813        let _server_task = fasync::Task::spawn(async move {
814            let server = crate::BlockServer::new(512, sm_clone);
815            server.handle_requests(stream).await.unwrap();
816        })
817        .detach();
818
819        let (session_proxy, session_server_end) =
820            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
821        proxy.open_session(session_server_end).unwrap();
822
823        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
824        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
825
826        let vmo = zx::Vmo::create(8192).unwrap();
827        let vmo_id = session_proxy
828            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
829            .await
830            .unwrap()
831            .unwrap();
832
833        let req = BlockFifoRequest {
834            command: BlockFifoCommand {
835                opcode: fblock::BlockOpcode::CloseVmo.into_primitive(),
836                ..Default::default()
837            },
838            reqid: 127,
839            group: 0,
840            vmoid: vmo_id.id,
841            length: 0,
842            vmo_offset: 0,
843            dev_offset: 0,
844            trace_flow_id: 0,
845            ..Default::default()
846        };
847        fifo.write(&[req]).unwrap();
848
849        let signals =
850            fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
851        assert!(signals.contains(zx::Signals::FIFO_READABLE));
852
853        let mut resp = [BlockFifoResponse::default()];
854        fifo.read(&mut resp).unwrap();
855        assert_eq!(resp[0].reqid, 127);
856        assert_eq!(resp[0].status, zx::sys::ZX_OK);
857
858        // CloseVmo is handled automatically. Interface should not receive anything.
859        assert!(rx.try_recv().is_err());
860    }
861
862    #[fasync::run_singlethreaded(test)]
863    async fn test_error() {
864        let (tx, rx) = std::sync::mpsc::channel();
865        let interface = Arc::new(MockInterface { request_sender: tx });
866        let session_manager = Arc::new(SessionManager::new(interface.clone()));
867
868        let sm_clone = session_manager.clone();
869        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
870        let _server_task = fasync::Task::spawn(async move {
871            let server = crate::BlockServer::new(512, sm_clone);
872            server.handle_requests(stream).await.unwrap();
873        })
874        .detach();
875
876        let (session_proxy, session_server_end) =
877            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
878        proxy.open_session(session_server_end).unwrap();
879
880        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
881        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
882
883        let req = BlockFifoRequest {
884            command: BlockFifoCommand {
885                opcode: fblock::BlockOpcode::Flush.into_primitive(),
886                ..Default::default()
887            },
888            reqid: 128,
889            group: 0,
890            vmoid: fblock::VMOID_INVALID,
891            length: 0,
892            vmo_offset: 0,
893            dev_offset: 0,
894            trace_flow_id: 0,
895            ..Default::default()
896        };
897        fifo.write(&[req]).unwrap();
898
899        let r = rx.recv().unwrap();
900        session_manager.complete_request(r.request_id, zx::Status::IO);
901
902        let signals =
903            fifo.wait_one(zx::Signals::FIFO_READABLE, zx::MonotonicInstant::INFINITE).unwrap();
904        assert!(signals.contains(zx::Signals::FIFO_READABLE));
905
906        let mut resp = [BlockFifoResponse::default()];
907        fifo.read(&mut resp).unwrap();
908        assert_eq!(resp[0].reqid, 128);
909        assert_eq!(resp[0].status, zx::sys::ZX_ERR_IO);
910    }
911
912    #[fasync::run_singlethreaded(test)]
913    async fn test_teardown_with_active_requests() {
914        let (tx, rx) = std::sync::mpsc::channel();
915        let interface = Arc::new(MockInterface { request_sender: tx });
916        let session_manager = Arc::new(SessionManager::new(interface.clone()));
917
918        let sm_clone = session_manager.clone();
919        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
920        let server_task = fasync::Task::spawn(async move {
921            let server = crate::BlockServer::new(512, sm_clone);
922            server.handle_requests(stream).await.unwrap();
923        });
924
925        let (session_proxy, session_server_end) =
926            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
927        proxy.open_session(session_server_end).unwrap();
928
929        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
930        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
931
932        let vmo = zx::Vmo::create(8192).unwrap();
933        let vmo_id = session_proxy
934            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
935            .await
936            .unwrap()
937            .unwrap();
938
939        let req = BlockFifoRequest {
940            command: BlockFifoCommand {
941                opcode: fblock::BlockOpcode::Read.into_primitive(),
942                ..Default::default()
943            },
944            reqid: 129,
945            group: 0,
946            vmoid: vmo_id.id,
947            length: 1,
948            vmo_offset: 0,
949            dev_offset: 0,
950            trace_flow_id: 0,
951            ..Default::default()
952        };
953        // Start a request and wait until it's been submitted.
954        fifo.write(&[req]).unwrap();
955        let r = rx.recv().unwrap();
956
957        // Close the client, which will eventually cause the FIFO loop to exit.
958        drop(session_proxy);
959        fasync::Timer::new(std::time::Duration::from_millis(50)).await;
960
961        // Complete the request, simulating a completion after the FIFO loop has exited.
962        session_manager.complete_request(r.request_id, zx::Status::OK);
963
964        drop(proxy);
965        fasync::unblock(move || session_manager.terminate()).await;
966        server_task.await;
967    }
968
969    #[fasync::run_singlethreaded(test)]
970    async fn test_teardown_with_active_grouped_requests() {
971        let (tx, rx) = std::sync::mpsc::channel();
972        let interface = Arc::new(MockInterface { request_sender: tx });
973        let session_manager = Arc::new(SessionManager::new(interface.clone()));
974
975        let sm_clone = session_manager.clone();
976        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
977        let server_task = fasync::Task::spawn(async move {
978            let server = crate::BlockServer::new(512, sm_clone);
979            server.handle_requests(stream).await.unwrap();
980        });
981
982        let (session_proxy, session_server_end) =
983            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
984        proxy.open_session(session_server_end).unwrap();
985
986        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
987        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
988
989        let vmo = zx::Vmo::create(8192).unwrap();
990        let vmo_id = session_proxy
991            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
992            .await
993            .unwrap()
994            .unwrap();
995
996        let mut req = BlockFifoRequest {
997            command: BlockFifoCommand {
998                opcode: fblock::BlockOpcode::Read.into_primitive(),
999                flags: fblock::BlockIoFlag::GROUP_ITEM.bits(),
1000                ..Default::default()
1001            },
1002            reqid: 1,
1003            group: 1,
1004            vmoid: vmo_id.id,
1005            length: 1,
1006            vmo_offset: 0,
1007            dev_offset: 0,
1008            trace_flow_id: 0,
1009            ..Default::default()
1010        };
1011
1012        // Start two groups of requests and wait until they're been submitted.
1013        fifo.write(&[req]).unwrap();
1014        req.reqid = 2;
1015        req.group = 2;
1016        fifo.write(&[req]).unwrap();
1017        let r1 = rx.recv().unwrap();
1018        let r2 = rx.recv().unwrap();
1019        req.reqid = 3;
1020        req.group = 2;
1021        fifo.write(&[req]).unwrap();
1022        let r3 = rx.recv().unwrap();
1023
1024        // Complete requests 1,2.  This leaves two groups in the following states:
1025        // - Group 1: 0 active requests, still waiting for END
1026        // - Group 2: 1 active request, still waiting for END
1027        // Neither group will be able to complete yet.
1028        session_manager.complete_request(r1.request_id, zx::Status::OK);
1029        session_manager.complete_request(r2.request_id, zx::Status::OK);
1030
1031        // Close the client, which will eventually cause the FIFO loop to exit.
1032        // Group 1 should complete now.  Group 2 can't yet.
1033        drop(session_proxy);
1034        fasync::Timer::new(std::time::Duration::from_millis(50)).await;
1035
1036        // At some later time, complete request 3, which should complete group 2.
1037        session_manager.complete_request(r3.request_id, zx::Status::OK);
1038
1039        drop(proxy);
1040
1041        fasync::unblock(move || session_manager.terminate()).await;
1042        server_task.await;
1043    }
1044
1045    #[fasync::run_singlethreaded(test)]
1046    async fn test_session_close_is_synchronous() {
1047        use futures::FutureExt as _;
1048
1049        let (tx, rx) = std::sync::mpsc::channel();
1050        let interface = Arc::new(MockInterface { request_sender: tx });
1051        let session_manager = Arc::new(SessionManager::new(interface.clone()));
1052
1053        let sm_clone = session_manager.clone();
1054        let (proxy, stream) = create_proxy_and_stream::<fblock::BlockMarker>();
1055        let server_task = fasync::Task::spawn(async move {
1056            let server = crate::BlockServer::new(512, sm_clone);
1057            server.handle_requests(stream).await.unwrap();
1058        });
1059
1060        let (session_proxy, session_server_end) =
1061            fidl::endpoints::create_proxy::<fblock::SessionMarker>();
1062        proxy.open_session(session_server_end).unwrap();
1063
1064        let fifo_handle = session_proxy.get_fifo().await.unwrap().unwrap();
1065        let fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest> = zx::Fifo::from(fifo_handle);
1066
1067        let vmo = zx::Vmo::create(8192).unwrap();
1068        let vmo_id = session_proxy
1069            .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1070            .await
1071            .unwrap()
1072            .unwrap();
1073
1074        let req = BlockFifoRequest {
1075            command: BlockFifoCommand {
1076                opcode: fblock::BlockOpcode::Read.into_primitive(),
1077                ..Default::default()
1078            },
1079            reqid: 123,
1080            group: 0,
1081            vmoid: vmo_id.id,
1082            length: 1,
1083            vmo_offset: 0,
1084            dev_offset: 0,
1085            trace_flow_id: 0,
1086            ..Default::default()
1087        };
1088        fifo.write(&[req]).unwrap();
1089
1090        let r = rx.recv().unwrap();
1091
1092        // The close request shouldn't complete yet because the read is still hanging.
1093        let mut close_fut = std::pin::pin!(session_proxy.close().fuse());
1094        let mut timer_fut =
1095            std::pin::pin!(fasync::Timer::new(std::time::Duration::from_millis(100)).fuse());
1096        futures::select! {
1097            res = close_fut => panic!("close completed too early: {:?}", res),
1098            _ = timer_fut => {}
1099        }
1100
1101        session_manager.complete_request(r.request_id, zx::Status::OK);
1102
1103        // Verify that close() now completes.
1104        close_fut.await.unwrap().unwrap();
1105
1106        std::mem::drop(proxy);
1107        server_task.await;
1108    }
1109}