1use crate::private::Sealed;
6use crate::stash_logger::StashInspectLogger;
7use crate::storage_factory::{DefaultLoader, NoneT};
8use crate::UpdateState;
9use anyhow::{format_err, Context, Error};
10use fidl_fuchsia_stash::{StoreAccessorProxy, Value};
11use fuchsia_async::{MonotonicDuration, MonotonicInstant, Task, Timer};
12use futures::channel::mpsc::UnboundedSender;
13use futures::future::OptionFuture;
14use futures::lock::{Mutex, MutexGuard};
15use futures::{FutureExt, StreamExt};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18use std::any::Any;
19use std::borrow::Cow;
20use std::collections::HashMap;
21use std::pin::pin;
22use std::rc::Rc;
23
/// Prefix that `prefixed` joins (with `_`) in front of a setting key to form the stash key.
const SETTINGS_PREFIX: &str = "settings";

/// Interval added to the completion time of a debounced flush to compute the earliest
/// time the next debounced flush may be issued.
const MIN_FLUSH_INTERVAL: MonotonicDuration = MonotonicDuration::from_millis(500);
30
/// Typed, cached key/value storage backed by stash `StoreAccessorProxy` connections.
///
/// One `TypedStorage` entry exists for every key passed to `DeviceStorage::with_stash_proxy`.
pub struct DeviceStorage {
    /// Per-key storage state, indexed by the setting key (without the stash prefix).
    typed_storage_map: HashMap<&'static str, TypedStorage>,

    /// Type-erased default-value loaders, present only for keys registered with a loader.
    typed_loader_map: HashMap<&'static str, Box<TypeErasedLoader>>,

    /// When `false`, `get` re-reads the value from stash on every call instead of relying
    /// on the cached value.
    caching_enabled: bool,

    /// When `true`, a write only signals the per-key flush task; when `false`, the write
    /// flushes inline before returning.
    debounce_writes: bool,

    /// Shared logger on which flush failures are recorded per setting key.
    inspect_handle: Rc<Mutex<StashInspectLogger>>,
}
49
/// Storage state kept for a single setting key.
struct TypedStorage {
    /// Sending `()` asks the key's background flush task to schedule a flush.
    flush_sender: UnboundedSender<()>,

    /// Cached value and stash connection, guarded by an async mutex.
    cached_storage: Mutex<CachedStorage>,
}
59
/// Cached value for one key together with the stash connection used to read and write it.
struct CachedStorage {
    /// Last value read or written for this key, type-erased; `None` until the first
    /// successful `get` or changed `write`.
    current_data: Option<Box<TypeErasedData>>,

    /// Stash connection used for this key's `get_value`/`set_value`/`flush` calls.
    stash_proxy: StoreAccessorProxy,
}
69
70pub trait DeviceStorageCompatible: Serialize + DeserializeOwned + Clone + PartialEq + Any {
84 type Loader: DefaultDispatcher<Self>;
85
86 fn try_deserialize_from(value: &str) -> Result<Self, Error> {
87 Self::extract(value)
88 }
89
90 fn extract(value: &str) -> Result<Self, Error> {
91 serde_json::from_str(value).map_err(|e| format_err!("could not deserialize: {e:?}"))
92 }
93
94 fn serialize_to(&self) -> String {
95 serde_json::to_string(self).expect("value should serialize")
96 }
97
98 const KEY: &'static str;
99}
100
/// Types that can be turned into a storable representation and recovered from it.
pub trait DeviceStorageConvertible: Sized {
    /// Storable representation; it must convert back into `Self`.
    type Storable: DeviceStorageCompatible + Into<Self>;

