settings_storage/
device_storage.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::private::Sealed;
6use crate::stash_logger::StashInspectLogger;
7use crate::storage_factory::{DefaultLoader, NoneT};
8use crate::UpdateState;
9use anyhow::{format_err, Context, Error};
10use fidl_fuchsia_stash::{StoreAccessorProxy, Value};
11use fuchsia_async::{MonotonicDuration, MonotonicInstant, Task, Timer};
12use futures::channel::mpsc::UnboundedSender;
13use futures::future::OptionFuture;
14use futures::lock::{Mutex, MutexGuard};
15use futures::{FutureExt, StreamExt};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18use std::any::Any;
19use std::borrow::Cow;
20use std::collections::HashMap;
21use std::pin::pin;
22use std::rc::Rc;
23
/// Prefix that `prefixed` joins (with an underscore) to every storage key before the key is
/// used with Stash.
const SETTINGS_PREFIX: &str = "settings";

/// Minimum amount of time between Flush calls to Stash (500 milliseconds). The Flush call
/// triggers file I/O which is slow. If we call flush too often, we can overwhelm Stash, which
/// eventually causes the kernel to crash our service due to filling up the channel.
const MIN_FLUSH_INTERVAL: MonotonicDuration = MonotonicDuration::from_millis(500);
30
/// Stores device level settings in persistent storage.
/// User level settings should not use this.
pub struct DeviceStorage {
    /// Map of [`DeviceStorageCompatible`] keys to their typed storage.
    typed_storage_map: HashMap<&'static str, TypedStorage>,

    /// Map of [`DeviceStorageCompatible`] keys to type-erased default loaders. Only keys that
    /// were registered together with a loader have an entry.
    typed_loader_map: HashMap<&'static str, Box<TypeErasedLoader>>,

    /// If true, reads will be returned from the data in memory rather than reading from storage.
    caching_enabled: bool,

    /// If true, flushes to the underlying storage are rate limited to at most one every
    /// `MIN_FLUSH_INTERVAL`. If false, every changed write flushes immediately.
    debounce_writes: bool,

    /// Handle used to write stash failures to inspect.
    inspect_handle: Rc<Mutex<StashInspectLogger>>,
}
49
/// A wrapper for managing all communication and caching for one particular type of data being
/// stored. The actual types are erased.
struct TypedStorage {
    /// Sender used to request a flush from the background task loop that
    /// `DeviceStorage::with_stash_proxy` spawns for this key.
    flush_sender: UnboundedSender<()>,

    /// Cached storage managed through interior mutability. Reads and writes of this type take
    /// this lock before touching the cache or the stash connection.
    cached_storage: Mutex<CachedStorage>,
}
59
/// `CachedStorage` abstracts over a cached value that's read from and written
/// to some backing store.
struct CachedStorage {
    /// Cache for the most recently read or written value, with its concrete type erased.
    /// `None` until the first read, or the first write that changes the stored value.
    current_data: Option<Box<TypeErasedData>>,

    /// Stash connection for this particular type's stash storage.
    stash_proxy: StoreAccessorProxy,
}
69
/// Structs that can be stored in device storage
///
/// Structs that can be stored in device storage should derive the Serialize, Deserialize,
/// Clone, and PartialEq traits, as well as provide the `KEY` constant and the `Loader` type.
/// `KEY` should be unique to the struct, usually the name of the struct itself.
/// `Loader` supplies the value returned when nothing has yet been stored, or when the stored
/// data cannot be deserialized.
///
/// Anything that implements this should not introduce breaking changes with the same key.
/// Clients that want to make a breaking change should create a new structure with a new key and
/// implement conversion/cleanup logic. Adding optional fields to a struct is not breaking, but
/// removing fields, renaming fields, or adding non-optional fields are.
pub trait DeviceStorageCompatible: Serialize + DeserializeOwned + Clone + PartialEq + Any {
    /// Source of the default value for this type; see [`DefaultDispatcher`].
    type Loader: DefaultDispatcher<Self>;

    /// Deserializes a value from the string read back from storage. The default implementation
    /// delegates to [`Self::extract`].
    fn try_deserialize_from(value: &str) -> Result<Self, Error> {
        Self::extract(value)
    }

    /// Parses `value` as JSON into `Self`, returning an error that carries the debug form of
    /// the underlying serde failure.
    fn extract(value: &str) -> Result<Self, Error> {
        serde_json::from_str(value).map_err(|e| format_err!("could not deserialize: {e:?}"))
    }

    /// Serializes `self` to the JSON string that is written to storage. Panics if the value
    /// cannot be serialized.
    fn serialize_to(&self) -> String {
        serde_json::to_string(self).expect("value should serialize")
    }

