settings_storage/
device_storage.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::private::Sealed;
6use crate::stash_logger::StashInspectLogger;
7use crate::storage_factory::{DefaultLoader, NoneT};
8use crate::UpdateState;
9use anyhow::{format_err, Context, Error};
10use fidl_fuchsia_stash::{StoreAccessorProxy, Value};
11use fuchsia_async::{MonotonicDuration, MonotonicInstant, Task, Timer};
12use futures::channel::mpsc::UnboundedSender;
13use futures::future::OptionFuture;
14use futures::lock::{Mutex, MutexGuard};
15use futures::{FutureExt, StreamExt};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18use std::any::Any;
19use std::borrow::Cow;
20use std::collections::HashMap;
21use std::pin::pin;
22use std::rc::Rc;
23
/// Prefix that `prefixed` joins to every storage key (with an underscore) before the key is
/// used in a stash request.
const SETTINGS_PREFIX: &str = "settings";

/// Minimum amount of time between Flush calls to Stash (500 milliseconds). The Flush call
/// triggers file I/O which is slow. If we call flush too often, we can overwhelm Stash, which
/// eventually causes the kernel to crash our service due to filling up the channel.
const MIN_FLUSH_INTERVAL: MonotonicDuration = MonotonicDuration::from_millis(500);
30
/// Stores device level settings in persistent storage.
/// User level settings should not use this.
pub struct DeviceStorage {
    /// Map of [`DeviceStorageCompatible`] keys to their typed storage.
    typed_storage_map: HashMap<&'static str, TypedStorage>,

    /// Map of [`DeviceStorageCompatible`] keys to the type-erased loader registered for that
    /// key, if any. Consulted by [`DefaultDispatcher::get_default`] for types whose `Loader` is
    /// a `DefaultLoader`.
    typed_loader_map: HashMap<&'static str, Box<TypeErasedLoader>>,

    /// If true, reads will be returned from the data in memory rather than reading from storage.
    caching_enabled: bool,

    /// If true, flushes of the underlying storage are handed to the per-key flush task, which
    /// spaces them at least `MIN_FLUSH_INTERVAL` apart. If false, every changed write is
    /// flushed immediately.
    debounce_writes: bool,

    /// Handle used to write stash failures to inspect.
    inspect_handle: Rc<Mutex<StashInspectLogger>>,
}
49
/// A wrapper for managing all communication and caching for one particular type of data being
/// stored. The actual types are erased.
struct TypedStorage {
    /// Sender to communicate with task loop that handles flushes. Sending `()` asks the
    /// per-key task spawned in `DeviceStorage::with_stash_proxy` to schedule a flush.
    flush_sender: UnboundedSender<()>,

    /// Cached storage managed through interior mutability. The async mutex serializes reads
    /// and writes of the same key.
    cached_storage: Mutex<CachedStorage>,
}
59
/// `CachedStorage` abstracts over a cached value that's read from and written
/// to some backing store.
struct CachedStorage {
    /// Cache for the most recently read or written value. Starts out as `None` and is
    /// populated by `DeviceStorage::get` or by a write that changed the stored value.
    current_data: Option<Box<TypeErasedData>>,

    /// Stash connection for this particular type's stash storage.
    stash_proxy: StoreAccessorProxy,
}
69
70/// Structs that can be stored in device storage
71///
72/// Structs that can be stored in device storage should derive the Serialize, Deserialize, and
73/// Clone traits, as well as provide constants.
74/// KEY should be unique the struct, usually the name of the struct itself.
75/// DEFAULT_VALUE will be the value returned when nothing has yet been stored.
76///
77/// Anything that implements this should not introduce breaking changes with the same key.
78/// Clients that want to make a breaking change should create a new structure with a new key and
79/// implement conversion/cleanup logic. Adding optional fields to a struct is not breaking, but
80/// removing fields, renaming fields, or adding non-optional fields are.
81///
82/// [`Storage`]: super::setting_handler::persist::Storage
83pub trait DeviceStorageCompatible: Serialize + DeserializeOwned + Clone + PartialEq + Any {
84    type Loader: DefaultDispatcher<Self>;
85
86    fn try_deserialize_from(value: &str) -> Result<Self, Error> {
87        Self::extract(value)
88    }
89
90    fn extract(value: &str) -> Result<Self, Error> {
91        serde_json::from_str(value).map_err(|e| format_err!("could not deserialize: {e:?}"))
92    }
93
94    fn serialize_to(&self) -> String {
95        serde_json::to_string(self).expect("value should serialize")
96    }
97
98    const KEY: &'static str;
99}
100
/// Types that can be converted into a storable type.
///
/// This trait represents types that can be converted into a storable type. It's also important
/// that the type it is transformed into can also be converted back into this type. This reverse
/// conversion is used to populate the fields of the original type with the stored values plus
/// defaulting the other fields that, e.g. might later be populated from hardware APIs.
///
/// # Example
/// ```
/// // Struct used in controllers.
/// struct SomeSettingInfo {
///     storable_field: u8,
///     hardware_backed_field: String,
/// }
///
/// // Struct only used for storage.
/// #[derive(Serialize, Deserialize, PartialEq, Clone)]
/// struct StorableSomeSettingInfo {
///     storable_field: u8,
/// }
///
/// // Value handed out when nothing has been stored yet.
/// impl Default for StorableSomeSettingInfo {
///     fn default() -> Self {
///         Self { storable_field: 1 }
///     }
/// }
///
/// // Impl compatible for the storable type.
/// impl DeviceStorageCompatible for StorableSomeSettingInfo {
///     type Loader = NoneT;
///     const KEY: &'static str = "some_setting_info";
/// }
///
/// // Impl convertible for controller type.
/// impl DeviceStorageConvertible for SomeSettingInfo {
///     type Storable = StorableSomeSettingInfo;
///     fn get_storable(&self) -> Cow<'_, Self::Storable> {
///         Cow::Owned(StorableSomeSettingInfo {
///             storable_field: self.storable_field,
///         })
///     }
/// }
///
/// // This impl helps us convert from the storable version to the
/// // controller version of the struct. Hardware fields should be backed
/// // by default or usable values.
/// impl Into<SomeSettingInfo> for StorableSomeSettingInfo {
///     fn into(self) -> SomeSettingInfo {
///         SomeSettingInfo {
///             storable_field: self.storable_field,
///             hardware_backed_field: String::new(),
///         }
///     }
/// }
///
/// ```
pub trait DeviceStorageConvertible: Sized {
    /// The type that will be used for storing the data.
    type Storable: DeviceStorageCompatible + Into<Self>;

