recovery_util/ota/
controller.rs1use crate::ota::state_machine::{Event, EventProcessor, StateHandler};
6use fuchsia_async::{self as fasync, Task};
7use futures::channel::mpsc;
8use futures::{SinkExt, StreamExt};
9#[cfg(test)]
10use mockall::automock;
11
12#[derive(Clone)]
13pub struct EventSender {
14 sender: mpsc::Sender<Event>,
15}
16
17impl EventSender {
18 pub fn new(sender: mpsc::Sender<Event>) -> Self {
19 Self { sender }
20 }
21}
22
23#[cfg_attr(test, automock)]
24pub trait SendEvent {
25 fn send(&mut self, event: Event);
26}
27
28impl SendEvent for EventSender {
29 fn send(&mut self, event: Event) {
30 let mut sender = self.sender.clone();
31 let event_clone = event.clone();
32 fasync::Task::local(async move {
33 if let Err(error) = sender.send(event).await {
34 eprintln!("Failed to send event {:?}: {}", event_clone, error);
35 }
36 })
37 .detach();
38 }
39 }
42
43#[cfg_attr(test, automock)]
44pub trait Controller {
45 fn add_event_observer(&mut self, sender: mpsc::Sender<Event>);
46 fn add_state_handler(&mut self, handler: Box<dyn StateHandler>);
47 fn get_event_sender(&self) -> EventSender;
48 fn start(&mut self, state_machine: Box<dyn EventProcessor>);
49}
50
51pub struct ControllerImpl {
52 sender: mpsc::Sender<Event>,
53 receiver: Option<mpsc::Receiver<Event>>,
54 state_handlers: Option<Vec<Box<dyn StateHandler>>>,
55 event_observers: Option<Vec<mpsc::Sender<Event>>>,
56}
57
58impl ControllerImpl {
59 pub fn new() -> Self {
60 let (sender, receiver) = mpsc::channel::<Event>(10);
61 Self {
62 sender,
63 receiver: Some(receiver),
64 state_handlers: Some(Vec::new()),
65 event_observers: Some(Vec::new()),
66 }
67 }
68}
69
70impl Controller for ControllerImpl {
71 fn add_event_observer(&mut self, observer: mpsc::Sender<Event>) {
72 if let Some(ref mut event_observers) = self.event_observers {
73 event_observers.push(observer);
74 }
75 }
76
77 fn add_state_handler(&mut self, handler: Box<dyn StateHandler>) {
78 if let Some(ref mut state_handlers) = self.state_handlers {
79 state_handlers.push(handler);
80 }
81 }
82
83 fn get_event_sender(&self) -> EventSender {
84 EventSender::new(self.sender.clone())
85 }
86
87 fn start(&mut self, mut state_machine: Box<dyn EventProcessor>) {
88 let mut receiver = self.receiver.take().unwrap();
89 let mut state_handlers = self.state_handlers.take().unwrap();
90 let mut event_observers = self.event_observers.take().unwrap();
91 let main_loop = async move {
92 loop {
93 match receiver.next().await {
94 Some(event) => {
95 let event_clone = event.clone();
96 for sender in event_observers.iter_mut() {
97 if let Err(e) = sender.send(event.clone()).await {
98 eprintln!("Error sending observer event: {:#?}", e);
99 }
100 }
101 let state = state_machine.process_event(event);
102 println!("Controller: event {:?} -> state {:?}", event_clone, state);
103 if let Some(state) = &state {
104 for state_handler in state_handlers.iter_mut() {
105 state_handler.handle_state(state.clone());
106 }
107 }
108 }
109 None => {
110 eprintln!("event reader returned: None")
111 }
112 }
113 }
114 };
115 Task::local(main_loop).detach();
116 }
117}
118
119#[cfg(test)]
120mod test {
121 use crate::ota::controller::{Controller, ControllerImpl, SendEvent};
122 use crate::ota::state_machine::{
123 Event, MockEventProcessor, MockStateHandler, State, StateHandler,
124 };
125 use assert_matches::assert_matches;
126 use fuchsia_async as fasync;
127 use futures::StreamExt;
128 use futures::channel::mpsc;
129 use mockall::predicate::*;
130
131 #[test]
132 fn send_states() {
133 let mut exec = fasync::TestExecutor::new();
134 let mut state_machine = MockEventProcessor::new();
135 state_machine
136 .expect_process_event()
137 .with(eq(Event::Cancel))
138 .return_const(Some(State::Home));
139
140 let mut state_handler = MockStateHandler::new();
141 state_handler.expect_handle_state().with(eq(State::Home)).times(1).return_const(());
142 let state_handler: Box<dyn StateHandler> = Box::new(state_handler);
143
144 let mut controller = ControllerImpl::new();
145 controller.add_state_handler(state_handler);
146 let mut event_sender: Box<dyn SendEvent> = Box::new(controller.get_event_sender());
147 controller.start(Box::new(state_machine));
148 event_sender.send(Event::Cancel);
149
150 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
151 }
152
153 #[fuchsia::test]
154 async fn test_observe_events() {
155 let (sender, mut receiver) = mpsc::channel::<Event>(5);
156 let mut state_machine = MockEventProcessor::new();
157 state_machine.expect_process_event().times(3).return_const(Some(State::Home));
158 let mut controller = ControllerImpl::new();
159 controller.add_event_observer(sender);
160 let mut event_sender: Box<dyn SendEvent> = Box::new(controller.get_event_sender());
161
162 controller.start(Box::new(state_machine));
163 event_sender.send(Event::Error("123".to_string()));
164 event_sender.send(Event::DebugLog("test".to_string()));
165 event_sender.send(Event::WiFiConnected);
166
167 assert_matches!(receiver.next().await.unwrap(), Event::Error(_));
168 assert_matches!(receiver.next().await.unwrap(), Event::DebugLog(_));
169 assert_matches!(receiver.next().await.unwrap(), Event::WiFiConnected);
170 }
171}