    /// Key identifying this type in storage. Must not be shared with any other type.
    const KEY: &'static str;
}
100
/// Types that can be converted into a storable type.
///
/// This trait represents types that can be converted into a storable type. It's also important
/// that the type it is transformed into can also be converted back into this type. This reverse
/// conversion is used to populate the fields of the original type with the stored values plus
/// defaulting the other fields that, e.g. might later be populated from hardware APIs.
///
/// # Example
/// ```
/// // Struct used in controllers.
/// struct SomeSettingInfo {
///     storable_field: u8,
///     hardware_backed_field: String,
/// }
///
/// // Struct only used for storage.
/// #[derive(Serialize, Deserialize, PartialEq, Clone)]
/// struct StorableSomeSettingInfo {
///     storable_field: u8,
/// }
///
/// // With `NoneT` as the loader, the default value comes from `Default`.
/// impl Default for StorableSomeSettingInfo {
///     fn default() -> Self {
///         Self { storable_field: 1, }
///     }
/// }
///
/// // Impl compatible for the storable type.
/// impl DeviceStorageCompatible for StorableSomeSettingInfo {
///     type Loader = NoneT;
///     const KEY: &'static str = "some_setting_info";
/// }
///
/// // Impl convertible for controller type.
/// impl DeviceStorageConvertible for SomeSettingInfo {
///     type Storable = StorableSomeSettingInfo;
///     fn get_storable(&self) -> Cow<'_, Self::Storable> {
///         Cow::Owned(StorableSomeSettingInfo {
///             storable_field: self.storable_field,
///         })
///     }
/// }
///
/// // This impl helps us convert from the storable version to the
/// // controller version of the struct. Hardware fields should be backed
/// // by default or usable values.
/// impl Into<SomeSettingInfo> for StorableSomeSettingInfo {
///     fn into(self) -> SomeSettingInfo {
///         SomeSettingInfo {
///             storable_field: self.storable_field,
///             hardware_backed_field: String::new(),
///         }
///     }
/// }
///
/// ```
pub trait DeviceStorageConvertible: Sized {
    /// The type that will be used for storing the data.
    type Storable: DeviceStorageCompatible + Into<Self>;

