virtio_device/
util.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Small utility wrappers and trait implementations.
6//!
7//! This contains some commonly useful wrappers and trait implementations that are obvious enough to
8//! warrant an implementation, but have an amount of policy that you may want to opt out of.
9
10use crate::queue::{DescChain, DriverNotify, Queue};
11use futures::task::AtomicWaker;
12use futures::Stream;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
15use std::sync::Arc;
16use std::task::{Context, Poll};
17
18struct BufferedNotifyInner<N: DriverNotify> {
19    notify: N,
20    was_notified: AtomicBool,
21}
22
23impl<N: DriverNotify> Drop for BufferedNotifyInner<N> {
24    fn drop(&mut self) {
25        self.flush()
26    }
27}
28
29impl<N: DriverNotify> BufferedNotifyInner<N> {
30    fn flush(&self) {
31        if self.was_notified.swap(false, Ordering::Relaxed) {
32            self.notify.notify();
33        }
34    }
35}
36
37/// Buffering wrapper for [`DriverNotify`]
38///
39/// Typically notifying a driver is an expensive operation and when processing large numbers of
40/// chains a device may wish to trade of latency for throughput and delay submitting a notification.
41/// [`BufferedNotify`] implements [`DriverNotify`], but just stores the notification locally until
42/// the [`BufferedNotify::flush`] method is called, which then calls the underlying
43/// [`DriverNotify::notify`].
44///
45/// Any outstanding notifications will automatically be flushed on `drop`.
46///
47/// The [`BufferedNotify`] uses an [`Arc`] internally and so can be [`Clone`] to easily give to
48/// multiple queues.
49pub struct BufferedNotify<N: DriverNotify>(Arc<BufferedNotifyInner<N>>);
50
51impl<N: DriverNotify> Clone for BufferedNotify<N> {
52    fn clone(&self) -> BufferedNotify<N> {
53        BufferedNotify(self.0.clone())
54    }
55}
56
57impl<N: DriverNotify> BufferedNotify<N> {
58    /// Construct a [`BufferedNotify`] wrapping a [`DriverNotify`]
59    pub fn new(notify: N) -> BufferedNotify<N> {
60        BufferedNotify(Arc::new(BufferedNotifyInner {
61            notify,
62            was_notified: AtomicBool::new(false),
63        }))
64    }
65
66    /// Flush any pending notification.
67    ///
68    /// If this [`BufferedNotify`] has been [`notify`](#notify) since the last call to `flush`,
69    /// calls [`DriverNotify::notify`] on the wrapped [`DriverNotify`].
70    pub fn flush(&self) {
71        self.0.flush()
72    }
73}
74
75impl<N: DriverNotify> DriverNotify for BufferedNotify<N> {
76    fn notify(&self) {
77        self.0.was_notified.store(true, Ordering::Relaxed);
78    }
79}
80
81/// Counting version of [`DriverNotify`]
82///
83/// [`NotificationCounter`] is largely aimed at writing unit tests and it just records how many
84/// times it has been notified, providing an interface to [`NotificationCounter::get`] and [reset]
85/// (NotificationCounter::set) the count.
86#[derive(Debug, Clone)]
87pub struct NotificationCounter {
88    notify_count: Arc<AtomicU32>,
89}
90
91impl DriverNotify for NotificationCounter {
92    fn notify(&self) {
93        self.notify_count.fetch_add(1, Ordering::SeqCst);
94    }
95}
96
97impl NotificationCounter {
98    /// Construct a new [`NotificationCounter`]
99    pub fn new() -> NotificationCounter {
100        NotificationCounter { notify_count: Arc::new(AtomicU32::new(0)) }
101    }
102
103    /// Retrieve the current notification count.
104    pub fn get(&self) -> u32 {
105        self.notify_count.load(Ordering::SeqCst)
106    }
107
108    /// Set the stored notification count to the given value.
109    pub fn set(&self, val: u32) {
110        self.notify_count.store(val, Ordering::SeqCst)
111    }
112}
113
114/// Async [`Stream`] implementation that resolves [`DescChain`]
115///
116/// This allows for treating a [`Queue`] as something that asynchronously produces [`DescChain`].
117/// As this library has no knowledge of the underlying virtio transport the user is still
118/// responsible for hooking up the provided [`DescChainStream::waker`] for the stream to function
119/// correctly.
120pub struct DescChainStream<'a, 'b, N> {
121    queue: &'b Queue<'a, N>,
122    task: Arc<AtomicWaker>,
123}
124impl<'a, 'b, N> DescChainStream<'a, 'b, N> {
125    /// Create a [`Stream`] for a [`Queue`].
126    ///
127    /// The produced [`DescChainStream`] must have its [`waker`](#waker) signaled by the virtio
128    /// transport that receives guest notifications, otherwise the stream will not work.
129    pub fn new(queue: &'b Queue<'a, N>) -> DescChainStream<'a, 'b, N> {
130        DescChainStream { queue, task: Arc::new(AtomicWaker::new()) }
131    }
132
133    /// Retrieve the internal [`AtomicWaker`].
134    ///
135    /// This should be signaled by the virtio transport when a guest notification is received for
136    /// the underlying queue.
137    pub fn waker(&self) -> Arc<AtomicWaker> {
138        self.task.clone()
139    }
140}
141
142impl<'a, 'b, N: DriverNotify> Stream for DescChainStream<'a, 'b, N> {
143    type Item = DescChain<'a, 'b, N>;
144    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
145        if let Some(desc) = self.queue.next_chain() {
146            return Poll::Ready(Some(desc));
147        }
148        self.task.register(cx.waker());
149        match self.queue.next_chain() {
150            Some(desc) => Poll::Ready(Some(desc)),
151            None => Poll::Pending,
152        }
153    }
154}