watch_handler/
lib.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6
7//! A common handler for hanging_gets
8
9use thiserror::Error;
10
11/// Function used to determine whether a change should cause any parked watchers to return.
12type ChangeFunction<T> = Box<dyn Fn(&T, &T) -> bool + Send + Sync + 'static>;
13
14/// Trait that should be implemented to send data to the hanging get watcher.
15pub trait Sender<T> {
16    /// Should send a response immediately to the sender.
17    fn send_response(self, data: T);
18}
19
20/// Default change function that checks for equality
21pub fn equality_change_function<T: PartialEq>() -> ChangeFunction<T> {
22    Box::new(|old: &T, new: &T| old != new)
23}
24
25/// Handler for hanging gets.
26/// The common way to use this is to implement the Sender<T> trait for the FIDL responder that the handler is used for.
27/// There should be one instance of HangingGetHandler per interface per connection, as the handler needs to keep track
28/// of state per connection.
29pub struct WatchHandler<T, ST> {
30    /// The callback to be parked in the handler.
31    /// There can only be one.
32    watch_responder: Option<ST>,
33    /// Represents the current state of the system.
34    current_value: Option<T>,
35    /// The last value that was sent to the client.
36    last_sent_value: Option<T>,
37    /// Function called on change. If function returns true, tells the handler that it should send to the hanging get.
38    change_function: ChangeFunction<T>,
39}
40
41impl<T, ST> WatchHandler<T, ST>
42where
43    T: Clone + PartialEq + 'static,
44    ST: Sender<T> + 'static,
45{
46    /// Creates a new instance of WatchHandler. If |initial_value| is provided, it will return
47    /// immediately on first watch. Otherwise, the first watch returns when the value is set for
48    /// the first time.
49    /// Uses default change function which just checks for any change.
50    pub fn create(initial_value: Option<T>) -> Self {
51        Self::create_with_change_fn(equality_change_function(), initial_value)
52    }
53}
54
55impl<T, ST> WatchHandler<T, ST>
56where
57    T: Clone + 'static,
58    ST: Sender<T> + 'static,
59{
60    /// Creates a new instance of WatchHandler. If |initial_value| is provided, it will return
61    /// immediately on first watch. Otherwise, the first watch returns when the value is set for
62    /// the first time.
63    pub fn create_with_change_fn(
64        change_function: ChangeFunction<T>,
65        initial_value: Option<T>,
66    ) -> Self {
67        Self {
68            watch_responder: None,
69            last_sent_value: None,
70            current_value: initial_value,
71            change_function: change_function,
72        }
73    }
74
75    /// Park a new hanging get in the handler. If a hanging get is already parked, returns the
76    /// new responder.
77    pub fn watch(&mut self, responder: ST) -> Result<(), WatchError<ST>> {
78        if let None = self.watch_responder {
79            self.watch_responder = Some(responder);
80            self.send_if_needed();
81            Ok(())
82        } else {
83            Err(WatchError { responder })
84        }
85    }
86
87    /// Sets a new change function.
88    /// The hanging get will only return when the change function evaluates to true when comparing the last value sent to
89    /// the client and the current value. Takes effect immediately; if change function evaluates to true then the pending
90    /// responder will be called.
91    pub fn set_change_function(&mut self, change_function: ChangeFunction<T>) {
92        self.change_function = change_function;
93        self.send_if_needed();
94    }
95
96    /// Called to update the current value of the handler, sending changes using the watcher if needed.
97    pub fn set_value(&mut self, new_value: T) {
98        self.current_value = Some(new_value);
99        self.send_if_needed();
100    }
101
102    /// Called when receiving a notification that value has changed.
103    fn send_if_needed(&mut self) {
104        let value_to_send = match (self.last_sent_value.as_ref(), self.current_value.as_ref()) {
105            (Some(last), Some(current)) if (self.change_function)(last, current) => Some(current),
106            (Some(_), Some(_)) => None,
107            (None, Some(current)) => Some(current),
108            (_, None) => None,
109        };
110
111        if let Some(value) = value_to_send {
112            if let Some(responder) = self.watch_responder.take() {
113                responder.send_response(value.clone());
114                self.last_sent_value = Some(value.clone());
115            }
116        }
117    }
118}
119
120/// Error returned if watch fails.
121#[derive(Error)]
122#[error("Inconsistent state; existing handler in state")]
123pub struct WatchError<ST> {
124    /// The responder that could not be parked.
125    pub responder: ST,
126}
127
128impl<ST> std::fmt::Debug for WatchError<ST> {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
130        write!(f, "{}", self)
131    }
132}
133
134#[cfg(test)]
135mod tests {
136    use super::*;
137    use std::cell::RefCell;
138    use std::rc::Rc;
139
140    const ID_INVALID: i32 = 0;
141    const ID1: i32 = 1;
142    const ID2: i32 = 2;
143    const ID3: i32 = 3;
144    const ID4: i32 = 4;
145
146    #[derive(Clone, PartialEq)]
147    struct TestStruct {
148        id: i32,
149    }
150
151    #[derive(Debug, PartialEq)]
152    struct TestSender {
153        sent_value: Rc<RefCell<i32>>,
154    }
155
156    impl Sender<TestStruct> for TestSender {
157        fn send_response(self, data: TestStruct) {
158            self.sent_value.replace(data.id);
159        }
160    }
161
162    #[test]
163    fn test_watch() {
164        let mut handler =
165            WatchHandler::<TestStruct, TestSender>::create(Some(TestStruct { id: ID1 }));
166
167        let sent_value = Rc::new(RefCell::new(ID_INVALID));
168        handler.watch(TestSender { sent_value: sent_value.clone() }).unwrap();
169
170        // First call should return immediately
171        assert_eq!((*sent_value.borrow()), ID1);
172
173        let sent_value = Rc::new(RefCell::new(ID_INVALID));
174        handler.watch(TestSender { sent_value: sent_value.clone() }).unwrap();
175
176        // Second call doesn't return because value hasn't changed
177        assert_eq!((*sent_value.borrow()), ID_INVALID);
178
179        handler.set_value(TestStruct { id: ID2 });
180
181        // When value changes, returns immediately
182        assert_eq!((*sent_value.borrow()), ID2);
183
184        handler.set_value(TestStruct { id: ID3 });
185
186        let sent_value = Rc::new(RefCell::new(ID_INVALID));
187        handler.watch(TestSender { sent_value: sent_value.clone() }).unwrap();
188
189        // Call after change also returns immediately
190        assert_eq!((*sent_value.borrow()), ID3);
191    }
192
193    #[test]
194    fn test_watch_no_initial() {
195        let mut handler = WatchHandler::<TestStruct, TestSender>::create(None);
196
197        let sent_value = Rc::new(RefCell::new(ID_INVALID));
198        handler.watch(TestSender { sent_value: sent_value.clone() }).unwrap();
199
200        // First call does not return until value is set
201        assert_eq!((*sent_value.borrow()), ID_INVALID);
202
203        handler.set_value(TestStruct { id: ID1 });
204        assert_eq!((*sent_value.borrow()), ID1);
205
206        let mut handler = WatchHandler::<TestStruct, TestSender>::create(None);
207        handler.set_value(TestStruct { id: ID2 });
208
209        let sent_value = Rc::new(RefCell::new(ID_INVALID));
210        handler.watch(TestSender { sent_value: sent_value.clone() }).unwrap();
211
212        // First call returns immediately if value is already set.
213        assert_eq!((*sent_value.borrow()), ID2);
214    }
215
216    #[test]
217    fn test_watch_fails() {
218        let mut handler =
219            WatchHandler::<TestStruct, TestSender>::create(Some(TestStruct { id: ID1 }));
220
221        let sent_value = Rc::new(RefCell::new(ID_INVALID));
222        // first watch returns immediately
223        handler.watch(TestSender { sent_value: sent_value.clone() }).unwrap();
224
225        let sent_value = Rc::new(RefCell::new(ID_INVALID));
226        // second watch hangs
227        handler.watch(TestSender { sent_value: sent_value.clone() }).unwrap();
228
229        let sent_value = Rc::new(RefCell::new(ID4));
230        // third results in an error
231        let result = handler.watch(TestSender { sent_value: sent_value.clone() });
232
233        assert_eq!(result.unwrap_err().responder, TestSender { sent_value: sent_value.clone() });
234    }
235
236    #[test]
237    fn test_watch_with_change_function() {
238        let mut handler = WatchHandler::<TestStruct, TestSender>::create_with_change_fn(
239            equality_change_function(),
240            Some(TestStruct { id: ID1 }),
241        );
242        let sent_value = Rc::new(RefCell::new(ID_INVALID));
243
244        handler.set_change_function(Box::new(|_old, _new| false));
245        handler.watch(TestSender { sent_value: sent_value.clone() }).unwrap();
246
247        // first watch should return immediately regardless
248        assert_eq!((*sent_value.borrow()), ID1);
249
250        let sent_value = Rc::new(RefCell::new(ID_INVALID));
251        handler.set_change_function(Box::new(|old, new| new.id - old.id > 1));
252        handler.watch(TestSender { sent_value: sent_value.clone() }).unwrap();
253        handler.set_value(TestStruct { id: ID2 });
254
255        // If change function returns false, will not return
256        assert_eq!((*sent_value.borrow()), ID_INVALID);
257
258        handler.set_value(TestStruct { id: ID3 });
259
260        // But subsequent change that satsifies change function will cause return
261        assert_eq!((*sent_value.borrow()), ID3);
262
263        // Setting the change function with a pending watch should cause sender to send.
264        let sent_value = Rc::new(RefCell::new(ID_INVALID));
265        handler.set_change_function(Box::new(|_old, _new| false));
266        handler.watch(TestSender { sent_value: sent_value.clone() }).unwrap();
267        handler.set_value(TestStruct { id: ID4 });
268
269        assert_eq!((*sent_value.borrow()), ID_INVALID);
270
271        handler.set_change_function(Box::new(|_old, _new| true));
272        assert_eq!((*sent_value.borrow()), ID4);
273    }
274
275    #[test]
276    fn test_watch_with_change_fn_no_initial() {
277        let mut handler = WatchHandler::<TestStruct, TestSender>::create_with_change_fn(
278            Box::new(|_old, _new| false),
279            None,
280        );
281
282        let sent_value = Rc::new(RefCell::new(ID_INVALID));
283        handler.watch(TestSender { sent_value: sent_value.clone() }).unwrap();
284
285        // First call does not return until value is set
286        assert_eq!((*sent_value.borrow()), ID_INVALID);
287
288        // First value returns after set regardless of change fn
289        handler.set_value(TestStruct { id: ID1 });
290        assert_eq!((*sent_value.borrow()), ID1);
291
292        let mut handler = WatchHandler::<TestStruct, TestSender>::create_with_change_fn(
293            Box::new(|_old, _new| false),
294            None,
295        );
296        handler.set_value(TestStruct { id: ID2 });
297
298        let sent_value = Rc::new(RefCell::new(ID_INVALID));
299        handler.watch(TestSender { sent_value: sent_value.clone() }).unwrap();
300
301        // First call returns immediately if value is already set regardless of change fn
302        assert_eq!((*sent_value.borrow()), ID2);
303    }
304}