    /// Convert `self` into its storable version.
    // The reason we don't take ownership here is that the setting handler uses the original value
    // to send a message on the message hub for when the change is written. Serializing also only
    // borrows the data and doesn't need to own it. When `Storable` is `Self`, we only need to keep
    // the borrow on self, but when the types differ, then we need to own the newly constructed
    // type.
    fn get_storable(&self) -> Cow<'_, Self::Storable>;
}
167
168// Any type that is storage compatible is also storage convertible (it can convert to itself!).
169impl<T> DeviceStorageConvertible for T
170where
171    T: DeviceStorageCompatible,
172{
173    type Storable = T;
174
175    fn get_storable(&self) -> Cow<'_, Self::Storable> {
176        Cow::Borrowed(self)
177    }
178}
179
/// One-shot callback that turns a type-erased cached value back into its serialized string.
type MappingFn = Box<dyn FnOnce(&dyn Any) -> String>;
/// A stored value whose concrete type has been erased.
type TypeErasedData = dyn Any;
/// A default-value loader whose concrete type has been erased.
type TypeErasedLoader = dyn Any;
183
184impl DeviceStorage {
    /// Construct a device storage from the iterable item, which will produce the keys for
    /// storage (each optionally paired with a type-erased default loader), and from a generator
    /// that will produce a stash proxy for each key.
    ///
    /// For every key this spawns a detached local task that services that key's flush
    /// requests. The returned storage starts with caching and write debouncing enabled.
    pub fn with_stash_proxy<I, G>(
        iter: I,
        stash_generator: G,
        inspect_handle: Rc<Mutex<StashInspectLogger>>,
    ) -> Self
    where
        I: IntoIterator<Item = (&'static str, Option<Box<TypeErasedLoader>>)>,
        G: Fn() -> StoreAccessorProxy,
    {
        let mut typed_loader_map = HashMap::new();
        let typed_storage_map = iter
            .into_iter()
            .map({
                let inspect_handle = Rc::clone(&inspect_handle);
                let typed_loader_map = &mut typed_loader_map;
                move |(key, loader)| {
                    // Keys registered without a loader get no entry in the loader map.
                    if let Some(loader) = loader {
                        let _ = typed_loader_map.insert(key, loader);
                    }
                    // Generate a separate stash proxy for each key.
                    let (flush_sender, flush_receiver) = futures::channel::mpsc::unbounded::<()>();
                    let stash_proxy = stash_generator();

                    // The cache keeps one clone of the proxy; the flush task below owns the
                    // other.
                    let storage = TypedStorage {
                        flush_sender,
                        cached_storage: Mutex::new(CachedStorage {
                            current_data: None,
                            stash_proxy: stash_proxy.clone(),
                        }),
                    };

                    let inspect_handle = Rc::clone(&inspect_handle);
                    // Each key has an independent flush queue.
                    Task::local(async move {
                        // The first flush is allowed right away.
                        let mut next_allowed_flush = MonotonicInstant::now();
                        // No flush is pending initially, so the timer slot starts out empty.
                        let mut next_flush_timer = pin!(OptionFuture::from(None).fuse());
                        let flush_requested = flush_receiver.fuse();
                        futures::pin_mut!(flush_requested);
                        loop {
                            futures::select! {
                                // A flush was requested: (re)arm the timer for the earliest
                                // instant at which the next flush is allowed.
                                () = flush_requested.select_next_some() => {
                                    next_flush_timer.set(OptionFuture::from(Some(Timer::new(
                                        next_allowed_flush
                                    )))
                                    .fuse());
                                },
                                // The timer slot resolved. `None` means no timer was armed, in
                                // which case there is nothing to flush.
                                o = next_flush_timer => {
                                    if let Some(()) = o {
                                        DeviceStorage::stash_flush(
                                            &stash_proxy,
                                            Rc::clone(&inspect_handle),
                                            key.to_string()).await;
                                        next_allowed_flush = MonotonicInstant::now() + MIN_FLUSH_INTERVAL;
                                    }
                                }
                                // Both the request stream and the timer are finished.
                                complete => break,
                            }
                        }
                    })
                    .detach();
                    (key, storage)
                }
            })
            .collect();
        DeviceStorage {
            caching_enabled: true,
            debounce_writes: true,
            typed_storage_map,
            typed_loader_map,
            inspect_handle,
        }
    }
259
    /// Test-only. Enables or disables serving reads from the in-memory cache; while disabled,
    /// every read fetches the value from stash.
    pub fn set_caching_enabled(&mut self, enabled: bool) {
        self.caching_enabled = enabled;
    }
264
    /// Test-only. Enables or disables flush debouncing; while disabled, every write that
    /// changes the stored value flushes to stash immediately.
    pub fn set_debounce_writes(&mut self, debounce: bool) {
        self.debounce_writes = debounce;
    }
269
270    /// Triggers a flush on the given stash proxy.
271    async fn stash_flush(
272        stash_proxy: &StoreAccessorProxy,
273        inspect_handle: Rc<Mutex<StashInspectLogger>>,
274        setting_key: String,
275    ) {
276        let flush_result = stash_proxy.flush().await;
277        match flush_result {
278            Ok(Err(err)) => {
279                Self::handle_flush_failure(inspect_handle, setting_key, format!("{err:?}")).await;
280            }
281            Err(err) => {
282                Self::handle_flush_failure(inspect_handle, setting_key, format!("{err:?}")).await;
283            }
284            _ => {}
285        }
286    }
287
288    async fn handle_flush_failure(
289        inspect_handle: Rc<Mutex<StashInspectLogger>>,
290        setting_key: String,
291        err: String,
292    ) {
293        log::error!("Failed to flush to stash: {:?}", err);
294
295        // Record the write failure to inspect.
296        inspect_handle.lock().await.record_flush_failure(setting_key);
297    }
298
299    async fn inner_write(
300        &self,
301        key: &'static str,
302        new_value: String,
303        data_as_any: Box<TypeErasedData>,
304        mapping_fn: MappingFn,
305    ) -> Result<UpdateState, Error> {
306        let typed_storage = self
307            .typed_storage_map
308            .get(key)
309            .ok_or_else(|| format_err!("Invalid data keyed by {}", key))?;
310        let mut cached_storage = typed_storage.cached_storage.lock().await;
311        let mut maybe_init;
312        let cached_value = {
313            maybe_init = cached_storage
314                .current_data
315                .as_deref()
316                // Get the data as a shared reference so we don't move out of the option.
317                .map(mapping_fn);
318            if maybe_init.is_none() {
319                let stash_key = prefixed(key);
320                if let Some(stash_value) =
321                    cached_storage.stash_proxy.get_value(&stash_key).await.unwrap_or_else(|_| {
322                        panic!("failed to get value from stash for {stash_key:?}")
323                    })
324                {
325                    if let Value::Stringval(string_value) = &*stash_value {
326                        maybe_init = Some(string_value.clone());
327                    } else {
328                        panic!("Unexpected type for key found in stash");
329                    }
330                }
331            }
332            maybe_init.as_ref()
333        };
334
335        Ok(if cached_value != Some(&new_value) {
336            let serialized = Value::Stringval(new_value);
337            let key = prefixed(key);
338            cached_storage.stash_proxy.set_value(&key, serialized)?;
339            if !self.debounce_writes {
340                // Not debouncing writes for testing, just flush immediately.
341                DeviceStorage::stash_flush(
342                    &cached_storage.stash_proxy,
343                    Rc::clone(&self.inspect_handle),
344                    key,
345                )
346                .await;
347            } else {
348                typed_storage.flush_sender.unbounded_send(()).with_context(|| {
349                    format!("flush_sender failed to send flush message, associated key is {key}")
350                })?;
351            }
352            cached_storage.current_data = Some(data_as_any);
353            UpdateState::Updated
354        } else {
355            UpdateState::Unchanged
356        })
357    }
358
359    /// Write `new_value` to storage. The write will be persisted to disk at a set interval.
360    pub async fn write<T>(&self, new_value: &T) -> Result<UpdateState, Error>
361    where
362        T: DeviceStorageConvertible,
363    {
364        let storable = new_value.get_storable();
365        self.inner_write(
366            T::Storable::KEY,
367            storable.serialize_to(),
368            Box::new(storable.into_owned()) as Box<TypeErasedData>,
369            Box::new(|any: &dyn Any| {
370                // Attempt to downcast the `dyn Any` to its original type. If `T` was not its
371                // original type, then we want to panic because there's a compile-time issue
372                // with overlapping keys.
373                let value = any.downcast_ref::<T::Storable>().expect(
374                    "Type mismatch even though keys match. Two different\
375                                        types have the same key value",
376                );
377                value.serialize_to()
378            }),
379        )
380        .await
381    }
382
383    /// Test-only method to write directly to stash without touching the cache. This is used for
384    /// setting up data as if it existed on disk before the connection to stash was made.
385    pub async fn write_str(&self, key: &'static str, value: String) -> Result<(), Error> {
386        let typed_storage =
387            self.typed_storage_map.get(key).expect("Did not request an initialized key");
388        let cached_storage = typed_storage.cached_storage.lock().await;
389        cached_storage.stash_proxy.set_value(&prefixed(key), Value::Stringval(value))?;
390        typed_storage.flush_sender.unbounded_send(()).unwrap();
391        Ok(())
392    }
393
394    async fn get_inner(
395        &self,
396        key: &'static str,
397    ) -> (MutexGuard<'_, CachedStorage>, Option<Option<String>>) {
398        let typed_storage = self
399            .typed_storage_map
400            .get(key)
401            // TODO(https://fxbug.dev/42064613) Replace this with an error result.
402            .unwrap_or_else(|| panic!("Invalid data keyed by {key}"));
403        let cached_storage = typed_storage.cached_storage.lock().await;
404        let new = if cached_storage.current_data.is_none() || !self.caching_enabled {
405            let stash_key = prefixed(key);
406            if let Some(stash_value) = cached_storage
407                .stash_proxy
408                .get_value(&stash_key)
409                .await
410                .unwrap_or_else(|_| panic!("failed to get value from stash for {stash_key:?}"))
411            {
412                if let Value::Stringval(string_value) = *stash_value {
413                    Some(Some(string_value))
414                } else {
415                    panic!("Unexpected type for key found in stash");
416                }
417            } else {
418                Some(None)
419            }
420        } else {
421            None
422        };
423
424        (cached_storage, new)
425    }
426
427    /// Gets the latest value cached locally, or loads the value from storage.
428    /// Doesn't support multiple concurrent callers of the same struct.
429    pub async fn get<T>(&self) -> T::Storable
430    where
431        T: DeviceStorageConvertible,
432    {
433        let (mut cached_storage, update) = self.get_inner(T::Storable::KEY).await;
434        if let Some(update) = update {
435            cached_storage.current_data = Some(update.and_then(|string_value| {
436                T::Storable::try_deserialize_from(&string_value).map(|val| Box::new(val) as Box<TypeErasedData>).map_err(|e| log::error!(
437                    "Using default. Failed to deserialize type {}: {e:?}\nSource data: {string_value:?}",
438                    T::Storable::KEY
439                )).ok()
440            }).unwrap_or_else(|| Box::new(<<T::Storable as DeviceStorageCompatible>::Loader as DefaultDispatcher<T::Storable>>::get_default(self)) as Box<TypeErasedData>));
441        };
442
443        cached_storage
444            .current_data
445            .as_ref()
446            .expect("should always have a value")
447            .downcast_ref::<T::Storable>()
448            .expect(
449                "Type mismatch even though keys match. Two different types have the same key\
450                     value",
451            )
452            .clone()
453    }
454}
455
/// Produces the default value for a [`DeviceStorageCompatible`] type `T`.
///
/// `DeviceStorage::get` calls this through `T`'s `Loader` type when stash has no usable value
/// for `T`.
// NOTE(review): the `Sealed` supertrait presumably prevents implementations outside this
// crate — confirm against `crate::private`.
pub trait DefaultDispatcher<T>: Sealed
where
    T: DeviceStorageCompatible,
{
    /// Returns the default value for `T`. Implementations may consult the given storage.
    fn get_default(_: &DeviceStorage) -> T;
}
462
463impl<T> DefaultDispatcher<T> for NoneT
464where
465    T: DeviceStorageCompatible<Loader = Self> + Default,
466{
467    fn get_default(_: &DeviceStorage) -> T {
468        T::default()
469    }
470}
471
472impl<T, L> DefaultDispatcher<T> for L
473where
474    T: DeviceStorageCompatible<Loader = L>,
475    L: DefaultLoader<Result = T> + 'static,
476{
477    fn get_default(storage: &DeviceStorage) -> T {
478        match storage.typed_loader_map.get(T::KEY) {
479            Some(loader) => match loader.downcast_ref::<T::Loader>() {
480                Some(loader) => loader.default_value(),
481                None => {
482                    panic!("Mismatch key and loader for key {}", T::KEY);
483                }
484            },
485            None => panic!("Missing loader for {}", T::KEY),
486        }
487    }
488}
489
490fn prefixed(input_string: &str) -> String {
491    format!("{SETTINGS_PREFIX}_{input_string}")
492}
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497    use assert_matches::assert_matches;
498    use diagnostics_assertions::assert_data_tree;
499    use fidl_fuchsia_stash::{
500        FlushError, StoreAccessorMarker, StoreAccessorRequest, StoreAccessorRequestStream,
501    };
502    use fuchsia_async as fasync;
503    use fuchsia_async::TestExecutor;
504    use fuchsia_inspect::component;
505    use futures::prelude::*;
506    use serde::{Deserialize, Serialize};
507    use std::task::Poll;
508
    // Value carried by `TestStruct::default()`.
    const VALUE0: i32 = 3;
    // Value served by the fake stash in `test_get`, and written by the second write in
    // `test_flush_fail_writes_to_inspect`.
    const VALUE1: i32 = 33;
    // Value written by the first write in the write/flush tests.
    const VALUE2: i32 = 128;
512
    /// Minimal storable type used by the tests in this module.
    #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
    struct TestStruct {
        value: i32,
    }

    /// `TestStruct::KEY` as it appears in requests to stash, i.e. with the settings prefix.
    const STORE_KEY: &str = "settings_testkey";

    impl DeviceStorageCompatible for TestStruct {
        // `NoneT` loader: the default value comes from the `Default` impl below.
        type Loader = NoneT;
        const KEY: &'static str = "testkey";
    }

    impl Default for TestStruct {
        fn default() -> Self {
            TestStruct { value: VALUE0 }
        }
    }
530
531    /// Advances `future` until `executor` finishes. Panics if the end result was a stall.
532    #[track_caller]
533    fn advance_executor<F>(executor: &mut TestExecutor, future: &mut F)
534    where
535        F: Future + Unpin,
536    {
537        assert!(executor.run_until_stalled(future).is_ready(), "TestExecutor stalled!");
538    }
539
540    /// Verifies that a SetValue call was sent to stash with the given value.
541    async fn verify_stash_set(stash_stream: &mut StoreAccessorRequestStream, expected_value: i32) {
542        match stash_stream.next().await.unwrap() {
543            Ok(StoreAccessorRequest::SetValue { key, val, control_handle: _ }) => {
544                assert_eq!(key, STORE_KEY);
545                if let Value::Stringval(string_value) = val {
546                    let input_value = TestStruct::try_deserialize_from(&string_value)
547                        .expect("deserialization should succeed");
548                    assert_eq!(input_value.value, expected_value);
549                } else {
550                    panic!("Unexpected type for key found in stash");
551                }
552            }
553            request => panic!("Unexpected request: {request:?}"),
554        }
555    }
556
557    /// Verifies that a SetValue call was sent to stash with the given value.
558    async fn validate_stash_get_and_respond(
559        stash_stream: &mut StoreAccessorRequestStream,
560        response: String,
561    ) {
562        match stash_stream.next().await.unwrap() {
563            Ok(StoreAccessorRequest::GetValue { key, responder }) => {
564                assert_eq!(key, STORE_KEY);
565                responder.send(Some(Value::Stringval(response))).expect("unable to send response");
566            }
567            request => panic!("Unexpected request: {request:?}"),
568        }
569    }
570
571    /// Verifies that a Flush call was sent to stash.
572    async fn verify_stash_flush(stash_stream: &mut StoreAccessorRequestStream) {
573        match stash_stream.next().await.unwrap() {
574            Ok(StoreAccessorRequest::Flush { responder }) => {
575                let _ = responder.send(Ok(()));
576            } // expected
577            request => panic!("Unexpected request: {request:?}"),
578        }
579    }
580
581    /// Verifies that a Flush call was sent to stash, and send back a failure.
582    async fn fail_stash_flush(stash_stream: &mut StoreAccessorRequestStream) {
583        match stash_stream.next().await.unwrap() {
584            Ok(StoreAccessorRequest::Flush { responder }) => {
585                let _ = responder.send(Err(FlushError::CommitFailed));
586            } // expected
587            request => panic!("Unexpected request: {request:?}"),
588        }
589    }
590
591    #[fuchsia::test(allow_stalls = false)]
592    async fn test_get() {
593        let (stash_proxy, mut stash_stream) =
594            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
595
596        fasync::Task::local(async move {
597            let value_to_get = TestStruct { value: VALUE1 };
598
599            #[allow(clippy::single_match)]
600            while let Some(req) = stash_stream.try_next().await.unwrap() {
601                #[allow(unreachable_patterns)]
602                match req {
603                    StoreAccessorRequest::GetValue { key, responder } => {
604                        assert_eq!(key, STORE_KEY);
605                        let response = Value::Stringval(value_to_get.serialize_to());
606
607                        responder.send(Some(response)).unwrap();
608                    }
609                    _ => {}
610                }
611            }
612        })
613        .detach();
614
615        let storage = DeviceStorage::with_stash_proxy(
616            vec![(TestStruct::KEY, None)],
617            move || stash_proxy.clone(),
618            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
619        );
620        let result = storage.get::<TestStruct>().await;
621
622        assert_eq!(result.value, VALUE1);
623    }
624
625    #[fuchsia::test(allow_stalls = false)]
626    async fn test_get_default() {
627        let (stash_proxy, mut stash_stream) =
628            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
629
630        fasync::Task::local(async move {
631            #[allow(clippy::single_match)]
632            while let Some(req) = stash_stream.try_next().await.unwrap() {
633                #[allow(unreachable_patterns)]
634                match req {
635                    StoreAccessorRequest::GetValue { key: _, responder } => {
636                        responder.send(None).unwrap();
637                    }
638                    _ => {}
639                }
640            }
641        })
642        .detach();
643
644        let storage = DeviceStorage::with_stash_proxy(
645            vec![(TestStruct::KEY, None)],
646            move || stash_proxy.clone(),
647            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
648        );
649        let result = storage.get::<TestStruct>().await;
650
651        assert_eq!(result.value, VALUE0);
652    }
653
654    // For an invalid stash value, the get() method should return the default value.
655    #[fuchsia::test(allow_stalls = false)]
656    async fn test_invalid_stash() {
657        let (stash_proxy, mut stash_stream) =
658            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
659
660        fasync::Task::local(async move {
661            #[allow(clippy::single_match)]
662            while let Some(req) = stash_stream.try_next().await.unwrap() {
663                #[allow(unreachable_patterns)]
664                match req {
665                    StoreAccessorRequest::GetValue { key: _, responder } => {
666                        let response = Value::Stringval("invalid value".to_string());
667                        responder.send(Some(response)).unwrap();
668                    }
669                    _ => {}
670                }
671            }
672        })
673        .detach();
674
675        let storage = DeviceStorage::with_stash_proxy(
676            vec![(TestStruct::KEY, None)],
677            move || stash_proxy.clone(),
678            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
679        );
680
681        let result = storage.get::<TestStruct>().await;
682
683        assert_eq!(result.value, VALUE0);
684    }
685
    // Verifies that stash flush failures are written to inspect.
    //
    // The test drives the executor by hand: it performs a write, plays the stash side of the
    // resulting GetValue, SetValue and Flush requests (failing the Flush), and then checks the
    // inspect tree for one recorded failure under the test key.
    #[fuchsia::test]
    fn test_flush_fail_writes_to_inspect() {
        let written_value = VALUE2;
        let mut executor = TestExecutor::new_with_fake_time();

        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();

        let inspector = component::inspector();
        let logger_handle = Rc::new(Mutex::new(StashInspectLogger::new(inspector.root())));
        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            logger_handle,
        );

        // Write to device storage.
        let value_to_write = TestStruct { value: written_value };
        let write_future = storage.write(&value_to_write);
        futures::pin_mut!(write_future);

        // Initial cache check is done if no read was ever performed.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);

        // Answer the pending GetValue with the default value, so the write below is a change.
        {
            let respond_future = validate_stash_get_and_respond(
                &mut stash_stream,
                serde_json::to_string(&TestStruct::default()).unwrap(),
            );
            futures::pin_mut!(respond_future);
            advance_executor(&mut executor, &mut respond_future);
        }

        // Write request finishes immediately.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Ready(Ok(_)));

        // Set request is received immediately on write.
        {
            let set_value_future = verify_stash_set(&mut stash_stream, written_value);
            futures::pin_mut!(set_value_future);
            advance_executor(&mut executor, &mut set_value_future);
        }

        // Start listening for the flush request.
        let flush_future = fail_stash_flush(&mut stash_stream);
        futures::pin_mut!(flush_future);

        // Flush is received without a wait. Due to the way time works with executors, if there was
        // a delay, the test would stall since time never advances.
        advance_executor(&mut executor, &mut flush_future);

        // Queue up a second write to guarantee that CachedStorage has written the failure to
        // inspect.
        {
            let value_to_write = TestStruct { value: VALUE1 };
            let write_future = storage.write(&value_to_write);
            futures::pin_mut!(write_future);
            assert_matches!(
                executor.run_until_stalled(&mut write_future),
                Poll::Ready(Result::Ok(_))
            );
        }

        // Run all background tasks until stalled.
        let _ = executor.run_until_stalled(&mut future::pending::<()>());

        // Exactly one flush failure should be recorded for the test key.
        assert_data_tree!(@executor executor, inspector, root: {
            stash_failures: {
                testkey: {
                    count: 1u64,
                }
            }
        });
    }