    /// Returns the storable form of `self`, borrowed or owned at the implementor's choice.
    fn get_storable(&self) -> Cow<'_, Self::Storable>;
}
167
168impl<T> DeviceStorageConvertible for T
170where
171 T: DeviceStorageCompatible,
172{
173 type Storable = T;
174
175 fn get_storable(&self) -> Cow<'_, Self::Storable> {
176 Cow::Borrowed(self)
177 }
178}
179
/// One-shot callback that turns a type-erased cached value into its serialized string.
type MappingFn = Box<dyn FnOnce(&(dyn Any)) -> String>;
/// Type-erased stored value; downcast to the concrete setting type on access.
type TypeErasedData = dyn Any;
/// Type-erased default-value loader; downcast to the concrete loader type on access.
type TypeErasedLoader = dyn Any;
183
184impl DeviceStorage {
/// Builds a `DeviceStorage` for the given keys.
///
/// * `iter` - `(key, loader)` pairs; a loader, when present, is kept in
///   `typed_loader_map` under its key.
/// * `stash_generator` - called once per key to obtain that key's stash proxy.
/// * `inspect_handle` - logger shared by all flush paths to record flush failures.
///
/// For every key a detached local task is spawned that performs the debounced flushes
/// requested through the key's `flush_sender`. Caching and write debouncing both start
/// out enabled.
pub fn with_stash_proxy<I, G>(
    iter: I,
    stash_generator: G,
    inspect_handle: Rc<Mutex<StashInspectLogger>>,
) -> Self
where
    I: IntoIterator<Item = (&'static str, Option<Box<TypeErasedLoader>>)>,
    G: Fn() -> StoreAccessorProxy,
{
    let mut typed_loader_map = HashMap::new();
    let typed_storage_map = iter
        .into_iter()
        .map({
            // The closure gets its own handle clone and a mutable borrow of the loader
            // map so the outer bindings stay usable after `collect`.
            let inspect_handle = Rc::clone(&inspect_handle);
            let typed_loader_map = &mut typed_loader_map;
            move |(key, loader)| {
                if let Some(loader) = loader {
                    let _ = typed_loader_map.insert(key, loader);
                }
                let (flush_sender, flush_receiver) = futures::channel::mpsc::unbounded::<()>();
                let stash_proxy = stash_generator();

                let storage = TypedStorage {
                    flush_sender,
                    cached_storage: Mutex::new(CachedStorage {
                        current_data: None,
                        stash_proxy: stash_proxy.clone(),
                    }),
                };

                let inspect_handle = Rc::clone(&inspect_handle);
                Task::local(async move {
                    // Earliest time a flush may run; starts at "now" so the first
                    // request is not delayed.
                    let mut next_allowed_flush = MonotonicInstant::now();
                    // Starts out wrapping `None`, i.e. no flush is scheduled yet.
                    let mut next_flush_timer = pin!(OptionFuture::from(None).fuse());
                    let flush_requested = flush_receiver.fuse();
                    futures::pin_mut!(flush_requested);
                    loop {
                        futures::select! {
                            // A flush was requested: (re)arm the timer for the earliest
                            // allowed time, replacing any timer already pending.
                            () = flush_requested.select_next_some() => {
                                next_flush_timer.set(OptionFuture::from(Some(Timer::new(
                                    next_allowed_flush
                                )))
                                .fuse());
                            },
                            o = next_flush_timer => {
                                // `None` comes from the initial empty timer; only a real
                                // timer expiry triggers a flush.
                                if let Some(()) = o {
                                    DeviceStorage::stash_flush(
                                        &stash_proxy,
                                        Rc::clone(&inspect_handle),
                                        key.to_string()).await;
                                    next_allowed_flush = MonotonicInstant::now() + MIN_FLUSH_INTERVAL;
                                }
                            }
                            // Taken once every branch future reports itself terminated.
                            complete => break,
                        }
                    }
                })
                .detach();
                (key, storage)
            }
        })
        .collect();
    DeviceStorage {
        caching_enabled: true,
        debounce_writes: true,
        typed_storage_map,
        typed_loader_map,
        inspect_handle,
    }
}
259
/// Enables or disables use of the cached value by `get`; when disabled, `get` reads
/// from stash on every call.
pub fn set_caching_enabled(&mut self, enabled: bool) {
    self.caching_enabled = enabled;
}
264
/// Chooses whether a changed `write` defers its flush to the background flush task
/// (`true`) or flushes inline before returning (`false`).
pub fn set_debounce_writes(&mut self, debounce: bool) {
    self.debounce_writes = debounce;
}
269
270 async fn stash_flush(
272 stash_proxy: &StoreAccessorProxy,
273 inspect_handle: Rc<Mutex<StashInspectLogger>>,
274 setting_key: String,
275 ) {
276 let flush_result = stash_proxy.flush().await;
277 match flush_result {
278 Ok(Err(err)) => {
279 Self::handle_flush_failure(inspect_handle, setting_key, format!("{:?}", err)).await;
280 }
281 Err(err) => {
282 Self::handle_flush_failure(inspect_handle, setting_key, format!("{:?}", err)).await;
283 }
284 _ => {}
285 }
286 }
287
288 async fn handle_flush_failure(
289 inspect_handle: Rc<Mutex<StashInspectLogger>>,
290 setting_key: String,
291 err: String,
292 ) {
293 log::error!("Failed to flush to stash: {:?}", err);
294
295 inspect_handle.lock().await.record_flush_failure(setting_key);
297 }
298
299 async fn inner_write(
300 &self,
301 key: &'static str,
302 new_value: String,
303 data_as_any: Box<TypeErasedData>,
304 mapping_fn: MappingFn,
305 ) -> Result<UpdateState, Error> {
306 let typed_storage = self
307 .typed_storage_map
308 .get(key)
309 .ok_or_else(|| format_err!("Invalid data keyed by {}", key))?;
310 let mut cached_storage = typed_storage.cached_storage.lock().await;
311 let mut maybe_init;
312 let cached_value = {
313 maybe_init = cached_storage
314 .current_data
315 .as_deref()
316 .map(mapping_fn);
318 if maybe_init.is_none() {
319 let stash_key = prefixed(key);
320 if let Some(stash_value) =
321 cached_storage.stash_proxy.get_value(&stash_key).await.unwrap_or_else(|_| {
322 panic!("failed to get value from stash for {stash_key:?}")
323 })
324 {
325 if let Value::Stringval(string_value) = &*stash_value {
326 maybe_init = Some(string_value.clone());
327 } else {
328 panic!("Unexpected type for key found in stash");
329 }
330 }
331 }
332 maybe_init.as_ref()
333 };
334
335 Ok(if cached_value != Some(&new_value) {
336 let serialized = Value::Stringval(new_value);
337 let key = prefixed(key);
338 cached_storage.stash_proxy.set_value(&key, serialized)?;
339 if !self.debounce_writes {
340 DeviceStorage::stash_flush(
342 &cached_storage.stash_proxy,
343 Rc::clone(&self.inspect_handle),
344 key,
345 )
346 .await;
347 } else {
348 typed_storage.flush_sender.unbounded_send(()).with_context(|| {
349 format!("flush_sender failed to send flush message, associated key is {key}")
350 })?;
351 }
352 cached_storage.current_data = Some(data_as_any);
353 UpdateState::Updated
354 } else {
355 UpdateState::Unchanged
356 })
357 }
358
359 pub async fn write<T>(&self, new_value: &T) -> Result<UpdateState, Error>
361 where
362 T: DeviceStorageCompatible,
363 {
364 self.inner_write(
365 T::KEY,
366 new_value.serialize_to(),
367 Box::new(new_value.clone()) as Box<TypeErasedData>,
368 Box::new(|any: &(dyn Any)| {
369 let value = any.downcast_ref::<T>().expect(
373 "Type mismatch even though keys match. Two different\
374 types have the same key value",
375 );
376 value.serialize_to()
377 }),
378 )
379 .await
380 }
381
382 pub async fn write_str(&self, key: &'static str, value: String) -> Result<(), Error> {
385 let typed_storage =
386 self.typed_storage_map.get(key).expect("Did not request an initialized key");
387 let cached_storage = typed_storage.cached_storage.lock().await;
388 cached_storage.stash_proxy.set_value(&prefixed(key), Value::Stringval(value))?;
389 typed_storage.flush_sender.unbounded_send(()).unwrap();
390 Ok(())
391 }
392
393 async fn get_inner(
394 &self,
395 key: &'static str,
396 ) -> (MutexGuard<'_, CachedStorage>, Option<Option<String>>) {
397 let typed_storage = self
398 .typed_storage_map
399 .get(key)
400 .unwrap_or_else(|| panic!("Invalid data keyed by {key}"));
402 let cached_storage = typed_storage.cached_storage.lock().await;
403 let new = if cached_storage.current_data.is_none() || !self.caching_enabled {
404 let stash_key = prefixed(key);
405 if let Some(stash_value) = cached_storage
406 .stash_proxy
407 .get_value(&stash_key)
408 .await
409 .unwrap_or_else(|_| panic!("failed to get value from stash for {stash_key:?}"))
410 {
411 if let Value::Stringval(string_value) = *stash_value {
412 Some(Some(string_value))
413 } else {
414 panic!("Unexpected type for key found in stash");
415 }
416 } else {
417 Some(None)
418 }
419 } else {
420 None
421 };
422
423 (cached_storage, new)
424 }
425
426 pub async fn get<T>(&self) -> T
429 where
430 T: DeviceStorageCompatible,
431 {
432 let (mut cached_storage, update) = self.get_inner(T::KEY).await;
433 if let Some(update) = update {
434 cached_storage.current_data = Some(update.and_then(|string_value| {
435 T::try_deserialize_from(&string_value).map(|val| Box::new(val) as Box<TypeErasedData>).map_err(|e| log::error!(
436 "Using default. Failed to deserialize type {}: {e:?}\nSource data: {string_value:?}",
437 T::KEY
438 )).ok()
439 }).unwrap_or_else(|| Box::new(<T::Loader as DefaultDispatcher<T>>::get_default(self)) as Box<TypeErasedData>));
440 };
441
442 cached_storage
443 .current_data
444 .as_ref()
445 .expect("should always have a value")
446 .downcast_ref::<T>()
447 .expect(
448 "Type mismatch even though keys match. Two different types have the same key\
449 value",
450 )
451 .clone()
452 }
453}
454
/// Produces the default value for a storable type `T`.
///
/// The trait is sealed (it requires `Sealed`).
pub trait DefaultDispatcher<T>: Sealed
where
    T: DeviceStorageCompatible,
{
    /// Returns the default value of `T`; implementations may consult the given storage.
    fn get_default(_: &DeviceStorage) -> T;
}
461
462impl<T> DefaultDispatcher<T> for NoneT
463where
464 T: DeviceStorageCompatible<Loader = Self> + Default,
465{
466 fn get_default(_: &DeviceStorage) -> T {
467 T::default()
468 }
469}
470
471impl<T, L> DefaultDispatcher<T> for L
472where
473 T: DeviceStorageCompatible<Loader = L>,
474 L: DefaultLoader<Result = T> + 'static,
475{
476 fn get_default(storage: &DeviceStorage) -> T {
477 match storage.typed_loader_map.get(T::KEY) {
478 Some(loader) => match loader.downcast_ref::<T::Loader>() {
479 Some(loader) => loader.default_value(),
480 None => {
481 panic!("Mismatch key and loader for key {}", T::KEY);
482 }
483 },
484 None => panic!("Missing loader for {}", T::KEY),
485 }
486 }
487}
488
489fn prefixed(input_string: &str) -> String {
490 format!("{SETTINGS_PREFIX}_{input_string}")
491}
492
493#[cfg(test)]
494mod tests {
495 use super::*;
496 use assert_matches::assert_matches;
497 use diagnostics_assertions::assert_data_tree;
498 use fidl_fuchsia_stash::{
499 FlushError, StoreAccessorMarker, StoreAccessorRequest, StoreAccessorRequestStream,
500 };
501 use fuchsia_async as fasync;
502 use fuchsia_async::TestExecutor;
503 use fuchsia_inspect::component;
504 use futures::prelude::*;
505 use serde::{Deserialize, Serialize};
506 use std::task::Poll;
507
// Distinct values used by the tests; `VALUE0` is the value of a default `TestStruct`.
const VALUE0: i32 = 3;
const VALUE1: i32 = 33;
const VALUE2: i32 = 128;

/// Minimal storable type used by the tests.
#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
struct TestStruct {
    value: i32,
}

/// Stash key the fake stash servers expect for `TestStruct` (prefix + `TestStruct::KEY`).
const STORE_KEY: &str = "settings_testkey";

impl DeviceStorageCompatible for TestStruct {
    type Loader = NoneT;
    const KEY: &'static str = "testkey";
}

impl Default for TestStruct {
    fn default() -> Self {
        TestStruct { value: VALUE0 }
    }
}
529
530 #[track_caller]
532 fn advance_executor<F>(executor: &mut TestExecutor, future: &mut F)
533 where
534 F: Future + Unpin,
535 {
536 assert!(executor.run_until_stalled(future).is_ready(), "TestExecutor stalled!");
537 }
538
539 async fn verify_stash_set(stash_stream: &mut StoreAccessorRequestStream, expected_value: i32) {
541 match stash_stream.next().await.unwrap() {
542 Ok(StoreAccessorRequest::SetValue { key, val, control_handle: _ }) => {
543 assert_eq!(key, STORE_KEY);
544 if let Value::Stringval(string_value) = val {
545 let input_value = TestStruct::try_deserialize_from(&string_value)
546 .expect("deserialization should succeed");
547 assert_eq!(input_value.value, expected_value);
548 } else {
549 panic!("Unexpected type for key found in stash");
550 }
551 }
552 request => panic!("Unexpected request: {request:?}"),
553 }
554 }
555
556 async fn validate_stash_get_and_respond(
558 stash_stream: &mut StoreAccessorRequestStream,
559 response: String,
560 ) {
561 match stash_stream.next().await.unwrap() {
562 Ok(StoreAccessorRequest::GetValue { key, responder }) => {
563 assert_eq!(key, STORE_KEY);
564 responder.send(Some(Value::Stringval(response))).expect("unable to send response");
565 }
566 request => panic!("Unexpected request: {request:?}"),
567 }
568 }
569
570 async fn verify_stash_flush(stash_stream: &mut StoreAccessorRequestStream) {
572 match stash_stream.next().await.unwrap() {
573 Ok(StoreAccessorRequest::Flush { responder }) => {
574 let _ = responder.send(Ok(()));
575 } request => panic!("Unexpected request: {request:?}"),
577 }
578 }
579
580 async fn fail_stash_flush(stash_stream: &mut StoreAccessorRequestStream) {
582 match stash_stream.next().await.unwrap() {
583 Ok(StoreAccessorRequest::Flush { responder }) => {
584 let _ = responder.send(Err(FlushError::CommitFailed));
585 } request => panic!("Unexpected request: {request:?}"),
587 }
588 }
589
590 #[fuchsia::test(allow_stalls = false)]
591 async fn test_get() {
592 let (stash_proxy, mut stash_stream) =
593 fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
594
595 fasync::Task::local(async move {
596 let value_to_get = TestStruct { value: VALUE1 };
597
598 #[allow(clippy::single_match)]
599 while let Some(req) = stash_stream.try_next().await.unwrap() {
600 #[allow(unreachable_patterns)]
601 match req {
602 StoreAccessorRequest::GetValue { key, responder } => {
603 assert_eq!(key, STORE_KEY);
604 let response = Value::Stringval(value_to_get.serialize_to());
605
606 responder.send(Some(response)).unwrap();
607 }
608 _ => {}
609 }
610 }
611 })
612 .detach();
613
614 let storage = DeviceStorage::with_stash_proxy(
615 vec![(TestStruct::KEY, None)],
616 move || stash_proxy.clone(),
617 Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
618 );
619 let result = storage.get::<TestStruct>().await;
620
621 assert_eq!(result.value, VALUE1);
622 }
623
624 #[fuchsia::test(allow_stalls = false)]
625 async fn test_get_default() {
626 let (stash_proxy, mut stash_stream) =
627 fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
628
629 fasync::Task::local(async move {
630 #[allow(clippy::single_match)]
631 while let Some(req) = stash_stream.try_next().await.unwrap() {
632 #[allow(unreachable_patterns)]
633 match req {
634 StoreAccessorRequest::GetValue { key: _, responder } => {
635 responder.send(None).unwrap();
636 }
637 _ => {}
638 }
639 }
640 })
641 .detach();
642
643 let storage = DeviceStorage::with_stash_proxy(
644 vec![(TestStruct::KEY, None)],
645 move || stash_proxy.clone(),
646 Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
647 );
648 let result = storage.get::<TestStruct>().await;
649
650 assert_eq!(result.value, VALUE0);
651 }
652
653 #[fuchsia::test(allow_stalls = false)]
655 async fn test_invalid_stash() {
656 let (stash_proxy, mut stash_stream) =
657 fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
658
659 fasync::Task::local(async move {
660 #[allow(clippy::single_match)]
661 while let Some(req) = stash_stream.try_next().await.unwrap() {
662 #[allow(unreachable_patterns)]
663 match req {
664 StoreAccessorRequest::GetValue { key: _, responder } => {
665 let response = Value::Stringval("invalid value".to_string());
666 responder.send(Some(response)).unwrap();
667 }
668 _ => {}
669 }
670 }
671 })
672 .detach();
673
674 let storage = DeviceStorage::with_stash_proxy(
675 vec![(TestStruct::KEY, None)],
676 move || stash_proxy.clone(),
677 Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
678 );
679
680 let result = storage.get::<TestStruct>().await;
681
682 assert_eq!(result.value, VALUE0);
683 }
684
/// A failed flush is recorded in inspect under the setting key.
#[fuchsia::test]
fn test_flush_fail_writes_to_inspect() {
    let written_value = VALUE2;
    let mut executor = TestExecutor::new_with_fake_time();

    let (stash_proxy, mut stash_stream) =
        fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();

    let inspector = component::inspector();
    let logger_handle = Rc::new(Mutex::new(StashInspectLogger::new(inspector.root())));
    let storage = DeviceStorage::with_stash_proxy(
        vec![(TestStruct::KEY, None)],
        move || stash_proxy.clone(),
        logger_handle,
    );

    // Start a write; it stalls waiting for the stash read of the current value.
    let value_to_write = TestStruct { value: written_value };
    let write_future = storage.write(&value_to_write);
    futures::pin_mut!(write_future);

    assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);

    // Answer the read with the default value, which differs from the value being written.
    {
        let respond_future = validate_stash_get_and_respond(
            &mut stash_stream,
            serde_json::to_string(&TestStruct::default()).unwrap(),
        );
        futures::pin_mut!(respond_future);
        advance_executor(&mut executor, &mut respond_future);
    }

    // With the read answered, the write completes.
    assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Ready(Ok(_)));

    // The write must have sent the new value to stash.
    {
        let set_value_future = verify_stash_set(&mut stash_stream, written_value);
        futures::pin_mut!(set_value_future);
        advance_executor(&mut executor, &mut set_value_future);
    }

    // The flush request follows; answer it with a failure.
    let flush_future = fail_stash_flush(&mut stash_stream);
    futures::pin_mut!(flush_future);

    advance_executor(&mut executor, &mut flush_future);

    // A second write with a different value; the cached value is used, so it completes
    // without a stash read.
    {
        let value_to_write = TestStruct { value: VALUE1 };
        let write_future = storage.write(&value_to_write);
        futures::pin_mut!(write_future);
        assert_matches!(
            executor.run_until_stalled(&mut write_future),
            Poll::Ready(Result::Ok(_))
        );
    }

    // Let spawned tasks run until they stall; presumably this gives the flush task the
    // chance to record the failure.
    let _ = executor.run_until_stalled(&mut future::pending::<()>());

    assert_data_tree!(inspector, root: {
        stash_failures: {
            testkey: {
                count: 1u64,
            }
        }
    });
}
760
/// The first write is followed by a flush without advancing the fake time.
#[fuchsia::test]
fn test_first_write_flushes_immediately() {
    let written_value = VALUE2;
    let mut executor = TestExecutor::new_with_fake_time();

    let (stash_proxy, mut stash_stream) =
        fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();

    let storage = DeviceStorage::with_stash_proxy(
        vec![(TestStruct::KEY, None)],
        move || stash_proxy.clone(),
        Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
    );

    // Start a write; it stalls waiting for the stash read of the current value.
    let value_to_write = TestStruct { value: written_value };
    let write_future = storage.write(&value_to_write);
    futures::pin_mut!(write_future);

    assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);

    // Answer the read with the default value, which differs from the value being written.
    {
        let respond_future = validate_stash_get_and_respond(
            &mut stash_stream,
            serde_json::to_string(&TestStruct::default()).unwrap(),
        );
        futures::pin_mut!(respond_future);
        advance_executor(&mut executor, &mut respond_future);
    }

    // With the read answered, the write completes.
    assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Ready(Ok(_)));

    // The write must have sent the new value to stash.
    {
        let set_value_future = verify_stash_set(&mut stash_stream, written_value);
        futures::pin_mut!(set_value_future);
        advance_executor(&mut executor, &mut set_value_future);
    }

    // The flush request must arrive without any change to the fake time.
    let flush_future = verify_stash_flush(&mut stash_stream);
    futures::pin_mut!(flush_future);

    advance_executor(&mut executor, &mut flush_future);
}
812
/// Storable type whose key is never registered with the storage under test.
#[derive(Default, Copy, Clone, PartialEq, Serialize, Deserialize)]
struct WrongStruct;