    /// Convert `self` into its storable version.
    // The reason we don't take ownership here is that the setting handler uses the original value
    // to send a message on the message hub for when the change is written. Serializing also only
    // borrows the data and doesn't need to own it. When `Storable` is `Self`, we only need to keep
    // the borrow on self, but when the types differ, then we need to own the newly constructed
    // type.
    fn get_storable(&self) -> Cow<'_, Self::Storable>;
}
167
168// Any type that is storage compatible is also storage convertible (it can convert to itself!).
169impl<T> DeviceStorageConvertible for T
170where
171    T: DeviceStorageCompatible,
172{
173    type Storable = T;
174
175    fn get_storable(&self) -> Cow<'_, Self::Storable> {
176        Cow::Borrowed(self)
177    }
178}
179
/// One-shot callback that turns a type-erased cached value into its serialized string form.
/// `DeviceStorage::write` supplies one that downcasts to the concrete type and serializes it.
type MappingFn = Box<dyn FnOnce(&(dyn Any)) -> String>;
/// Type-erased form of a stored setting value, as held in `CachedStorage::current_data`.
type TypeErasedData = dyn Any;
/// Type-erased form of a default-value loader, as held in `DeviceStorage::typed_loader_map`.
type TypeErasedLoader = dyn Any;
183
184impl DeviceStorage {
    /// Construct a device storage from the iterable item, which will produce the keys for
    /// storage, and from a generator that will produce a stash proxy given a particular key.
    ///
    /// * `iter` yields `(key, optional loader)` pairs; a loader, when present, is recorded for
    ///   that key so it can later supply default values.
    /// * `stash_generator` is invoked once per key to obtain that key's stash connection.
    /// * `inspect_handle` is used to record flush failures.
    ///
    /// For every key a detached local task is spawned that performs the debounced flushes.
    /// The returned storage has caching and write debouncing enabled.
    pub fn with_stash_proxy<I, G>(
        iter: I,
        stash_generator: G,
        inspect_handle: Rc<Mutex<StashInspectLogger>>,
    ) -> Self
    where
        I: IntoIterator<Item = (&'static str, Option<Box<TypeErasedLoader>>)>,
        G: Fn() -> StoreAccessorProxy,
    {
        let mut typed_loader_map = HashMap::new();
        let typed_storage_map = iter
            .into_iter()
            .map({
                let inspect_handle = Rc::clone(&inspect_handle);
                let typed_loader_map = &mut typed_loader_map;
                move |(key, loader)| {
                    if let Some(loader) = loader {
                        let _ = typed_loader_map.insert(key, loader);
                    }
                    // Generate a separate stash proxy for each key.
                    let (flush_sender, flush_receiver) = futures::channel::mpsc::unbounded::<()>();
                    let stash_proxy = stash_generator();

                    // The cache keeps a clone of the proxy; the original is moved into the
                    // flush task below.
                    let storage = TypedStorage {
                        flush_sender,
                        cached_storage: Mutex::new(CachedStorage {
                            current_data: None,
                            stash_proxy: stash_proxy.clone(),
                        }),
                    };

                    let inspect_handle = Rc::clone(&inspect_handle);
                    // Each key has an independent flush queue.
                    Task::local(async move {
                        // Earliest time the next flush may run. Starting at "now" lets the
                        // first requested flush go out without any delay.
                        let mut next_allowed_flush = MonotonicInstant::now();
                        // Timer for the pending flush, if any. While unarmed it resolves to
                        // `None`, which the timer branch below ignores.
                        let mut next_flush_timer = pin!(OptionFuture::from(None).fuse());
                        let flush_requested = flush_receiver.fuse();
                        futures::pin_mut!(flush_requested);
                        loop {
                            futures::select! {
                                () = flush_requested.select_next_some() => {
                                    // (Re)arm the timer for the earliest allowed flush time.
                                    // Requests arriving before it fires replace the timer with
                                    // one for the same deadline, so they share a single flush.
                                    next_flush_timer.set(OptionFuture::from(Some(Timer::new(
                                        next_allowed_flush
                                    )))
                                    .fuse());
                                },
                                o = next_flush_timer => {
                                    if let Some(()) = o {
                                        DeviceStorage::stash_flush(
                                            &stash_proxy,
                                            Rc::clone(&inspect_handle),
                                            key.to_string()).await;
                                        // Enforce the minimum spacing before the next flush.
                                        next_allowed_flush = MonotonicInstant::now() + MIN_FLUSH_INTERVAL;
                                    }
                                }
                                // Reached once both the request stream and the timer have
                                // terminated, i.e. every sender is gone and no flush is pending.
                                complete => break,
                            }
                        }
                    })
                    .detach();
                    (key, storage)
                }
            })
            .collect();
        DeviceStorage {
            caching_enabled: true,
            debounce_writes: true,
            typed_storage_map,
            typed_loader_map,
            inspect_handle,
        }
    }
259
    /// Test-only. Controls whether `get` may answer from the in-memory cache (`true`, the
    /// default) or must re-read the value from stash on every call (`false`).
    pub fn set_caching_enabled(&mut self, enabled: bool) {
        self.caching_enabled = enabled;
    }
264
    /// Test-only. Controls whether changed writes are flushed through the rate-limited
    /// per-key flush task (`true`, the default) or flushed immediately (`false`).
    pub fn set_debounce_writes(&mut self, debounce: bool) {
        self.debounce_writes = debounce;
    }
269
270    /// Triggers a flush on the given stash proxy.
271    async fn stash_flush(
272        stash_proxy: &StoreAccessorProxy,
273        inspect_handle: Rc<Mutex<StashInspectLogger>>,
274        setting_key: String,
275    ) {
276        let flush_result = stash_proxy.flush().await;
277        match flush_result {
278            Ok(Err(err)) => {
279                Self::handle_flush_failure(inspect_handle, setting_key, format!("{:?}", err)).await;
280            }
281            Err(err) => {
282                Self::handle_flush_failure(inspect_handle, setting_key, format!("{:?}", err)).await;
283            }
284            _ => {}
285        }
286    }
287
    /// Reports a failed flush: logs `err` and records the failure for `setting_key` through
    /// the inspect logger.
    // NOTE(review): `err` is already a Debug-formatted string when it arrives from
    // `stash_flush`, so formatting it with `{:?}` again wraps it in quotes in the log line.
    async fn handle_flush_failure(
        inspect_handle: Rc<Mutex<StashInspectLogger>>,
        setting_key: String,
        err: String,
    ) {
        log::error!("Failed to flush to stash: {:?}", err);

        // Record the write failure to inspect.
        inspect_handle.lock().await.record_flush_failure(setting_key);
    }
298
    /// Type-erased implementation of `write`.
    ///
    /// Compares `new_value` (the serialized form of the value being written) with the
    /// serialized form of what is currently stored: the cached value run through `mapping_fn`
    /// if there is one, otherwise the string currently held by stash. If they differ, the new
    /// value is sent to stash, a flush is performed or scheduled, and `data_as_any` becomes
    /// the cached value.
    ///
    /// Returns `UpdateState::Updated` when a write was issued and `UpdateState::Unchanged`
    /// when the stored value already matched.
    ///
    /// # Errors
    ///
    /// Fails if `key` was not registered at construction, if sending the value to stash
    /// fails, or if the flush request cannot be queued.
    ///
    /// # Panics
    ///
    /// Panics if reading the current value from stash fails or if stash holds a non-string
    /// value for the key.
    // NOTE(review): this path always trusts `current_data` when present and does not consult
    // `caching_enabled` — confirm that is intended.
    async fn inner_write(
        &self,
        key: &'static str,
        new_value: String,
        data_as_any: Box<TypeErasedData>,
        mapping_fn: MappingFn,
    ) -> Result<UpdateState, Error> {
        let typed_storage = self
            .typed_storage_map
            .get(key)
            .ok_or_else(|| format_err!("Invalid data keyed by {}", key))?;
        let mut cached_storage = typed_storage.cached_storage.lock().await;
        // Declared outside the block so the reference taken at its end stays valid.
        let mut maybe_init;
        let cached_value = {
            maybe_init = cached_storage
                .current_data
                .as_deref()
                // Get the data as a shared reference so we don't move out of the option.
                .map(mapping_fn);
            if maybe_init.is_none() {
                // Nothing cached yet: fall back to whatever stash currently holds.
                let stash_key = prefixed(key);
                if let Some(stash_value) =
                    cached_storage.stash_proxy.get_value(&stash_key).await.unwrap_or_else(|_| {
                        panic!("failed to get value from stash for {stash_key:?}")
                    })
                {
                    if let Value::Stringval(string_value) = &*stash_value {
                        maybe_init = Some(string_value.clone());
                    } else {
                        panic!("Unexpected type for key found in stash");
                    }
                }
            }
            maybe_init.as_ref()
        };

        Ok(if cached_value != Some(&new_value) {
            let serialized = Value::Stringval(new_value);
            // From here on `key` is the prefixed stash key.
            let key = prefixed(key);
            cached_storage.stash_proxy.set_value(&key, serialized)?;
            if !self.debounce_writes {
                // Not debouncing writes for testing, just flush immediately.
                // NOTE(review): this reports flush failures under the prefixed key, whereas
                // the debounced flush task reports them under the unprefixed key — confirm
                // which one inspect consumers expect.
                DeviceStorage::stash_flush(
                    &cached_storage.stash_proxy,
                    Rc::clone(&self.inspect_handle),
                    key,
                )
                .await;
            } else {
                typed_storage.flush_sender.unbounded_send(()).with_context(|| {
                    format!("flush_sender failed to send flush message, associated key is {key}")
                })?;
            }
            // Only update the cache once the write has been handed to stash.
            cached_storage.current_data = Some(data_as_any);
            UpdateState::Updated
        } else {
            UpdateState::Unchanged
        })
    }
358
359    /// Write `new_value` to storage. The write will be persisted to disk at a set interval.
360    pub async fn write<T>(&self, new_value: &T) -> Result<UpdateState, Error>
361    where
362        T: DeviceStorageCompatible,
363    {
364        self.inner_write(
365            T::KEY,
366            new_value.serialize_to(),
367            Box::new(new_value.clone()) as Box<TypeErasedData>,
368            Box::new(|any: &(dyn Any)| {
369                // Attempt to downcast the `dyn Any` to its original type. If `T` was not its
370                // original type, then we want to panic because there's a compile-time issue
371                // with overlapping keys.
372                let value = any.downcast_ref::<T>().expect(
373                    "Type mismatch even though keys match. Two different\
374                                        types have the same key value",
375                );
376                value.serialize_to()
377            }),
378        )
379        .await
380    }
381
382    /// Test-only method to write directly to stash without touching the cache. This is used for
383    /// setting up data as if it existed on disk before the connection to stash was made.
384    pub async fn write_str(&self, key: &'static str, value: String) -> Result<(), Error> {
385        let typed_storage =
386            self.typed_storage_map.get(key).expect("Did not request an initialized key");
387        let cached_storage = typed_storage.cached_storage.lock().await;
388        cached_storage.stash_proxy.set_value(&prefixed(key), Value::Stringval(value))?;
389        typed_storage.flush_sender.unbounded_send(()).unwrap();
390        Ok(())
391    }
392
393    async fn get_inner(
394        &self,
395        key: &'static str,
396    ) -> (MutexGuard<'_, CachedStorage>, Option<Option<String>>) {
397        let typed_storage = self
398            .typed_storage_map
399            .get(key)
400            // TODO(https://fxbug.dev/42064613) Replace this with an error result.
401            .unwrap_or_else(|| panic!("Invalid data keyed by {key}"));
402        let cached_storage = typed_storage.cached_storage.lock().await;
403        let new = if cached_storage.current_data.is_none() || !self.caching_enabled {
404            let stash_key = prefixed(key);
405            if let Some(stash_value) = cached_storage
406                .stash_proxy
407                .get_value(&stash_key)
408                .await
409                .unwrap_or_else(|_| panic!("failed to get value from stash for {stash_key:?}"))
410            {
411                if let Value::Stringval(string_value) = *stash_value {
412                    Some(Some(string_value))
413                } else {
414                    panic!("Unexpected type for key found in stash");
415                }
416            } else {
417                Some(None)
418            }
419        } else {
420            None
421        };
422
423        (cached_storage, new)
424    }
425
426    /// Gets the latest value cached locally, or loads the value from storage.
427    /// Doesn't support multiple concurrent callers of the same struct.
428    pub async fn get<T>(&self) -> T
429    where
430        T: DeviceStorageCompatible,
431    {
432        let (mut cached_storage, update) = self.get_inner(T::KEY).await;
433        if let Some(update) = update {
434            cached_storage.current_data = Some(update.and_then(|string_value| {
435                T::try_deserialize_from(&string_value).map(|val| Box::new(val) as Box<TypeErasedData>).map_err(|e| log::error!(
436                    "Using default. Failed to deserialize type {}: {e:?}\nSource data: {string_value:?}",
437                    T::KEY
438                )).ok()
439            }).unwrap_or_else(|| Box::new(<T::Loader as DefaultDispatcher<T>>::get_default(self)) as Box<TypeErasedData>));
440        };
441
442        cached_storage
443            .current_data
444            .as_ref()
445            .expect("should always have a value")
446            .downcast_ref::<T>()
447            .expect(
448                "Type mismatch even though keys match. Two different types have the same key\
449                     value",
450            )
451            .clone()
452    }
453}
454
/// Produces the default value of a storable type `T`.
///
/// `DeviceStorage::get` calls this through `T::Loader` when no usable value is stored. The
/// trait has `Sealed` as a supertrait.
pub trait DefaultDispatcher<T>: Sealed
where
    T: DeviceStorageCompatible,
{
    /// Returns the default value for `T`; the storage is provided so implementations can look
    /// up a loader registered for `T::KEY`.
    fn get_default(_: &DeviceStorage) -> T;
}
461
462impl<T> DefaultDispatcher<T> for NoneT
463where
464    T: DeviceStorageCompatible<Loader = Self> + Default,
465{
466    fn get_default(_: &DeviceStorage) -> T {
467        T::default()
468    }
469}
470
471impl<T, L> DefaultDispatcher<T> for L
472where
473    T: DeviceStorageCompatible<Loader = L>,
474    L: DefaultLoader<Result = T> + 'static,
475{
476    fn get_default(storage: &DeviceStorage) -> T {
477        match storage.typed_loader_map.get(T::KEY) {
478            Some(loader) => match loader.downcast_ref::<T::Loader>() {
479                Some(loader) => loader.default_value(),
480                None => {
481                    panic!("Mismatch key and loader for key {}", T::KEY);
482                }
483            },
484            None => panic!("Missing loader for {}", T::KEY),
485        }
486    }
487}
488
489fn prefixed(input_string: &str) -> String {
490    format!("{SETTINGS_PREFIX}_{input_string}")
491}
492
493#[cfg(test)]
494mod tests {
495    use super::*;
496    use assert_matches::assert_matches;
497    use diagnostics_assertions::assert_data_tree;
498    use fidl_fuchsia_stash::{
499        FlushError, StoreAccessorMarker, StoreAccessorRequest, StoreAccessorRequestStream,
500    };
501    use fuchsia_async as fasync;
502    use fuchsia_async::TestExecutor;
503    use fuchsia_inspect::component;
504    use futures::prelude::*;
505    use serde::{Deserialize, Serialize};
506    use std::task::Poll;
507
    // Value carried by `TestStruct::default()`.
    const VALUE0: i32 = 3;
    // Distinct non-default values used by the tests below.
    const VALUE1: i32 = 33;
    const VALUE2: i32 = 128;

    /// Minimal storable type used to exercise `DeviceStorage`.
    #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
    struct TestStruct {
        value: i32,
    }

    // Key the fake stash is expected to see for `TestStruct`: `TestStruct::KEY` with the
    // settings prefix applied.
    const STORE_KEY: &str = "settings_testkey";

    impl DeviceStorageCompatible for TestStruct {
        // No dedicated loader: the default comes from the `Default` impl below.
        type Loader = NoneT;
        const KEY: &'static str = "testkey";
    }

    impl Default for TestStruct {
        fn default() -> Self {
            TestStruct { value: VALUE0 }
        }
    }
529
530    /// Advances `future` until `executor` finishes. Panics if the end result was a stall.
531    #[track_caller]
532    fn advance_executor<F>(executor: &mut TestExecutor, future: &mut F)
533    where
534        F: Future + Unpin,
535    {
536        assert!(executor.run_until_stalled(future).is_ready(), "TestExecutor stalled!");
537    }
538
539    /// Verifies that a SetValue call was sent to stash with the given value.
540    async fn verify_stash_set(stash_stream: &mut StoreAccessorRequestStream, expected_value: i32) {
541        match stash_stream.next().await.unwrap() {
542            Ok(StoreAccessorRequest::SetValue { key, val, control_handle: _ }) => {
543                assert_eq!(key, STORE_KEY);
544                if let Value::Stringval(string_value) = val {
545                    let input_value = TestStruct::try_deserialize_from(&string_value)
546                        .expect("deserialization should succeed");
547                    assert_eq!(input_value.value, expected_value);
548                } else {
549                    panic!("Unexpected type for key found in stash");
550                }
551            }
552            request => panic!("Unexpected request: {request:?}"),
553        }
554    }
555
556    /// Verifies that a SetValue call was sent to stash with the given value.
557    async fn validate_stash_get_and_respond(
558        stash_stream: &mut StoreAccessorRequestStream,
559        response: String,
560    ) {
561        match stash_stream.next().await.unwrap() {
562            Ok(StoreAccessorRequest::GetValue { key, responder }) => {
563                assert_eq!(key, STORE_KEY);
564                responder.send(Some(Value::Stringval(response))).expect("unable to send response");
565            }
566            request => panic!("Unexpected request: {request:?}"),
567        }
568    }
569
570    /// Verifies that a Flush call was sent to stash.
571    async fn verify_stash_flush(stash_stream: &mut StoreAccessorRequestStream) {
572        match stash_stream.next().await.unwrap() {
573            Ok(StoreAccessorRequest::Flush { responder }) => {
574                let _ = responder.send(Ok(()));
575            } // expected
576            request => panic!("Unexpected request: {request:?}"),
577        }
578    }
579
580    /// Verifies that a Flush call was sent to stash, and send back a failure.
581    async fn fail_stash_flush(stash_stream: &mut StoreAccessorRequestStream) {
582        match stash_stream.next().await.unwrap() {
583            Ok(StoreAccessorRequest::Flush { responder }) => {
584                let _ = responder.send(Err(FlushError::CommitFailed));
585            } // expected
586            request => panic!("Unexpected request: {request:?}"),
587        }
588    }
589
590    #[fuchsia::test(allow_stalls = false)]
591    async fn test_get() {
592        let (stash_proxy, mut stash_stream) =
593            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
594
595        fasync::Task::local(async move {
596            let value_to_get = TestStruct { value: VALUE1 };
597
598            #[allow(clippy::single_match)]
599            while let Some(req) = stash_stream.try_next().await.unwrap() {
600                #[allow(unreachable_patterns)]
601                match req {
602                    StoreAccessorRequest::GetValue { key, responder } => {
603                        assert_eq!(key, STORE_KEY);
604                        let response = Value::Stringval(value_to_get.serialize_to());
605
606                        responder.send(Some(response)).unwrap();
607                    }
608                    _ => {}
609                }
610            }
611        })
612        .detach();
613
614        let storage = DeviceStorage::with_stash_proxy(
615            vec![(TestStruct::KEY, None)],
616            move || stash_proxy.clone(),
617            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
618        );
619        let result = storage.get::<TestStruct>().await;
620
621        assert_eq!(result.value, VALUE1);
622    }
623
624    #[fuchsia::test(allow_stalls = false)]
625    async fn test_get_default() {
626        let (stash_proxy, mut stash_stream) =
627            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
628
629        fasync::Task::local(async move {
630            #[allow(clippy::single_match)]
631            while let Some(req) = stash_stream.try_next().await.unwrap() {
632                #[allow(unreachable_patterns)]
633                match req {
634                    StoreAccessorRequest::GetValue { key: _, responder } => {
635                        responder.send(None).unwrap();
636                    }
637                    _ => {}
638                }
639            }
640        })
641        .detach();
642
643        let storage = DeviceStorage::with_stash_proxy(
644            vec![(TestStruct::KEY, None)],
645            move || stash_proxy.clone(),
646            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
647        );
648        let result = storage.get::<TestStruct>().await;
649
650        assert_eq!(result.value, VALUE0);
651    }
652
653    // For an invalid stash value, the get() method should return the default value.
654    #[fuchsia::test(allow_stalls = false)]
655    async fn test_invalid_stash() {
656        let (stash_proxy, mut stash_stream) =
657            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
658
659        fasync::Task::local(async move {
660            #[allow(clippy::single_match)]
661            while let Some(req) = stash_stream.try_next().await.unwrap() {
662                #[allow(unreachable_patterns)]
663                match req {
664                    StoreAccessorRequest::GetValue { key: _, responder } => {
665                        let response = Value::Stringval("invalid value".to_string());
666                        responder.send(Some(response)).unwrap();
667                    }
668                    _ => {}
669                }
670            }
671        })
672        .detach();
673
674        let storage = DeviceStorage::with_stash_proxy(
675            vec![(TestStruct::KEY, None)],
676            move || stash_proxy.clone(),
677            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
678        );
679
680        let result = storage.get::<TestStruct>().await;
681
682        assert_eq!(result.value, VALUE0);
683    }
684
    // Verifies that stash flush failures are written to inspect.
    //
    // The test drives the executor by hand: it performs a write, plays the part of stash for
    // the GetValue and SetValue requests, fails the Flush request, and then checks that one
    // failure was recorded under `testkey`.
    #[fuchsia::test]
    fn test_flush_fail_writes_to_inspect() {
        let written_value = VALUE2;
        let mut executor = TestExecutor::new_with_fake_time();

        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();

        let inspector = component::inspector();
        let logger_handle = Rc::new(Mutex::new(StashInspectLogger::new(inspector.root())));
        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            logger_handle,
        );

        // Write to device storage.
        let value_to_write = TestStruct { value: written_value };
        let write_future = storage.write(&value_to_write);
        futures::pin_mut!(write_future);

        // Initial cache check is done if no read was ever performed.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);

        // Answer the cache-check GetValue with the default value, which differs from the
        // value being written.
        {
            let respond_future = validate_stash_get_and_respond(
                &mut stash_stream,
                serde_json::to_string(&TestStruct::default()).unwrap(),
            );
            futures::pin_mut!(respond_future);
            advance_executor(&mut executor, &mut respond_future);
        }

        // Write request finishes immediately.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Ready(Ok(_)));

        // Set request is received immediately on write.
        {
            let set_value_future = verify_stash_set(&mut stash_stream, written_value);
            futures::pin_mut!(set_value_future);
            advance_executor(&mut executor, &mut set_value_future);
        }

        // Start listening for the flush request.
        let flush_future = fail_stash_flush(&mut stash_stream);
        futures::pin_mut!(flush_future);

        // Flush is received without a wait. Due to the way time works with executors, if there was
        // a delay, the test would stall since time never advances.
        advance_executor(&mut executor, &mut flush_future);

        // Queue up a second write to guarantee that CachedStorage has written the failure to
        // inspect.
        {
            let value_to_write = TestStruct { value: VALUE1 };
            let write_future = storage.write(&value_to_write);
            futures::pin_mut!(write_future);
            assert_matches!(
                executor.run_until_stalled(&mut write_future),
                Poll::Ready(Result::Ok(_))
            );
        }

        // Run all background tasks until stalled.
        let _ = executor.run_until_stalled(&mut future::pending::<()>());

        assert_data_tree!(inspector, root: {
            stash_failures: {
                testkey: {
                    count: 1u64,
                }
            }
        });
    }
760
    // Test that an initial write to DeviceStorage causes a SetValue and Flush to Stash
    // without any wait.
    //
    // The executor uses fake time that this test never advances, so the flush can only be
    // observed if it is issued without waiting on a timer deadline in the future.
    #[fuchsia::test]
    fn test_first_write_flushes_immediately() {
        let written_value = VALUE2;
        let mut executor = TestExecutor::new_with_fake_time();

        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();

        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
        );

        // Write to device storage.
        let value_to_write = TestStruct { value: written_value };
        let write_future = storage.write(&value_to_write);
        futures::pin_mut!(write_future);

        // Initial cache check is done if no read was ever performed.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);

        // Answer the cache-check GetValue with the default value, which differs from the
        // value being written.
        {
            let respond_future = validate_stash_get_and_respond(
                &mut stash_stream,
                serde_json::to_string(&TestStruct::default()).unwrap(),
            );
            futures::pin_mut!(respond_future);
            advance_executor(&mut executor, &mut respond_future);
        }

        // Write request finishes immediately.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Ready(Ok(_)));

        // Set request is received immediately on write.
        {
            let set_value_future = verify_stash_set(&mut stash_stream, written_value);
            futures::pin_mut!(set_value_future);
            advance_executor(&mut executor, &mut set_value_future);
        }

        // Start listening for the flush request.
        let flush_future = verify_stash_flush(&mut stash_stream);
        futures::pin_mut!(flush_future);

        // Flush is received without a wait. Due to the way time works with executors, if there was
        // a delay, the test would stall since time never advances.
        advance_executor(&mut executor, &mut flush_future);
    }