761
    // Test that an initial write to DeviceStorage causes a SetValue and Flush to Stash
    // without any wait.
    //
    // The executor uses fake time that this test never advances, so a flush that had to wait
    // on a timer would leave the final step stalled and fail the test.
    #[fuchsia::test]
    fn test_first_write_flushes_immediately() {
        let written_value = VALUE2;
        let mut executor = TestExecutor::new_with_fake_time();

        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();

        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
        );

        // Write to device storage.
        let value_to_write = TestStruct { value: written_value };
        let write_future = storage.write(&value_to_write);
        futures::pin_mut!(write_future);

        // Initial cache check is done if no read was ever performed.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);

        // Answer the pending GetValue with the default value, so the write below is a change.
        {
            let respond_future = validate_stash_get_and_respond(
                &mut stash_stream,
                serde_json::to_string(&TestStruct::default()).unwrap(),
            );
            futures::pin_mut!(respond_future);
            advance_executor(&mut executor, &mut respond_future);
        }

        // Write request finishes immediately.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Ready(Ok(_)));

        // Set request is received immediately on write.
        {
            let set_value_future = verify_stash_set(&mut stash_stream, written_value);
            futures::pin_mut!(set_value_future);
            advance_executor(&mut executor, &mut set_value_future);
        }

        // Start listening for the flush request.
        let flush_future = verify_stash_flush(&mut stash_stream);
        futures::pin_mut!(flush_future);

        // Flush is received without a wait. Due to the way time works with executors, if there was
        // a delay, the test would stall since time never advances.
        advance_executor(&mut executor, &mut flush_future);
    }
