settings/handler/
setting_handler_factory_impl.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::base::SettingType;
5use crate::handler::base::{
6    Context, Environment, GenerateHandler, SettingHandlerFactory, SettingHandlerFactoryError,
7};
8use crate::handler::setting_handler::{Command, Payload, State};
9use crate::message::base::{Audience, MessageEvent, MessengerType};
10use crate::service;
11use crate::service::message::{Delegate, Signature};
12use crate::service_context::ServiceContext;
13use async_trait::async_trait;
14use futures::StreamExt;
15use std::collections::{HashMap, HashSet};
16use std::rc::Rc;
17use std::sync::atomic::{AtomicU64, Ordering};
18
19/// SettingHandlerFactoryImpl houses registered closures for generating setting
20/// handlers.
21pub(crate) struct SettingHandlerFactoryImpl {
22    environment: Environment,
23    generators: HashMap<SettingType, GenerateHandler>,
24
25    /// Atomic counter used to generate new IDs, which uniquely identify a context.
26    context_id_counter: Rc<AtomicU64>,
27}
28
29#[async_trait(?Send)]
30impl SettingHandlerFactory for SettingHandlerFactoryImpl {
31    async fn generate(
32        &mut self,
33        setting_type: SettingType,
34        delegate: Delegate,
35        notifier_signature: Signature,
36    ) -> Result<Signature, SettingHandlerFactoryError> {
37        if !self.environment.settings.contains(&setting_type) {
38            return Err(SettingHandlerFactoryError::SettingNotFound(setting_type));
39        }
40
41        let (messenger, receptor) = delegate
42            .create(MessengerType::Unbound)
43            .await
44            .map_err(|_| SettingHandlerFactoryError::HandlerMessengerError)?;
45        let signature = receptor.get_signature();
46
47        let generate_function = self
48            .generators
49            .get(&setting_type)
50            .ok_or(SettingHandlerFactoryError::GeneratorNotFound(setting_type))?;
51
52        (generate_function)(Context::new(
53            setting_type,
54            messenger,
55            receptor,
56            notifier_signature,
57            self.environment.clone(),
58            self.context_id_counter.fetch_add(1, Ordering::Relaxed),
59        ))
60        .await
61        .map_err(|e| {
62            SettingHandlerFactoryError::HandlerStartupError(
63                setting_type,
64                format!("Failed to generate setting handler: {e:?}").into(),
65            )
66        })?;
67
68        let (controller_messenger, _) = delegate
69            .create(MessengerType::Unbound)
70            .await
71            .map_err(|_| SettingHandlerFactoryError::ControllerMessengerError)?;
72
73        // At this point, we know the controller was constructed successfully.
74        // Tell the controller to run the Startup phase to initialize its state.
75        let mut controller_receptor = controller_messenger.message(
76            Payload::Command(Command::ChangeState(State::Startup)).into(),
77            Audience::Messenger(signature),
78        );
79
80        // Wait for the startup phase to be over before continuing.
81        while let Some(message_event) = controller_receptor.next().await {
82            match message_event {
83                MessageEvent::Status(_) => {} // no-op
84                MessageEvent::Message(service::Payload::Controller(Payload::Result(result)), _) => {
85                    // Startup phase is complete. If it had no errors the proxy can assume it
86                    // has an active controller with create() and startup() already run on it
87                    // before handling its request.
88                    return result.map(|_| signature).map_err(|e| {
89                        SettingHandlerFactoryError::HandlerStartupError(
90                            setting_type,
91                            format!("Got bad startup response: {e:?}").into(),
92                        )
93                    });
94                }
95                e => {
96                    return Err(SettingHandlerFactoryError::HandlerStartupError(
97                        setting_type,
98                        format!("Unexpected startup response: {e:?}").into(),
99                    ));
100                }
101            }
102        }
103
104        panic!("Did not get any responses from {setting_type:?} controller startup");
105    }
106}
107
108impl SettingHandlerFactoryImpl {
109    pub(crate) fn new(
110        settings: HashSet<SettingType>,
111        service_context: Rc<ServiceContext>,
112        context_id_counter: Rc<AtomicU64>,
113    ) -> SettingHandlerFactoryImpl {
114        SettingHandlerFactoryImpl {
115            environment: Environment::new(settings, service_context),
116            generators: HashMap::new(),
117            context_id_counter,
118        }
119    }
120
121    pub(crate) fn register(
122        &mut self,
123        setting_type: SettingType,
124        generate_function: GenerateHandler,
125    ) {
126        // Ignores any previously registered generator.
127        let _ = self.generators.insert(setting_type, generate_function);
128    }
129}
130
131#[cfg(test)]
132mod tests {
133    use super::*;
134    use crate::handler::base::Request;
135    use crate::handler::setting_handler::controller::{Create, Handle};
136    use crate::handler::setting_handler::{
137        BoxedController, ClientImpl, ControllerError, ControllerStateResult, SettingHandlerResult,
138    };
139    use fuchsia_async as fasync;
140    use futures::future::FutureExt;
141    use futures::select;
142
143    /// Test controller used to test startup waiting behavior in SettingHandlerFactoryImpl.
144    struct TestController {
145        client: Rc<ClientImpl>,
146    }
147
148    #[async_trait(?Send)]
149    impl Create for TestController {
150        async fn create(client: Rc<ClientImpl>) -> Result<Self, ControllerError> {
151            Ok(Self { client })
152        }
153    }
154
155    #[async_trait(?Send)]
156    impl Handle for TestController {
157        // Not relevant.
158        async fn handle(&self, _request: Request) -> Option<SettingHandlerResult> {
159            None
160        }
161
162        // When we see a startup message, send a signal to signify we're done.
163        async fn change_state(&mut self, state: State) -> Option<ControllerStateResult> {
164            let _ = self.client.emit_state_event(state);
165            match state {
166                State::Startup => Some(Ok(())),
167                _ => None,
168            }
169        }
170    }
171
172    #[fasync::run_until_stalled(test)]
173    async fn ensure_startup_is_awaited() {
174        let delegate = service::MessageHub::create_hub();
175        let mut factory_impl = SettingHandlerFactoryImpl::new(
176            [SettingType::Unknown].into(),
177            Rc::new(ServiceContext::new(None, Some(delegate.clone()))),
178            Rc::new(AtomicU64::new(0)),
179        );
180
181        // Register generation of controller with factory_impl.
182        let generate_handler: GenerateHandler = Box::new(move |context| {
183            Box::pin(ClientImpl::create(
184                context,
185                Box::new(move |proxy| {
186                    Box::pin(async move {
187                        Ok(Box::new(TestController::create(proxy).await.unwrap())
188                            as BoxedController)
189                    })
190                }),
191            ))
192        });
193        factory_impl.register(SettingType::Unknown, generate_handler);
194
195        // Create a broker that only listens to replies.
196        let (_, broker_receptor) =
197            delegate.create_sink().await.expect("could not create event sink receptor");
198
199        let (_, receptor) =
200            delegate.create(MessengerType::Unbound).await.expect("messenger should be created");
201        // Generate the controller, but don't await it yet so we can time it with the response
202        // from the broker.
203        let mut generate_future = factory_impl
204            .generate(SettingType::Unknown, delegate, receptor.get_signature())
205            .into_stream()
206            .fuse();
207        let mut broker_receptor = broker_receptor.fuse();
208
209        // We need to validate that the generate_done always completes after the startup_done
210        // signal. If this ever fails, it implies that the generator does not properly wait for
211        // startup to complete.
212        let mut generate_done = None;
213        let mut startup_done = None;
214        let mut idx: u8 = 0;
215        while generate_done.is_none() || startup_done.is_none() {
216            select! {
217                maybe = broker_receptor.next() => {
218                    let _ = maybe.expect("should have gotten a reply");
219                    startup_done = Some(idx);
220                    idx += 1;
221                }
222                result = generate_future.select_next_some() => {
223                    let _ = result.expect("should have received a signature");
224                    generate_done = Some(idx);
225                    idx += 1;
226                }
227
228                complete => break,
229            }
230        }
231
232        // Validate that the id for generate is larger than the one for startup,
233        // implying that generate finished after startup was done.
234        assert!(generate_done.unwrap() > startup_done.unwrap());
235    }
236}