virtio_device/util.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Small utility wrappers and trait implementations.
6//!
7//! This contains some commonly useful wrappers and trait implementations that are obvious enough to
8//! warrant an implementation, but have an amount of policy that you may want to opt out of.
9
10use crate::queue::{DescChain, DriverNotify, Queue};
11use futures::task::AtomicWaker;
12use futures::Stream;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
15use std::sync::Arc;
16use std::task::{Context, Poll};
17
18struct BufferedNotifyInner<N: DriverNotify> {
19 notify: N,
20 was_notified: AtomicBool,
21}
22
23impl<N: DriverNotify> Drop for BufferedNotifyInner<N> {
24 fn drop(&mut self) {
25 self.flush()
26 }
27}
28
29impl<N: DriverNotify> BufferedNotifyInner<N> {
30 fn flush(&self) {
31 if self.was_notified.swap(false, Ordering::Relaxed) {
32 self.notify.notify();
33 }
34 }
35}
36
37/// Buffering wrapper for [`DriverNotify`]
38///
39/// Typically notifying a driver is an expensive operation and when processing large numbers of
40/// chains a device may wish to trade of latency for throughput and delay submitting a notification.
41/// [`BufferedNotify`] implements [`DriverNotify`], but just stores the notification locally until
42/// the [`BufferedNotify::flush`] method is called, which then calls the underlying
43/// [`DriverNotify::notify`].
44///
45/// Any outstanding notifications will automatically be flushed on `drop`.
46///
47/// The [`BufferedNotify`] uses an [`Arc`] internally and so can be [`Clone`] to easily give to
48/// multiple queues.
49pub struct BufferedNotify<N: DriverNotify>(Arc<BufferedNotifyInner<N>>);
50
51impl<N: DriverNotify> Clone for BufferedNotify<N> {
52 fn clone(&self) -> BufferedNotify<N> {
53 BufferedNotify(self.0.clone())
54 }
55}
56
57impl<N: DriverNotify> BufferedNotify<N> {
58 /// Construct a [`BufferedNotify`] wrapping a [`DriverNotify`]
59 pub fn new(notify: N) -> BufferedNotify<N> {
60 BufferedNotify(Arc::new(BufferedNotifyInner {
61 notify,
62 was_notified: AtomicBool::new(false),
63 }))
64 }
65
66 /// Flush any pending notification.
67 ///
68 /// If this [`BufferedNotify`] has been [`notify`](#notify) since the last call to `flush`,
69 /// calls [`DriverNotify::notify`] on the wrapped [`DriverNotify`].
70 pub fn flush(&self) {
71 self.0.flush()
72 }
73}
74
75impl<N: DriverNotify> DriverNotify for BufferedNotify<N> {
76 fn notify(&self) {
77 self.0.was_notified.store(true, Ordering::Relaxed);
78 }
79}
80
81/// Counting version of [`DriverNotify`]
82///
83/// [`NotificationCounter`] is largely aimed at writing unit tests and it just records how many
84/// times it has been notified, providing an interface to [`NotificationCounter::get`] and [reset]
85/// (NotificationCounter::set) the count.
86#[derive(Debug, Clone)]
87pub struct NotificationCounter {
88 notify_count: Arc<AtomicU32>,
89}
90
91impl DriverNotify for NotificationCounter {
92 fn notify(&self) {
93 self.notify_count.fetch_add(1, Ordering::SeqCst);
94 }
95}
96
97impl NotificationCounter {
98 /// Construct a new [`NotificationCounter`]
99 pub fn new() -> NotificationCounter {
100 NotificationCounter { notify_count: Arc::new(AtomicU32::new(0)) }
101 }
102
103 /// Retrieve the current notification count.
104 pub fn get(&self) -> u32 {
105 self.notify_count.load(Ordering::SeqCst)
106 }
107
108 /// Set the stored notification count to the given value.
109 pub fn set(&self, val: u32) {
110 self.notify_count.store(val, Ordering::SeqCst)
111 }
112}
113
114/// Async [`Stream`] implementation that resolves [`DescChain`]
115///
116/// This allows for treating a [`Queue`] as something that asynchronously produces [`DescChain`].
117/// As this library has no knowledge of the underlying virtio transport the user is still
118/// responsible for hooking up the provided [`DescChainStream::waker`] for the stream to function
119/// correctly.
120pub struct DescChainStream<'a, 'b, N> {
121 queue: &'b Queue<'a, N>,
122 task: Arc<AtomicWaker>,
123}
124impl<'a, 'b, N> DescChainStream<'a, 'b, N> {
125 /// Create a [`Stream`] for a [`Queue`].
126 ///
127 /// The produced [`DescChainStream`] must have its [`waker`](#waker) signaled by the virtio
128 /// transport that receives guest notifications, otherwise the stream will not work.
129 pub fn new(queue: &'b Queue<'a, N>) -> DescChainStream<'a, 'b, N> {
130 DescChainStream { queue, task: Arc::new(AtomicWaker::new()) }
131 }
132
133 /// Retrieve the internal [`AtomicWaker`].
134 ///
135 /// This should be signaled by the virtio transport when a guest notification is received for
136 /// the underlying queue.
137 pub fn waker(&self) -> Arc<AtomicWaker> {
138 self.task.clone()
139 }
140}
141
142impl<'a, 'b, N: DriverNotify> Stream for DescChainStream<'a, 'b, N> {
143 type Item = DescChain<'a, 'b, N>;
144 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
145 if let Some(desc) = self.queue.next_chain() {
146 return Poll::Ready(Some(desc));
147 }
148 self.task.register(cx.waker());
149 match self.queue.next_chain() {
150 Some(desc) => Poll::Ready(Some(desc)),
151 None => Poll::Pending,
152 }
153 }
154}