// overnet_core/future_help.rs
use anyhow::Error;
use async_lock::RwLock;
use event_listener::Event;
use futures::prelude::*;
use std::sync::{Arc, Weak};
/// Drives `f` to completion and, if it resolves to an error, emits a
/// warning-level trace instead of propagating the failure.
///
/// * `f` - the fallible future to run.
/// * `message` - prefix placed before the error in the log line.
///
/// The error is rendered with `{:?}`.
pub async fn log_errors(
    f: impl Send + Future<Output = Result<(), Error>>,
    message: impl std::fmt::Display,
) {
    match f.await {
        // Success needs no reporting.
        Ok(()) => {}
        Err(failure) => {
            tracing::warn!("{}: {:?}", message, failure);
        }
    }
}
/// A value paired with a version counter.
struct Stamped<T> {
/// The value currently held.
current: T,
/// Version of `current`. `Observable::new` starts it at 1 and
/// `Observable::maybe_mutate` adds one each time the value is reported as changed.
stamp: u64,
}
/// State shared between one `Observable` (strong reference) and its `Observer`s
/// (weak references).
struct ObservableState<T> {
/// The stamped value, guarded by an async reader/writer lock.
value: RwLock<Stamped<T>>,
/// Notified whenever the stamp is incremented, and once more when this state is dropped.
next_version: Event,
}
impl<T> Drop for ObservableState<T> {
    /// Notifies listeners on `next_version` when the shared state is torn down.
    ///
    /// An `Observer::next` call that is waiting on the event re-runs its loop
    /// after being notified; its `Weak::upgrade` then fails and it returns `None`.
    fn drop(&mut self) {
        // Ask for the largest possible number of listeners to be notified,
        // i.e. everyone currently registered.
        const EVERY_LISTENER: usize = usize::MAX;
        self.next_version.notify_relaxed(EVERY_LISTENER);
    }
}
/// The writable side of an observed value: holds the strong reference to the shared
/// state, so dropping it (once no observer is temporarily holding an upgraded
/// reference) destroys that state.
pub struct Observable<T> {
/// Strong reference to the state shared with observers.
state: Arc<ObservableState<T>>,
}
/// The read side of an `Observable`: yields the value each time its stamp differs from
/// the one this observer last saw.
pub struct Observer<T: 'static> {
/// Weak reference to the shared state, so an observer does not keep the
/// observable's state alive on its own.
state: Weak<ObservableState<T>>,
/// Stamp of the value most recently returned by `next`. `Observable::new_observer`
/// sets it to 0 while stamps start at 1, so the first `next` call returns immediately.
observed: u64,
}
impl<T: std::fmt::Debug> Observable<T> {
    /// Builds an observable whose initial value is `current`, stamped as version 1.
    pub fn new(current: T) -> Self {
        let initial = Stamped { current, stamp: 1 };
        let shared = ObservableState { value: RwLock::new(initial), next_version: Event::new() };
        Self { state: Arc::new(shared) }
    }

    /// Creates an observer attached to this observable.
    ///
    /// The observer starts with an observed stamp of 0, which never matches a real
    /// stamp, so its first `next` call yields the current value without waiting.
    pub fn new_observer(&self) -> Observer<T> {
        let state = Arc::downgrade(&self.state);
        Observer { state, observed: 0 }
    }

    /// Runs `f` on the value under the write lock.
    ///
    /// If `f` returns `true` the stamp is incremented and listeners are notified;
    /// if it returns `false` nothing else happens. The return value is whatever
    /// `f` returned.
    async fn maybe_mutate(&self, f: impl FnOnce(&mut T) -> bool) -> bool {
        let mut guard = self.state.value.write().await;
        if !f(&mut guard.current) {
            return false;
        }
        guard.stamp += 1;
        // The notification is sent while the write lock is still held; observers
        // register their listener while holding the read lock.
        self.state.next_version.notify_relaxed(usize::MAX);
        true
    }

    /// Applies `f` to the value and unconditionally treats it as changed.
    #[cfg(test)]
    async fn edit(&self, f: impl FnOnce(&mut T)) {
        self.maybe_mutate(move |value| {
            f(value);
            true
        })
        .await;
    }

    /// Replaces the value with `new`, always bumping the stamp.
    #[cfg(test)]
    pub async fn push(&self, new: T) {
        self.edit(move |slot| *slot = new).await
    }

    /// Replaces the value with `new` only when the two compare unequal.
    ///
    /// Returns `true` if the value was replaced (and observers notified),
    /// `false` if it was left untouched.
    pub async fn maybe_push(&self, new: T) -> bool
    where
        T: std::cmp::PartialEq,
    {
        self.maybe_mutate(move |current| {
            if *current != new {
                *current = new;
                true
            } else {
                false
            }
        })
        .await
    }
}
impl<T: Clone + std::fmt::Debug> Observer<T> {
    /// Waits for a value whose stamp differs from the last one this observer returned.
    ///
    /// Returns `Some(clone_of_value)` once such a value is seen, recording its stamp.
    /// Returns `None` when the shared state can no longer be upgraded, i.e. it has
    /// been dropped. Intermediate values may be skipped: only the value present
    /// when the lock is taken is returned.
    pub async fn next(&mut self) -> Option<T> {
        loop {
            // A failed upgrade means the shared state is gone: nothing more to observe.
            let shared = Weak::upgrade(&self.state)?;
            let wakeup = {
                let guard = shared.value.read().await;
                if guard.stamp != self.observed {
                    self.observed = guard.stamp;
                    return Some(guard.current.clone());
                }
                // Register the listener before the read guard is released at the
                // end of this block.
                shared.next_version.listen()
            };
            // Release the strong reference before sleeping so this observer does
            // not keep the shared state alive while it waits.
            drop(shared);
            wakeup.await;
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Walks one observer through the initial value, a single update, two
    /// back-to-back updates, and finally the observable being dropped.
    #[fuchsia::test]
    async fn observable_basics() {
        let source = Observable::new(1);
        let mut watcher = source.new_observer();

        // The initial value is delivered immediately.
        assert_eq!(watcher.next().await, Some(1));

        // A single push is seen as-is.
        source.push(2).await;
        assert_eq!(watcher.next().await, Some(2));

        // Two pushes without an intervening read: only the latest is observed.
        source.push(3).await;
        source.push(4).await;
        assert_eq!(watcher.next().await, Some(4));

        // Once the observable is gone the observer terminates.
        drop(source);
        assert_eq!(watcher.next().await, None);
    }
}