impl DeviceStorageCompatible for WrongStruct {
    type Loader = NoneT;
    const KEY: &'static str = "WRONG_STRUCT";
}
820
821 #[fuchsia::test(allow_stalls = false)]
824 async fn test_write_with_mismatch_type_returns_error() {
825 let (stash_proxy, mut stream) =
826 fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
827
828 let spawned = fasync::Task::local(async move {
829 while let Some(request) = stream.next().await {
830 match request {
831 Ok(StoreAccessorRequest::GetValue { key, responder }) => {
832 assert_eq!(key, STORE_KEY);
833 let _ = responder.send(Some(Value::Stringval(
834 serde_json::to_string(&TestStruct { value: VALUE2 }).unwrap(),
835 )));
836 }
837 Ok(StoreAccessorRequest::SetValue { key, .. }) => {
838 assert_eq!(key, STORE_KEY);
839 }
840 _ => panic!("Unexpected request {request:?}"),
841 }
842 }
843 });
844
845 let storage = DeviceStorage::with_stash_proxy(
846 vec![(TestStruct::KEY, None)],
847 move || stash_proxy.clone(),
848 Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
849 );
850
851 let result = storage.write(&TestStruct { value: VALUE2 }).await;
853 assert!(result.is_ok());
854
855 let result = storage.write(&WrongStruct).await;
858 assert_matches!(result, Err(e) if e.to_string() == "Invalid data keyed by WRONG_STRUCT");
859
860 drop(storage);
861 spawned.await;
862 }
863
/// A second write shortly after a flush is not flushed until `MIN_FLUSH_INTERVAL` has
/// elapsed on the fake clock.
#[fuchsia::test]
fn test_multiple_write_debounce() {
    let mut executor = TestExecutor::new_with_fake_time();
    // Pin the fake clock to a known starting point.
    let start_time = MonotonicInstant::from_nanos(0);
    executor.set_fake_time(start_time);

    let (stash_proxy, mut stash_stream) =
        fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();

    let storage = DeviceStorage::with_stash_proxy(
        vec![(TestStruct::KEY, None)],
        move || stash_proxy.clone(),
        Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
    );

    let first_value = VALUE1;
    let second_value = VALUE2;

    // First write: needs a stash read of the current value before it can complete.
    {
        let value_to_write = TestStruct { value: first_value };
        let write_future = storage.write(&value_to_write);
        futures::pin_mut!(write_future);

        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);

        {
            let respond_future = validate_stash_get_and_respond(
                &mut stash_stream,
                serde_json::to_string(&TestStruct::default()).unwrap(),
            );
            futures::pin_mut!(respond_future);
            advance_executor(&mut executor, &mut respond_future);
        }

        assert_matches!(
            executor.run_until_stalled(&mut write_future),
            Poll::Ready(Result::Ok(_))
        );
    }

    // The first value reaches stash...
    {
        let set_value_future = verify_stash_set(&mut stash_stream, first_value);
        futures::pin_mut!(set_value_future);
        advance_executor(&mut executor, &mut set_value_future);
    }

    // ...and is flushed without advancing the clock.
    {
        let flush_future = verify_stash_flush(&mut stash_stream);
        futures::pin_mut!(flush_future);
        advance_executor(&mut executor, &mut flush_future);
    }

    // Second write: the value is cached now, so it completes without a stash read.
    {
        let value_to_write = TestStruct { value: second_value };
        let write_future = storage.write(&value_to_write);
        futures::pin_mut!(write_future);
        assert_matches!(
            executor.run_until_stalled(&mut write_future),
            Poll::Ready(Result::Ok(_))
        );
    }

    // The second value reaches stash right away.
    {
        let set_value_future = verify_stash_set(&mut stash_stream, second_value);
        futures::pin_mut!(set_value_future);
        advance_executor(&mut executor, &mut set_value_future);
    }

    let flush_future = verify_stash_flush(&mut stash_stream);
    futures::pin_mut!(flush_future);

    // No flush request yet: the minimum interval has not elapsed.
    assert_matches!(executor.run_until_stalled(&mut flush_future), Poll::Pending);

    // One millisecond short of the interval: still no flush.
    executor
        .set_fake_time(start_time + (MIN_FLUSH_INTERVAL - MonotonicDuration::from_millis(1)));

    assert_matches!(executor.run_until_stalled(&mut flush_future), Poll::Pending);

    // Exactly at the interval: the flush goes through.
    executor.set_fake_time(start_time + MIN_FLUSH_INTERVAL);

    advance_executor(&mut executor, &mut flush_future);
}
965
/// Fixture types for a custom migration: `Current` accepts data serialized as the older
/// `V1` layout.
mod test_device_compatible_migration {
    use super::*;
    use serde::{Deserialize, Serialize};

