settings/handler/
setting_handler_factory_impl.rs
1use crate::base::SettingType;
5use crate::handler::base::{
6 Context, Environment, GenerateHandler, SettingHandlerFactory, SettingHandlerFactoryError,
7};
8use crate::handler::setting_handler::{Command, Payload, State};
9use crate::message::base::{Audience, MessageEvent, MessengerType};
10use crate::service;
11use crate::service::message::{Delegate, Signature};
12use crate::service_context::ServiceContext;
13use async_trait::async_trait;
14use futures::StreamExt;
15use std::collections::{HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::atomic::{AtomicU64, Ordering};
18
19pub(crate) struct SettingHandlerFactoryImpl {
22 environment: Environment,
23 generators: HashMap<SettingType, GenerateHandler>,
24
25 context_id_counter: Rc<AtomicU64>,
27}
28
29#[async_trait(?Send)]
30impl SettingHandlerFactory for SettingHandlerFactoryImpl {
31 async fn generate(
32 &mut self,
33 setting_type: SettingType,
34 delegate: Delegate,
35 notifier_signature: Signature,
36 ) -> Result<Signature, SettingHandlerFactoryError> {
37 if !self.environment.settings.contains(&setting_type) {
38 return Err(SettingHandlerFactoryError::SettingNotFound(setting_type));
39 }
40
41 let (messenger, receptor) = delegate
42 .create(MessengerType::Unbound)
43 .await
44 .map_err(|_| SettingHandlerFactoryError::HandlerMessengerError)?;
45 let signature = receptor.get_signature();
46
47 let generate_function = self
48 .generators
49 .get(&setting_type)
50 .ok_or(SettingHandlerFactoryError::GeneratorNotFound(setting_type))?;
51
52 (generate_function)(Context::new(
53 setting_type,
54 messenger,
55 receptor,
56 notifier_signature,
57 self.environment.clone(),
58 self.context_id_counter.fetch_add(1, Ordering::Relaxed),
59 ))
60 .await
61 .map_err(|e| {
62 SettingHandlerFactoryError::HandlerStartupError(
63 setting_type,
64 format!("Failed to generate setting handler: {e:?}").into(),
65 )
66 })?;
67
68 let (controller_messenger, _) = delegate
69 .create(MessengerType::Unbound)
70 .await
71 .map_err(|_| SettingHandlerFactoryError::ControllerMessengerError)?;
72
73 let mut controller_receptor = controller_messenger.message(
76 Payload::Command(Command::ChangeState(State::Startup)).into(),
77 Audience::Messenger(signature),
78 );
79
80 while let Some(message_event) = controller_receptor.next().await {
82 match message_event {
83 MessageEvent::Status(_) => {} MessageEvent::Message(service::Payload::Controller(Payload::Result(result)), _) => {
85 return result.map(|_| signature).map_err(|e| {
89 SettingHandlerFactoryError::HandlerStartupError(
90 setting_type,
91 format!("Got bad startup response: {e:?}").into(),
92 )
93 });
94 }
95 e => {
96 return Err(SettingHandlerFactoryError::HandlerStartupError(
97 setting_type,
98 format!("Unexpected startup response: {e:?}").into(),
99 ));
100 }
101 }
102 }
103
104 panic!("Did not get any responses from {setting_type:?} controller startup");
105 }
106}
107
108impl SettingHandlerFactoryImpl {
109 pub(crate) fn new(
110 settings: HashSet<SettingType>,
111 service_context: Rc<ServiceContext>,
112 context_id_counter: Rc<AtomicU64>,
113 ) -> SettingHandlerFactoryImpl {
114 SettingHandlerFactoryImpl {
115 environment: Environment::new(settings, service_context),
116 generators: HashMap::new(),
117 context_id_counter,
118 }
119 }
120
121 pub(crate) fn register(
122 &mut self,
123 setting_type: SettingType,
124 generate_function: GenerateHandler,
125 ) {
126 let _ = self.generators.insert(setting_type, generate_function);
128 }
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handler::base::Request;
    use crate::handler::setting_handler::controller::{Create, Handle};
    use crate::handler::setting_handler::{
        BoxedController, ClientImpl, ControllerError, ControllerStateResult, SettingHandlerResult,
    };
    use fuchsia_async as fasync;
    use futures::future::FutureExt;
    use futures::select;

    /// Controller stub that emits a state event on every state change and only
    /// acknowledges `State::Startup`.
    struct TestController {
        /// Client used to emit state events.
        client: Rc<ClientImpl>,
    }

    #[async_trait(?Send)]
    impl Create for TestController {
        async fn create(client: Rc<ClientImpl>) -> Result<Self, ControllerError> {
            Ok(Self { client })
        }
    }

    #[async_trait(?Send)]
    impl Handle for TestController {
        /// Requests are never handled by this stub.
        async fn handle(&self, _request: Request) -> Option<SettingHandlerResult> {
            None
        }

        /// Emits a state event, then reports success for `Startup` and nothing otherwise.
        async fn change_state(&mut self, state: State) -> Option<ControllerStateResult> {
            let _ = self.client.emit_state_event(state);
            if let State::Startup = state {
                Some(Ok(()))
            } else {
                None
            }
        }
    }

    /// Checks that `generate` does not resolve before the event sink has seen a message.
    #[fasync::run_until_stalled(test)]
    async fn ensure_startup_is_awaited() {
        let delegate = service::MessageHub::create_hub();
        let settings: HashSet<SettingType> = [SettingType::Unknown].into();
        let service_context = Rc::new(ServiceContext::new(None, Some(delegate.clone())));
        let id_counter = Rc::new(AtomicU64::new(0));
        let mut factory_impl = SettingHandlerFactoryImpl::new(settings, service_context, id_counter);

        // Generator that wraps a `TestController` in a `ClientImpl`.
        let generate_handler: GenerateHandler = Box::new(move |context| {
            Box::pin(ClientImpl::create(
                context,
                Box::new(move |proxy| {
                    Box::pin(async move {
                        let controller = TestController::create(proxy).await.unwrap();
                        Ok(Box::new(controller) as BoxedController)
                    })
                }),
            ))
        });
        factory_impl.register(SettingType::Unknown, generate_handler);

        // Sink receptor; presumably this is where the controller's state event shows up.
        let (_, broker_receptor) =
            delegate.create_sink().await.expect("could not create event sink receptor");

        let (_, receptor) =
            delegate.create(MessengerType::Unbound).await.expect("messenger should be created");
        let notifier_signature = receptor.get_signature();
        let mut generate_future = factory_impl
            .generate(SettingType::Unknown, delegate, notifier_signature)
            .into_stream()
            .fuse();
        let mut broker_receptor = broker_receptor.fuse();

        // Each slot records the position at which its event was observed.
        let mut generate_done = None;
        let mut startup_done = None;
        let mut position: u8 = 0;
        while !(generate_done.is_some() && startup_done.is_some()) {
            select! {
                event = broker_receptor.next() => {
                    let _ = event.expect("should have gotten a reply");
                    startup_done = Some(position);
                    position += 1;
                }
                outcome = generate_future.select_next_some() => {
                    let _ = outcome.expect("should have received a signature");
                    generate_done = Some(position);
                    position += 1;
                }

                complete => break,
            }
        }

        // `generate` must have finished strictly after the sink saw its message.
        assert!(generate_done.unwrap() > startup_done.unwrap());
    }
}