812
813    #[derive(Default, Copy, Clone, PartialEq, Serialize, Deserialize)]
814    struct WrongStruct;
815
816    impl DeviceStorageCompatible for WrongStruct {
817        type Loader = NoneT;
818        const KEY: &'static str = "WRONG_STRUCT";
819    }
820
821    // Test that an initial write to DeviceStorage causes a SetValue and Flush to Stash
822    // without any wait.
823    #[fuchsia::test(allow_stalls = false)]
824    async fn test_write_with_mismatch_type_returns_error() {
825        let (stash_proxy, mut stream) =
826            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
827
828        let spawned = fasync::Task::local(async move {
829            while let Some(request) = stream.next().await {
830                match request {
831                    Ok(StoreAccessorRequest::GetValue { key, responder }) => {
832                        assert_eq!(key, STORE_KEY);
833                        let _ = responder.send(Some(Value::Stringval(
834                            serde_json::to_string(&TestStruct { value: VALUE2 }).unwrap(),
835                        )));
836                    }
837                    Ok(StoreAccessorRequest::SetValue { key, .. }) => {
838                        assert_eq!(key, STORE_KEY);
839                    }
840                    _ => panic!("Unexpected request {request:?}"),
841                }
842            }
843        });
844
845        let storage = DeviceStorage::with_stash_proxy(
846            vec![(TestStruct::KEY, None)],
847            move || stash_proxy.clone(),
848            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
849        );
850
851        // Write successfully to storage once.
852        let result = storage.write(&TestStruct { value: VALUE2 }).await;
853        assert!(result.is_ok());
854
855        // Write to device storage again with a different type to validate that the type can't
856        // be changed.
857        let result = storage.write(&WrongStruct).await;
858        assert_matches!(result, Err(e) if e.to_string() == "Invalid data keyed by WRONG_STRUCT");
859
860        drop(storage);
861        spawned.await;
862    }
863
864    // Test that multiple writes to DeviceStorage will cause a SetValue each time, but will only
865    // Flush to Stash at an interval.
866    #[fuchsia::test]
867    fn test_multiple_write_debounce() {
868        // Custom executor for this test so that we can advance the clock arbitrarily and verify the
869        // state of the executor at any given point.
870        let mut executor = TestExecutor::new_with_fake_time();
871        let start_time = MonotonicInstant::from_nanos(0);
872        executor.set_fake_time(start_time);
873
874        let (stash_proxy, mut stash_stream) =
875            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
876
877        let storage = DeviceStorage::with_stash_proxy(
878            vec![(TestStruct::KEY, None)],
879            move || stash_proxy.clone(),
880            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
881        );
882
883        let first_value = VALUE1;
884        let second_value = VALUE2;
885
886        // First write finishes immediately.
887        {
888            let value_to_write = TestStruct { value: first_value };
889            let write_future = storage.write(&value_to_write);
890            futures::pin_mut!(write_future);
891
892            // Initial cache check is done if no read was ever performed.
893            assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);
894
895            {
896                let respond_future = validate_stash_get_and_respond(
897                    &mut stash_stream,
898                    serde_json::to_string(&TestStruct::default()).unwrap(),
899                );
900                futures::pin_mut!(respond_future);
901                advance_executor(&mut executor, &mut respond_future);
902            }
903
904            assert_matches!(
905                executor.run_until_stalled(&mut write_future),
906                Poll::Ready(Result::Ok(_))
907            );
908        }
909
910        // First set request is received immediately on write.
911        {
912            let set_value_future = verify_stash_set(&mut stash_stream, first_value);
913            futures::pin_mut!(set_value_future);
914            advance_executor(&mut executor, &mut set_value_future);
915        }
916
917        // First flush request is received.
918        {
919            let flush_future = verify_stash_flush(&mut stash_stream);
920            futures::pin_mut!(flush_future);
921            advance_executor(&mut executor, &mut flush_future);
922        }
923
924        // Now we repeat the process with a second write request, which will need to advance the
925        // fake time due to the timer.
926
927        // Second write finishes immediately.
928        {
929            let value_to_write = TestStruct { value: second_value };
930            let write_future = storage.write(&value_to_write);
931            futures::pin_mut!(write_future);
932            assert_matches!(
933                executor.run_until_stalled(&mut write_future),
934                Poll::Ready(Result::Ok(_))
935            );
936        }
937
938        // Second set request finishes immediately on write.
939        {
940            let set_value_future = verify_stash_set(&mut stash_stream, second_value);
941            futures::pin_mut!(set_value_future);
942            advance_executor(&mut executor, &mut set_value_future);
943        }
944
945        // Start waiting for flush request.
946        let flush_future = verify_stash_flush(&mut stash_stream);
947        futures::pin_mut!(flush_future);
948
949        // TextExecutor stalls due to waiting on timer to finish.
950        assert_matches!(executor.run_until_stalled(&mut flush_future), Poll::Pending);
951
952        // Advance time to 1ms before the flush triggers.
953        executor
954            .set_fake_time(start_time + (MIN_FLUSH_INTERVAL - MonotonicDuration::from_millis(1)));
955
956        // TextExecutor is still waiting on the time to finish.
957        assert_matches!(executor.run_until_stalled(&mut flush_future), Poll::Pending);
958
959        // Advance time so that the flush will trigger.
960        executor.set_fake_time(start_time + MIN_FLUSH_INTERVAL);
961
962        // Stash receives a flush request after one timer cycle and the future terminates.
963        advance_executor(&mut executor, &mut flush_future);
964    }
965
966    // This mod includes structs to only be used by
967    // test_device_compatible_migration tests.
968    mod test_device_compatible_migration {
969        use super::*;
970        use serde::{Deserialize, Serialize};
971
972        pub(crate) const DEFAULT_V1_VALUE: i32 = 1;
973        pub(crate) const DEFAULT_CURRENT_VALUE: i32 = 2;
974        pub(crate) const DEFAULT_CURRENT_VALUE_2: i32 = 3;
975
976        #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
977        pub(crate) struct V1 {
978            pub value: i32,
979        }
980
981        impl DeviceStorageCompatible for V1 {
982            type Loader = NoneT;
983            const KEY: &'static str = "testkey";
984        }
985
986        impl Default for V1 {
987            fn default() -> Self {
988                Self { value: DEFAULT_V1_VALUE }
989            }
990        }
991
992        #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
993        pub(crate) struct Current {
994            pub value: i32,
995            pub value_2: i32,
996        }
997
998        impl From<V1> for Current {
999            fn from(v1: V1) -> Self {
1000                Current { value: v1.value, value_2: DEFAULT_CURRENT_VALUE_2 }
1001            }
1002        }
1003
1004        impl DeviceStorageCompatible for Current {
1005            type Loader = NoneT;
1006            const KEY: &'static str = "testkey2";
1007
1008            fn try_deserialize_from(value: &str) -> Result<Self, Error> {
1009                Self::extract(value).or_else(|_| V1::extract(value).map(Self::from))
1010            }
1011        }
1012
1013        impl Default for Current {
1014            fn default() -> Self {
1015                Self { value: DEFAULT_CURRENT_VALUE, value_2: DEFAULT_CURRENT_VALUE_2 }
1016            }
1017        }
1018    }
1019
1020    #[fuchsia::test]
1021    fn test_device_compatible_custom_migration() {
1022        // Create an initial struct based on the first version.
1023        let initial = test_device_compatible_migration::V1::default();
1024        // Serialize.
1025        let initial_serialized = initial.serialize_to();
1026
1027        // Deserialize using the second version.
1028        let current =
1029            test_device_compatible_migration::Current::try_deserialize_from(&initial_serialized)
1030                .expect("deserialization should succeed");
1031        // Assert values carried over from first version and defaults are used for rest.
1032        assert_eq!(current.value, test_device_compatible_migration::DEFAULT_V1_VALUE);
1033        assert_eq!(current.value_2, test_device_compatible_migration::DEFAULT_CURRENT_VALUE_2);
1034    }
1035
1036    #[fuchsia::test(allow_stalls = false)]
1037    async fn test_corrupt_get_returns_default() {
1038        let (stash_proxy, mut stash_stream) =
1039            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
1040
1041        fasync::Task::local(async move {
1042            #[allow(clippy::single_match)]
1043            while let Some(req) = stash_stream.try_next().await.unwrap() {
1044                #[allow(unreachable_patterns)]
1045                match req {
1046                    StoreAccessorRequest::GetValue { key, responder } => {
1047                        assert_eq!(
1048                            key,
1049                            format!("settings_{}", test_device_compatible_migration::Current::KEY)
1050                        );
1051                        let response = Value::Stringval("bad json".to_string());
1052                        responder.send(Some(response)).unwrap();
1053                    }
1054                    _ => {}
1055                }
1056            }
1057        })
1058        .detach();
1059
1060        let storage = DeviceStorage::with_stash_proxy(
1061            vec![(test_device_compatible_migration::Current::KEY, None)],
1062            move || stash_proxy.clone(),
1063            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
1064        );
1065        let current = storage.get::<test_device_compatible_migration::Current>().await;
1066
1067        assert_eq!(current.value, test_device_compatible_migration::DEFAULT_CURRENT_VALUE);
1068        assert_eq!(current.value_2, test_device_compatible_migration::DEFAULT_CURRENT_VALUE_2);
1069    }
1070}