    pub(crate) const DEFAULT_V1_VALUE: i32 = 1;
    pub(crate) const DEFAULT_CURRENT_VALUE: i32 = 2;
    pub(crate) const DEFAULT_CURRENT_VALUE_2: i32 = 3;

    /// Older layout with a single field.
    #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
    pub(crate) struct V1 {
        pub value: i32,
    }

    impl DeviceStorageCompatible for V1 {
        type Loader = NoneT;
        const KEY: &'static str = "testkey";
    }

    impl Default for V1 {
        fn default() -> Self {
            Self { value: DEFAULT_V1_VALUE }
        }
    }

    /// Current layout: `V1` plus a second field.
    #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
    pub(crate) struct Current {
        pub value: i32,
        pub value_2: i32,
    }

    /// Migration from `V1`: keeps `value` and fills `value_2` with its default.
    impl From<V1> for Current {
        fn from(v1: V1) -> Self {
            Current { value: v1.value, value_2: DEFAULT_CURRENT_VALUE_2 }
        }
    }

    impl DeviceStorageCompatible for Current {
        type Loader = NoneT;
        const KEY: &'static str = "testkey2";

        /// Tries the current layout first and falls back to migrating from `V1`.
        fn try_deserialize_from(value: &str) -> Result<Self, Error> {
            Self::extract(value).or_else(|_| V1::extract(value).map(Self::from))
        }
    }

