overnet_core/
future_help.rs
1use anyhow::Error;
6use async_lock::RwLock;
7use event_listener::Event;
8use futures::prelude::*;
9use std::sync::{Arc, Weak};
10
11pub async fn log_errors(
13 f: impl Send + Future<Output = Result<(), Error>>,
14 message: impl std::fmt::Display,
15) {
16 if let Err(e) = f.await {
17 log::warn!("{}: {:?}", message, e);
18 }
19}
20
/// A value bundled with a numeric stamp identifying which revision of the value it is.
struct Stamped<T> {
    /// The value as of the most recent change.
    current: T,
    /// Revision counter for `current`. `Observable::new` starts it at 1 and
    /// `Observable::maybe_mutate` increments it each time the value is reported as changed.
    stamp: u64,
}
25
26struct ObservableState<T> {
27 value: RwLock<Stamped<T>>,
28 next_version: Event,
29}
30
31impl<T> Drop for ObservableState<T> {
32 fn drop(&mut self) {
33 self.next_version.notify_relaxed(usize::MAX);
34 }
35}
36
37pub struct Observable<T> {
38 state: Arc<ObservableState<T>>,
39}
40
41pub struct Observer<T: 'static> {
42 state: Weak<ObservableState<T>>,
43 observed: u64,
44}
45
46impl<T: std::fmt::Debug> Observable<T> {
51 pub fn new(current: T) -> Self {
52 Self {
53 state: Arc::new(ObservableState {
54 value: RwLock::new(Stamped { current, stamp: 1 }),
55 next_version: Event::new(),
56 }),
57 }
58 }
59
60 pub fn new_observer(&self) -> Observer<T> {
61 Observer { state: Arc::downgrade(&self.state), observed: 0 }
62 }
63
64 async fn maybe_mutate(&self, f: impl FnOnce(&mut T) -> bool) -> bool {
65 let mut lock = self.state.value.write().await;
66 let changed = f(&mut lock.current);
67 if changed {
68 lock.stamp += 1;
69 self.state.next_version.notify_relaxed(usize::MAX);
70 }
71 changed
72 }
73
74 #[cfg(test)]
75 async fn edit(&self, f: impl FnOnce(&mut T)) {
76 self.maybe_mutate(move |v| {
77 f(v);
78 true
79 })
80 .await;
81 }
82
83 #[cfg(test)]
84 pub async fn push(&self, new: T) {
85 self.edit(|current| *current = new).await
86 }
87
88 pub async fn maybe_push(&self, new: T) -> bool
89 where
90 T: std::cmp::PartialEq,
91 {
92 self.maybe_mutate(move |current| {
93 let change = *current != new;
94 if change {
95 *current = new;
96 }
97 change
98 })
99 .await
100 }
101}
102
103impl<T: Clone + std::fmt::Debug> Observer<T> {
104 pub async fn next(&mut self) -> Option<T> {
105 while let Some(state) = Weak::upgrade(&self.state) {
106 let lock = state.value.read().await;
107 if lock.stamp != self.observed {
108 self.observed = lock.stamp;
109 return Some(lock.current.clone());
110 }
111 let next_version = state.next_version.listen();
112 drop(lock);
113 drop(state);
114 next_version.await;
115 }
116 None
117 }
118}
119
#[cfg(test)]
mod test {
    use super::*;

    /// Exercises the initial value, a single update, coalesced updates, and teardown.
    #[fuchsia::test]
    async fn observable_basics() {
        let source = Observable::new(1);
        let mut watcher = source.new_observer();

        // A fresh observer sees the initial value immediately.
        assert_eq!(watcher.next().await, Some(1));

        // A single push is delivered as-is.
        source.push(2).await;
        assert_eq!(watcher.next().await, Some(2));

        // Two pushes with no read in between: only the latest value is delivered.
        source.push(3).await;
        source.push(4).await;
        assert_eq!(watcher.next().await, Some(4));

        // Once the observable is gone the observer reports the end of the stream.
        drop(source);
        assert_eq!(watcher.next().await, None);
    }
}