overnet_core/
future_help.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Error;
6use async_lock::RwLock;
7use event_listener::Event;
8use futures::prelude::*;
9use std::sync::{Arc, Weak};
10
11/// Takes a future that returns an error, and transforms it to a future that logs said error
12pub async fn log_errors(
13    f: impl Send + Future<Output = Result<(), Error>>,
14    message: impl std::fmt::Display,
15) {
16    if let Err(e) = f.await {
17        log::warn!("{}: {:?}", message, e);
18    }
19}
20
/// A value paired with a version counter, so readers can tell whether the value has been
/// replaced since they last looked at it.
struct Stamped<T> {
    /// The most recently stored value.
    current: T,
    /// Version number associated with `current`.
    stamp: u64,
}
25
26struct ObservableState<T> {
27    value: RwLock<Stamped<T>>,
28    next_version: Event,
29}
30
31impl<T> Drop for ObservableState<T> {
32    fn drop(&mut self) {
33        self.next_version.notify_relaxed(usize::MAX);
34    }
35}
36
37pub struct Observable<T> {
38    state: Arc<ObservableState<T>>,
39}
40
41pub struct Observer<T: 'static> {
42    state: Weak<ObservableState<T>>,
43    observed: u64,
44}
45
46/// Asynchronously observable item... observers will be notified on changes, and see a None when
47/// the Observable blinks out of existance.
48/// If an Observer misses an edit, that Observer never sees the edit (i.e. Observer's only see the
49/// most recent change).
50impl<T: std::fmt::Debug> Observable<T> {
51    pub fn new(current: T) -> Self {
52        Self {
53            state: Arc::new(ObservableState {
54                value: RwLock::new(Stamped { current, stamp: 1 }),
55                next_version: Event::new(),
56            }),
57        }
58    }
59
60    pub fn new_observer(&self) -> Observer<T> {
61        Observer { state: Arc::downgrade(&self.state), observed: 0 }
62    }
63
64    async fn maybe_mutate(&self, f: impl FnOnce(&mut T) -> bool) -> bool {
65        let mut lock = self.state.value.write().await;
66        let changed = f(&mut lock.current);
67        if changed {
68            lock.stamp += 1;
69            self.state.next_version.notify_relaxed(usize::MAX);
70        }
71        changed
72    }
73
74    #[cfg(test)]
75    async fn edit(&self, f: impl FnOnce(&mut T)) {
76        self.maybe_mutate(move |v| {
77            f(v);
78            true
79        })
80        .await;
81    }
82
83    #[cfg(test)]
84    pub async fn push(&self, new: T) {
85        self.edit(|current| *current = new).await
86    }
87
88    pub async fn maybe_push(&self, new: T) -> bool
89    where
90        T: std::cmp::PartialEq,
91    {
92        self.maybe_mutate(move |current| {
93            let change = *current != new;
94            if change {
95                *current = new;
96            }
97            change
98        })
99        .await
100    }
101}
102
103impl<T: Clone + std::fmt::Debug> Observer<T> {
104    pub async fn next(&mut self) -> Option<T> {
105        while let Some(state) = Weak::upgrade(&self.state) {
106            let lock = state.value.read().await;
107            if lock.stamp != self.observed {
108                self.observed = lock.stamp;
109                return Some(lock.current.clone());
110            }
111            let next_version = state.next_version.listen();
112            drop(lock);
113            drop(state);
114            next_version.await;
115        }
116        None
117    }
118}
119
#[cfg(test)]
mod test {
    use super::*;

    // Exercises the basic observer contract: initial value, single update, coalesced updates,
    // and termination once the observable is dropped.
    #[fuchsia::test]
    async fn observable_basics() {
        let source = Observable::new(1);
        let mut watcher = source.new_observer();

        // A fresh observer sees the initial value right away.
        assert_eq!(watcher.next().await, Some(1));

        // A single update is delivered as-is.
        source.push(2).await;
        assert_eq!(watcher.next().await, Some(2));

        // Two updates without an intervening `next`: only the latest one is seen.
        source.push(3).await;
        source.push(4).await;
        assert_eq!(watcher.next().await, Some(4));

        // Once the observable is gone the observer reports the end of the stream.
        drop(source);
        assert_eq!(watcher.next().await, None);
    }
}
137}