    impl Default for Current {
        fn default() -> Self {
            Self { value: DEFAULT_CURRENT_VALUE, value_2: DEFAULT_CURRENT_VALUE_2 }
        }
    }
}
1019
1020 #[fuchsia::test]
1021 fn test_device_compatible_custom_migration() {
1022 let initial = test_device_compatible_migration::V1::default();
1024 let initial_serialized = initial.serialize_to();
1026
1027 let current =
1029 test_device_compatible_migration::Current::try_deserialize_from(&initial_serialized)
1030 .expect("deserialization should succeed");
1031 assert_eq!(current.value, test_device_compatible_migration::DEFAULT_V1_VALUE);
1033 assert_eq!(current.value_2, test_device_compatible_migration::DEFAULT_CURRENT_VALUE_2);
1034 }
1035
1036 #[fuchsia::test(allow_stalls = false)]
1037 async fn test_corrupt_get_returns_default() {
1038 let (stash_proxy, mut stash_stream) =
1039 fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
1040
1041 fasync::Task::local(async move {
1042 #[allow(clippy::single_match)]
1043 while let Some(req) = stash_stream.try_next().await.unwrap() {
1044 #[allow(unreachable_patterns)]
1045 match req {
1046 StoreAccessorRequest::GetValue { key, responder } => {
1047 assert_eq!(
1048 key,
1049 format!("settings_{}", test_device_compatible_migration::Current::KEY)
1050 );
1051 let response = Value::Stringval("bad json".to_string());
1052 responder.send(Some(response)).unwrap();
1053 }
1054 _ => {}
1055 }
1056 }
1057 })
1058 .detach();
1059
1060 let storage = DeviceStorage::with_stash_proxy(
1061 vec![(test_device_compatible_migration::Current::KEY, None)],
1062 move || stash_proxy.clone(),
1063 Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
1064 );
1065 let current = storage.get::<test_device_compatible_migration::Current>().await;
1066
1067 assert_eq!(current.value, test_device_compatible_migration::DEFAULT_CURRENT_VALUE);
1068 assert_eq!(current.value_2, test_device_compatible_migration::DEFAULT_CURRENT_VALUE_2);
1069 }
1070}