813
814    #[derive(Default, Copy, Clone, PartialEq, Serialize, Deserialize)]
815    struct WrongStruct;
816
817    impl DeviceStorageCompatible for WrongStruct {
818        type Loader = NoneT;
819        const KEY: &'static str = "WRONG_STRUCT";
820    }
821
822    // Test that an initial write to DeviceStorage causes a SetValue and Flush to Stash
823    // without any wait.
824    #[fuchsia::test(allow_stalls = false)]
825    async fn test_write_with_mismatch_type_returns_error() {
826        let (stash_proxy, mut stream) =
827            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
828
829        let spawned = fasync::Task::local(async move {
830            while let Some(request) = stream.next().await {
831                match request {
832                    Ok(StoreAccessorRequest::GetValue { key, responder }) => {
833                        assert_eq!(key, STORE_KEY);
834                        let _ = responder.send(Some(Value::Stringval(
835                            serde_json::to_string(&TestStruct { value: VALUE2 }).unwrap(),
836                        )));
837                    }
838                    Ok(StoreAccessorRequest::SetValue { key, .. }) => {
839                        assert_eq!(key, STORE_KEY);
840                    }
841                    _ => panic!("Unexpected request {request:?}"),
842                }
843            }
844        });
845
846        let storage = DeviceStorage::with_stash_proxy(
847            vec![(TestStruct::KEY, None)],
848            move || stash_proxy.clone(),
849            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
850        );
851
852        // Write successfully to storage once.
853        let result = storage.write(&TestStruct { value: VALUE2 }).await;
854        assert!(result.is_ok());
855
856        // Write to device storage again with a different type to validate that the type can't
857        // be changed.
858        let result = storage.write(&WrongStruct).await;
859        assert_matches!(result, Err(e) if e.to_string() == "Invalid data keyed by WRONG_STRUCT");
860
861        drop(storage);
862        spawned.await;
863    }
864
865    // Test that multiple writes to DeviceStorage will cause a SetValue each time, but will only
866    // Flush to Stash at an interval.
867    #[fuchsia::test]
868    fn test_multiple_write_debounce() {
869        // Custom executor for this test so that we can advance the clock arbitrarily and verify the
870        // state of the executor at any given point.
871        let mut executor = TestExecutor::new_with_fake_time();
872        let start_time = MonotonicInstant::from_nanos(0);
873        executor.set_fake_time(start_time);
874
875        let (stash_proxy, mut stash_stream) =
876            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
877
878        let storage = DeviceStorage::with_stash_proxy(
879            vec![(TestStruct::KEY, None)],
880            move || stash_proxy.clone(),
881            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
882        );
883
884        let first_value = VALUE1;
885        let second_value = VALUE2;
886
887        // First write finishes immediately.
888        {
889            let value_to_write = TestStruct { value: first_value };
890            let write_future = storage.write(&value_to_write);
891            futures::pin_mut!(write_future);
892
893            // Initial cache check is done if no read was ever performed.
894            assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);
895
896            {
897                let respond_future = validate_stash_get_and_respond(
898                    &mut stash_stream,
899                    serde_json::to_string(&TestStruct::default()).unwrap(),
900                );
901                futures::pin_mut!(respond_future);
902                advance_executor(&mut executor, &mut respond_future);
903            }
904
905            assert_matches!(
906                executor.run_until_stalled(&mut write_future),
907                Poll::Ready(Result::Ok(_))
908            );
909        }
910
911        // First set request is received immediately on write.
912        {
913            let set_value_future = verify_stash_set(&mut stash_stream, first_value);
914            futures::pin_mut!(set_value_future);
915            advance_executor(&mut executor, &mut set_value_future);
916        }
917
918        // First flush request is received.
919        {
920            let flush_future = verify_stash_flush(&mut stash_stream);
921            futures::pin_mut!(flush_future);
922            advance_executor(&mut executor, &mut flush_future);
923        }
924
925        // Now we repeat the process with a second write request, which will need to advance the
926        // fake time due to the timer.
927
928        // Second write finishes immediately.
929        {
930            let value_to_write = TestStruct { value: second_value };
931            let write_future = storage.write(&value_to_write);
932            futures::pin_mut!(write_future);
933            assert_matches!(
934                executor.run_until_stalled(&mut write_future),
935                Poll::Ready(Result::Ok(_))
936            );
937        }
938
939        // Second set request finishes immediately on write.
940        {
941            let set_value_future = verify_stash_set(&mut stash_stream, second_value);
942            futures::pin_mut!(set_value_future);
943            advance_executor(&mut executor, &mut set_value_future);
944        }
945
946        // Start waiting for flush request.
947        let flush_future = verify_stash_flush(&mut stash_stream);
948        futures::pin_mut!(flush_future);
949
950        // TextExecutor stalls due to waiting on timer to finish.
951        assert_matches!(executor.run_until_stalled(&mut flush_future), Poll::Pending);
952
953        // Advance time to 1ms before the flush triggers.
954        executor
955            .set_fake_time(start_time + (MIN_FLUSH_INTERVAL - MonotonicDuration::from_millis(1)));
956
957        // TextExecutor is still waiting on the time to finish.
958        assert_matches!(executor.run_until_stalled(&mut flush_future), Poll::Pending);
959
960        // Advance time so that the flush will trigger.
961        executor.set_fake_time(start_time + MIN_FLUSH_INTERVAL);
962
963        // Stash receives a flush request after one timer cycle and the future terminates.
964        advance_executor(&mut executor, &mut flush_future);
965    }
966
967    // This mod includes structs to only be used by
968    // test_device_compatible_migration tests.
969    mod test_device_compatible_migration {
970        use super::*;
971        use serde::{Deserialize, Serialize};
972
973        pub(crate) const DEFAULT_V1_VALUE: i32 = 1;
974        pub(crate) const DEFAULT_CURRENT_VALUE: i32 = 2;
975        pub(crate) const DEFAULT_CURRENT_VALUE_2: i32 = 3;
976
977        #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
978        pub(crate) struct V1 {
979            pub value: i32,
980        }
981
982        impl DeviceStorageCompatible for V1 {
983            type Loader = NoneT;
984            const KEY: &'static str = "testkey";
985        }
986
987        impl Default for V1 {
988            fn default() -> Self {
989                Self { value: DEFAULT_V1_VALUE }
990            }
991        }
992
993        #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
994        pub(crate) struct Current {
995            pub value: i32,
996            pub value_2: i32,
997        }
998
999        impl From<V1> for Current {
1000            fn from(v1: V1) -> Self {
1001                Current { value: v1.value, value_2: DEFAULT_CURRENT_VALUE_2 }
1002            }
1003        }
1004
1005        impl DeviceStorageCompatible for Current {
1006            type Loader = NoneT;
1007            const KEY: &'static str = "testkey2";
1008
1009            fn try_deserialize_from(value: &str) -> Result<Self, Error> {
1010                Self::extract(value).or_else(|_| V1::extract(value).map(Self::from))
1011            }
1012        }
1013
1014        impl Default for Current {
1015            fn default() -> Self {
1016                Self { value: DEFAULT_CURRENT_VALUE, value_2: DEFAULT_CURRENT_VALUE_2 }
1017            }
1018        }
1019    }
1020
1021    #[fuchsia::test]
1022    fn test_device_compatible_custom_migration() {
1023        // Create an initial struct based on the first version.
1024        let initial = test_device_compatible_migration::V1::default();
1025        // Serialize.
1026        let initial_serialized = initial.serialize_to();
1027
1028        // Deserialize using the second version.
1029        let current =
1030            test_device_compatible_migration::Current::try_deserialize_from(&initial_serialized)
1031                .expect("deserialization should succeed");
1032        // Assert values carried over from first version and defaults are used for rest.
1033        assert_eq!(current.value, test_device_compatible_migration::DEFAULT_V1_VALUE);
1034        assert_eq!(current.value_2, test_device_compatible_migration::DEFAULT_CURRENT_VALUE_2);
1035    }
1036
1037    #[fuchsia::test(allow_stalls = false)]
1038    async fn test_corrupt_get_returns_default() {
1039        let (stash_proxy, mut stash_stream) =
1040            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
1041
1042        fasync::Task::local(async move {
1043            #[allow(clippy::single_match)]
1044            while let Some(req) = stash_stream.try_next().await.unwrap() {
1045                #[allow(unreachable_patterns)]
1046                match req {
1047                    StoreAccessorRequest::GetValue { key, responder } => {
1048                        assert_eq!(
1049                            key,
1050                            format!("settings_{}", test_device_compatible_migration::Current::KEY)
1051                        );
1052                        let response = Value::Stringval("bad json".to_string());
1053                        responder.send(Some(response)).unwrap();
1054                    }
1055                    _ => {}
1056                }
1057            }
1058        })
1059        .detach();
1060
1061        let storage = DeviceStorage::with_stash_proxy(
1062            vec![(test_device_compatible_migration::Current::KEY, None)],
1063            move || stash_proxy.clone(),
1064            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
1065        );
1066        let current = storage.get::<test_device_compatible_migration::Current>().await;
1067
1068        assert_eq!(current.value, test_device_compatible_migration::DEFAULT_CURRENT_VALUE);
1069        assert_eq!(current.value_2, test_device_compatible_migration::DEFAULT_CURRENT_VALUE_2);
1070    }
1071}