use crate::hanging_get::error::HangingGetServerError;
use core::hash::Hash;
use fuchsia_sync::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
pub struct HangingGet<S, O, F: Fn(&S, O) -> bool> {
inner: Arc<Mutex<HangingGetInner<S, subscriber_key::Key, O, F>>>,
subscriber_key_generator: subscriber_key::Generator,
}
impl<S, O, F> HangingGet<S, O, F>
where
F: Fn(&S, O) -> bool,
{
pub fn new(state: S, notify: F) -> Self {
Self {
inner: Arc::new(Mutex::new(HangingGetInner::new(Some(state), notify))),
subscriber_key_generator: subscriber_key::Generator::default(),
}
}
pub fn new_unknown_state(notify: F) -> Self {
Self {
inner: Arc::new(Mutex::new(HangingGetInner::new(None, notify))),
subscriber_key_generator: subscriber_key::Generator::default(),
}
}
pub fn new_publisher(&self) -> Publisher<S, O, F> {
Publisher { inner: self.inner.clone() }
}
pub fn new_subscriber(&mut self) -> Subscriber<S, O, F> {
Subscriber { inner: self.inner.clone(), key: self.subscriber_key_generator.next().unwrap() }
}
}
pub struct Subscriber<S, O, F: Fn(&S, O) -> bool> {
inner: Arc<Mutex<HangingGetInner<S, subscriber_key::Key, O, F>>>,
key: subscriber_key::Key,
}
impl<S, O, F> Subscriber<S, O, F>
where
F: Fn(&S, O) -> bool,
{
pub fn register(&self, observation: O) -> Result<(), HangingGetServerError> {
self.inner.lock().subscribe(self.key, observation)
}
}
pub struct Publisher<S, O, F: Fn(&S, O) -> bool> {
inner: Arc<Mutex<HangingGetInner<S, subscriber_key::Key, O, F>>>,
}
impl<S, O, F: Fn(&S, O) -> bool> Clone for Publisher<S, O, F> {
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}
impl<S, O, F> Publisher<S, O, F>
where
F: Fn(&S, O) -> bool,
{
pub fn set(&self, state: S) {
self.inner.lock().set(state)
}
pub fn update<UpdateFn>(&self, update: UpdateFn)
where
UpdateFn: FnOnce(&mut Option<S>) -> bool,
{
self.inner.lock().update(update)
}
}
pub struct HangingGetInner<S, K, O, F: Fn(&S, O) -> bool> {
state: Option<S>,
notify: F,
observers: HashMap<K, Window<O>>,
}
impl<S, K, O, F> HangingGetInner<S, K, O, F>
where
K: Eq + Hash,
F: Fn(&S, O) -> bool,
{
fn notify_all(&mut self) {
for window in self.observers.values_mut() {
window.notify(&self.notify, self.state.as_ref().unwrap());
}
}
pub fn new(state: Option<S>, notify: F) -> Self {
Self { state, notify, observers: HashMap::new() }
}
pub fn set(&mut self, state: S) {
self.state = Some(state);
self.notify_all();
}
pub fn update(&mut self, state_update: impl FnOnce(&mut Option<S>) -> bool) {
if state_update(&mut self.state) {
self.notify_all();
}
}
pub fn subscribe(&mut self, key: K, observer: O) -> Result<(), HangingGetServerError> {
let entry = self.observers.entry(key).or_insert_with(Window::new);
entry.observe(observer, &self.notify, self.state.as_ref())
}
pub fn unsubscribe(&mut self, key: K) {
drop(self.observers.remove(&key));
}
}
struct Window<O> {
dirty: bool,
observer: Option<O>,
}
impl<O> Window<O> {
pub fn new() -> Self {
Window { dirty: true, observer: None }
}
pub fn observe<S>(
&mut self,
observer: O,
f: impl Fn(&S, O) -> bool,
current_state: Option<&S>,
) -> Result<(), HangingGetServerError> {
if self.observer.is_some() {
return Err(HangingGetServerError::MultipleObservers);
}
self.observer = Some(observer);
if let Some(current_state) = current_state {
if self.dirty {
self.notify(f, current_state);
}
}
Ok(())
}
pub fn notify<S>(&mut self, f: impl Fn(&S, O) -> bool, state: &S) {
match self.observer.take() {
Some(observer) => {
if f(state, observer) {
self.dirty = false;
}
}
None => self.dirty = true,
}
}
}
mod subscriber_key {
pub struct Generator {
next: Key,
}
impl Default for Generator {
fn default() -> Self {
Self { next: Key(0) }
}
}
impl Generator {
pub fn next(&mut self) -> Option<Key> {
let key = self.next.clone();
if let Some(next) = self.next.0.checked_add(1) {
self.next.0 = next;
Some(key)
} else {
None
}
}
}
#[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)]
pub struct Key(u64);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::hanging_get::test_util::TestObserver;
use crate::PollExt;
use fuchsia_async as fasync;
use futures::channel::oneshot;
#[test]
fn subscriber_key_generator_creates_unique_keys() {
let mut gen = subscriber_key::Generator::default();
let key1 = gen.next();
let key2 = gen.next();
assert!(key1 != key2);
}
#[test]
fn window_add_first_observer_notifies() {
let state = 0;
let mut window = Window::new();
window
.observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
.unwrap();
}
#[test]
fn window_none_state_does_not_notify() {
let mut window = Window::new();
window
.observe::<i32>(TestObserver::expect_no_value(), TestObserver::observe, None)
.unwrap();
}
#[test]
fn window_add_second_observer_does_not_notify() {
let state = 0;
let mut window = Window::new();
window
.observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
.unwrap();
window
.observe(TestObserver::expect_no_value(), TestObserver::observe, Some(&state))
.unwrap();
}
#[test]
fn window_add_second_observer_notifies_after_notify_call() {
let mut state = 0;
let mut window = Window::new();
window
.observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
.unwrap();
state = 1;
window.notify(TestObserver::observe, &state);
window
.observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
.unwrap();
}
#[test]
fn window_add_multiple_observers_are_notified() {
let mut state = 0;
let mut window = Window::new();
window
.observe(TestObserver::expect_value(state), TestObserver::observe, Some(&state))
.unwrap();
let o1 = TestObserver::expect_value(1);
let o2 = TestObserver::expect_no_value();
window.observe(o1.clone(), TestObserver::observe, Some(&state)).unwrap();
let result = window.observe(o2.clone(), TestObserver::observe, Some(&state));
assert_eq!(result, Err(HangingGetServerError::MultipleObservers));
assert!(!o1.has_value());
state = 1;
window.notify(TestObserver::observe, &state);
}
#[test]
fn window_dirty_flag_state() {
let state = 0;
let mut window = Window::new();
let o = TestObserver::expect_value(state);
window.observe(o, TestObserver::observe, Some(&state)).unwrap();
assert!(window.observer.is_none());
assert!(!window.dirty);
window.notify(TestObserver::observe, &state);
assert!(window.dirty);
let o = TestObserver::expect_value(state);
window.observe(o, TestObserver::observe, Some(&state)).unwrap();
assert!(!window.dirty);
}
#[test]
fn window_dirty_flag_respects_consumed_flag() {
let state = 0;
let mut window = Window::new();
let o = TestObserver::expect_value(state);
window.observe(o, TestObserver::observe_incomplete, Some(&state)).unwrap();
assert!(window.dirty);
}
#[test]
fn hanging_get_inner_subscribe() {
let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
let o = TestObserver::expect_value(0);
assert!(!o.has_value());
hanging.subscribe(0, o.clone()).unwrap();
}
#[test]
fn hanging_get_inner_subscribe_then_set() {
let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
let o = TestObserver::expect_value(0);
hanging.subscribe(0, o.clone()).unwrap();
hanging.set(1);
}
#[test]
fn hanging_get_inner_subscribe_twice_then_set() {
let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
hanging.set(1);
}
#[test]
fn hanging_get_inner_subscribe_multiple_then_set() {
let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
let o2 = TestObserver::expect_value(1);
hanging.subscribe(0, o2.clone()).unwrap();
assert!(!o2.has_value());
let _ = hanging.subscribe(0, TestObserver::expect_no_value()).unwrap_err();
hanging.set(1);
}
#[test]
fn hanging_get_inner_subscribe_with_two_clients_then_set() {
let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
hanging.subscribe(1, TestObserver::expect_value(0)).unwrap();
hanging.subscribe(1, TestObserver::expect_value(1)).unwrap();
hanging.set(1);
}
#[test]
fn hanging_get_inner_unsubscribe() {
let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
hanging.subscribe(0, TestObserver::expect_no_value()).unwrap();
hanging.unsubscribe(0);
hanging.set(1);
}
#[test]
fn hanging_get_inner_unsubscribe_one_of_many() {
let mut hanging = HangingGetInner::new(Some(0), TestObserver::observe);
hanging.subscribe(0, TestObserver::expect_value(0)).unwrap();
hanging.subscribe(0, TestObserver::expect_no_value()).unwrap();
hanging.subscribe(1, TestObserver::expect_value(0)).unwrap();
hanging.subscribe(1, TestObserver::expect_no_value()).unwrap();
hanging.unsubscribe(0);
assert!(!hanging.observers.contains_key(&0));
assert!(hanging.observers.contains_key(&1));
}
#[test]
fn hanging_get_inner_delayed_subscribe() {
let mut hanging = HangingGetInner::new(None, TestObserver::<u8>::observe);
let o = TestObserver::expect_no_value();
assert!(!o.has_value());
hanging.subscribe(0, o.clone()).unwrap();
}
#[test]
fn hanging_get_inner_delayed_subscribe_then_set() {
let mut hanging = HangingGetInner::new(None, TestObserver::observe);
let o = TestObserver::expect_value(1);
hanging.subscribe(0, o.clone()).unwrap();
hanging.set(1);
}
#[test]
fn hanging_get_inner_delayed_subscribe_twice_then_set() {
let mut hanging = HangingGetInner::new(None, TestObserver::observe);
hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
let result = hanging.subscribe(0, TestObserver::expect_no_value());
assert_eq!(result, Err(HangingGetServerError::MultipleObservers));
hanging.set(1);
}
#[test]
fn hanging_get_inner_delayed_subscribe_multiple_then_set() {
let mut hanging = HangingGetInner::new(None, TestObserver::observe);
hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
let o2 = TestObserver::expect_no_value();
let result = hanging.subscribe(0, o2.clone());
assert_eq!(result, Err(HangingGetServerError::MultipleObservers));
assert!(!o2.has_value());
let result = hanging.subscribe(0, TestObserver::expect_no_value());
assert_eq!(result, Err(HangingGetServerError::MultipleObservers));
hanging.set(1);
}
#[test]
fn hanging_get_inner_delayed_subscribe_with_two_clients_then_set() {
let mut hanging = HangingGetInner::new(None, TestObserver::observe);
hanging.subscribe(0, TestObserver::expect_value(1)).unwrap();
let result = hanging.subscribe(0, TestObserver::expect_no_value());
assert_eq!(result, Err(HangingGetServerError::MultipleObservers));
hanging.subscribe(1, TestObserver::expect_value(1)).unwrap();
let result = hanging.subscribe(1, TestObserver::expect_no_value());
assert_eq!(result, Err(HangingGetServerError::MultipleObservers));
hanging.set(1);
}
#[test]
fn hanging_get_inner_delayed_unsubscribe() {
let mut hanging = HangingGetInner::new(None, TestObserver::observe);
hanging.subscribe(0, TestObserver::expect_no_value()).unwrap();
let result = hanging.subscribe(0, TestObserver::expect_no_value());
assert_eq!(result, Err(HangingGetServerError::MultipleObservers));
hanging.unsubscribe(0);
hanging.set(1);
}
#[test]
fn hanging_get_inner_delayed_unsubscribe_one_of_many() {
let mut hanging = HangingGetInner::new(None, TestObserver::observe);
hanging.subscribe(0, TestObserver::<i32>::expect_no_value()).unwrap();
let result = hanging.subscribe(0, TestObserver::expect_no_value());
assert_eq!(result, Err(HangingGetServerError::MultipleObservers));
hanging.subscribe(1, TestObserver::expect_no_value()).unwrap();
let result = hanging.subscribe(1, TestObserver::expect_no_value());
assert_eq!(result, Err(HangingGetServerError::MultipleObservers));
hanging.unsubscribe(0);
assert!(!hanging.observers.contains_key(&0));
assert!(hanging.observers.contains_key(&1));
}
#[test]
fn sync_pub_sub_updates_and_observes() {
let mut ex = fasync::TestExecutor::new();
let mut broker = HangingGet::new(0i32, |s, o: oneshot::Sender<_>| {
o.send(s.clone()).map(|()| true).unwrap()
});
let publisher = broker.new_publisher();
let subscriber = broker.new_subscriber();
let (sender, mut receiver) = oneshot::channel();
subscriber.register(sender).unwrap();
let observation =
ex.run_until_stalled(&mut receiver).expect("received initial observation");
assert_eq!(observation, Ok(0));
let (sender, mut receiver) = oneshot::channel();
subscriber.register(sender).unwrap();
assert!(ex.run_until_stalled(&mut receiver).is_pending());
publisher.set(1);
let observation =
ex.run_until_stalled(&mut receiver).expect("received subsequent observation");
assert_eq!(observation, Ok(1));
}
#[test]
fn sync_pub_sub_multiple_subscribers() {
let mut ex = fasync::TestExecutor::new();
let mut broker = HangingGet::new(0i32, |s, o: oneshot::Sender<_>| {
o.send(s.clone()).map(|()| true).unwrap()
});
let publisher = broker.new_publisher();
let sub1 = broker.new_subscriber();
let sub2 = broker.new_subscriber();
let (sender, mut receiver) = oneshot::channel();
sub1.register(sender).unwrap();
let observation =
ex.run_until_stalled(&mut receiver).expect("received initial observation");
assert_eq!(observation, Ok(0));
let (sender, mut receiver) = oneshot::channel();
sub2.register(sender).unwrap();
let observation =
ex.run_until_stalled(&mut receiver).expect("received initial observation");
assert_eq!(observation, Ok(0));
let (sender, mut recv1) = oneshot::channel();
sub1.register(sender).unwrap();
assert!(ex.run_until_stalled(&mut recv1).is_pending());
let (sender, mut recv2) = oneshot::channel();
sub2.register(sender).unwrap();
assert!(ex.run_until_stalled(&mut recv2).is_pending());
publisher.set(1);
let obs1 =
ex.run_until_stalled(&mut recv1).expect("receiver 1 received subsequent observation");
assert_eq!(obs1, Ok(1));
let obs2 =
ex.run_until_stalled(&mut recv2).expect("receiver 2 received subsequent observation");
assert_eq!(obs2, Ok(1));
}
#[test]
fn sync_pub_sub_delayed_updates_and_observes() {
let mut ex = fasync::TestExecutor::new();
let mut broker = HangingGet::<i32, _, _>::new_unknown_state(|s, o: oneshot::Sender<_>| {
o.send(s.clone()).map(|()| true).unwrap()
});
let publisher = broker.new_publisher();
let subscriber = broker.new_subscriber();
let (sender, mut recv1) = oneshot::channel();
subscriber.register(sender).unwrap();
assert!(ex.run_until_stalled(&mut recv1).is_pending());
let (sender, mut receiver) = oneshot::channel();
assert!(subscriber.register(sender).is_err());
assert!(ex.run_until_stalled(&mut receiver).expect("sender closed").is_err());
publisher.set(1);
let observation =
ex.run_until_stalled(&mut recv1).expect("received subsequent observation");
assert_eq!(observation, Ok(1));
}
#[test]
fn sync_pub_sub_delayed_multiple_subscribers() {
let mut ex = fasync::TestExecutor::new();
let mut broker = HangingGet::<i32, _, _>::new_unknown_state(|s, o: oneshot::Sender<_>| {
o.send(s.clone()).map(|()| true).unwrap()
});
let publisher = broker.new_publisher();
let sub1 = broker.new_subscriber();
let sub2 = broker.new_subscriber();
let (sender, mut recv1) = oneshot::channel();
sub1.register(sender).unwrap();
assert!(ex.run_until_stalled(&mut recv1).is_pending());
let (sender, mut recv2) = oneshot::channel();
sub2.register(sender).unwrap();
assert!(ex.run_until_stalled(&mut recv2).is_pending());
let (sender, mut recv3) = oneshot::channel();
assert!(sub1.register(sender).is_err());
assert!(ex.run_until_stalled(&mut recv3).expect("sender 3 closed").is_err());
let (sender, mut recv4) = oneshot::channel();
assert!(sub2.register(sender).is_err());
assert!(ex.run_until_stalled(&mut recv4).expect("sender 4 closed").is_err());
publisher.set(1);
let obs1 =
ex.run_until_stalled(&mut recv1).expect("receiver 1 received subsequent observation");
assert_eq!(obs1, Ok(1));
let obs2 =
ex.run_until_stalled(&mut recv2).expect("receiver 2 received subsequent observation");
assert_eq!(obs2, Ok(1));
}
}