1use crate::private::Sealed;
6use crate::stash_logger::StashInspectLogger;
7use crate::storage_factory::{DefaultLoader, NoneT};
8use crate::UpdateState;
9use anyhow::{format_err, Context, Error};
10use fidl_fuchsia_stash::{StoreAccessorProxy, Value};
11use fuchsia_async::{MonotonicDuration, MonotonicInstant, Task, Timer};
12use futures::channel::mpsc::UnboundedSender;
13use futures::future::OptionFuture;
14use futures::lock::{Mutex, MutexGuard};
15use futures::{FutureExt, StreamExt};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18use std::any::Any;
19use std::borrow::Cow;
20use std::collections::HashMap;
21use std::pin::pin;
22use std::rc::Rc;
23
/// Prefix that `prefixed` joins (with `_`) in front of a setting key to form the key used in
/// stash.
const SETTINGS_PREFIX: &str = "settings";

/// Minimum spacing enforced between two debounced flushes of the same key: after a flush, the
/// per-key flush task does not flush again until this much time has passed.
const MIN_FLUSH_INTERVAL: MonotonicDuration = MonotonicDuration::from_millis(500);
30
/// Typed settings storage backed by stash, with a per-key in-memory cache and rate-limited
/// flushing.
pub struct DeviceStorage {
    /// Per-key storage state, looked up by [`DeviceStorageCompatible::KEY`].
    typed_storage_map: HashMap<&'static str, TypedStorage>,

    /// Type-erased default-value loaders. Only keys that were registered together with a loader
    /// have an entry here.
    typed_loader_map: HashMap<&'static str, Box<TypeErasedLoader>>,

    /// When `false`, reads always query stash instead of trusting the cached value.
    caching_enabled: bool,

    /// When `true`, a write only asks the per-key background task to flush; when `false`, the
    /// write flushes inline before returning.
    debounce_writes: bool,

    /// Shared logger on which flush failures are recorded.
    inspect_handle: Rc<Mutex<StashInspectLogger>>,
}
49
/// Storage state kept for a single setting key.
struct TypedStorage {
    /// Sending `()` asks the key's background flush task to schedule a flush.
    flush_sender: UnboundedSender<()>,

    /// Cached value and stash connection; the mutex serializes reads and writes of this key.
    cached_storage: Mutex<CachedStorage>,
}
59
/// Cached value plus the stash connection used to load and persist it.
struct CachedStorage {
    /// Last value read from or written to stash, type-erased. `None` until the first successful
    /// typed read or write.
    current_data: Option<Box<TypeErasedData>>,

    /// Proxy used for the `get_value`, `set_value` and `flush` calls of this key.
    stash_proxy: StoreAccessorProxy,
}
69
70pub trait DeviceStorageCompatible: Serialize + DeserializeOwned + Clone + PartialEq + Any {
84 type Loader: DefaultDispatcher<Self>;
85
86 fn try_deserialize_from(value: &str) -> Result<Self, Error> {
87 Self::extract(value)
88 }
89
90 fn extract(value: &str) -> Result<Self, Error> {
91 serde_json::from_str(value).map_err(|e| format_err!("could not deserialize: {e:?}"))
92 }
93
94 fn serialize_to(&self) -> String {
95 serde_json::to_string(self).expect("value should serialize")
96 }
97
98 const KEY: &'static str;
99}
100
/// Describes a type that is persisted through a (possibly different) storable representation.
pub trait DeviceStorageConvertible: Sized {
    /// The representation that is actually written to storage; it must convert back into
    /// `Self`.
    type Storable: DeviceStorageCompatible + Into<Self>;

    /// Returns the storable representation of `self`. The `Cow` lets implementations hand out a
    /// borrow when no conversion is needed.
    fn get_storable(&self) -> Cow<'_, Self::Storable>;
}
167
168impl<T> DeviceStorageConvertible for T
170where
171 T: DeviceStorageCompatible,
172{
173 type Storable = T;
174
175 fn get_storable(&self) -> Cow<'_, Self::Storable> {
176 Cow::Borrowed(self)
177 }
178}
179
/// One-shot callback that turns a type-erased cached value into its serialized string form.
type MappingFn = Box<dyn FnOnce(&dyn Any) -> String>;
/// A cached setting value with its concrete type erased.
type TypeErasedData = dyn Any;
/// A default-value loader with its concrete type erased.
type TypeErasedLoader = dyn Any;
183
184impl DeviceStorage {
    /// Builds a `DeviceStorage` for the keys yielded by `iter`.
    ///
    /// For every `(key, loader)` pair this:
    /// * registers `loader` (when present) as the default-value loader for `key`,
    /// * obtains a stash proxy from `stash_generator`,
    /// * spawns a detached local task that performs the rate-limited flushes for `key`.
    ///
    /// The returned storage has caching and write debouncing enabled. Flush failures are
    /// recorded through `inspect_handle`.
    pub fn with_stash_proxy<I, G>(
        iter: I,
        stash_generator: G,
        inspect_handle: Rc<Mutex<StashInspectLogger>>,
    ) -> Self
    where
        I: IntoIterator<Item = (&'static str, Option<Box<TypeErasedLoader>>)>,
        G: Fn() -> StoreAccessorProxy,
    {
        let mut typed_loader_map = HashMap::new();
        let typed_storage_map = iter
            .into_iter()
            .map({
                // The closure is `move`, so hand it a clone of the handle and a mutable borrow
                // of the loader map rather than the originals, which are needed again below.
                let inspect_handle = Rc::clone(&inspect_handle);
                let typed_loader_map = &mut typed_loader_map;
                move |(key, loader)| {
                    if let Some(loader) = loader {
                        let _ = typed_loader_map.insert(key, loader);
                    }
                    let (flush_sender, flush_receiver) = futures::channel::mpsc::unbounded::<()>();
                    let stash_proxy = stash_generator();

                    let storage = TypedStorage {
                        flush_sender,
                        cached_storage: Mutex::new(CachedStorage {
                            current_data: None,
                            stash_proxy: stash_proxy.clone(),
                        }),
                    };

                    let inspect_handle = Rc::clone(&inspect_handle);
                    Task::local(async move {
                        // Earliest instant at which the next flush may run. It starts at the
                        // task's start time, so the first requested flush is not held back by
                        // MIN_FLUSH_INTERVAL.
                        let mut next_allowed_flush = MonotonicInstant::now();
                        // No flush is scheduled until one is requested.
                        let mut next_flush_timer = pin!(OptionFuture::from(None).fuse());
                        let flush_requested = flush_receiver.fuse();
                        futures::pin_mut!(flush_requested);
                        loop {
                            futures::select! {
                                // A write asked for a flush: (re)arm the timer for the earliest
                                // allowed instant.
                                () = flush_requested.select_next_some() => {
                                    next_flush_timer.set(OptionFuture::from(Some(Timer::new(
                                        next_allowed_flush
                                    )))
                                    .fuse());
                                },
                                // The timer fired: flush, then push the next allowed flush out
                                // by MIN_FLUSH_INTERVAL.
                                o = next_flush_timer => {
                                    if let Some(()) = o {
                                        DeviceStorage::stash_flush(
                                            &stash_proxy,
                                            Rc::clone(&inspect_handle),
                                            key.to_string()).await;
                                        next_allowed_flush = MonotonicInstant::now() + MIN_FLUSH_INTERVAL;
                                    }
                                }
                                // Both the request stream and the timer have terminated, so
                                // there is nothing left to wait for.
                                complete => break,
                            }
                        }
                    })
                    .detach();
                    (key, storage)
                }
            })
            .collect();
        DeviceStorage {
            caching_enabled: true,
            debounce_writes: true,
            typed_storage_map,
            typed_loader_map,
            inspect_handle,
        }
    }
259
    /// Enables or disables use of the in-memory cache. While disabled, reads query stash every
    /// time.
    pub fn set_caching_enabled(&mut self, enabled: bool) {
        self.caching_enabled = enabled;
    }
264
    /// Enables or disables write debouncing. While disabled, every write that changes the value
    /// flushes to stash before returning.
    pub fn set_debounce_writes(&mut self, debounce: bool) {
        self.debounce_writes = debounce;
    }
269
270 async fn stash_flush(
272 stash_proxy: &StoreAccessorProxy,
273 inspect_handle: Rc<Mutex<StashInspectLogger>>,
274 setting_key: String,
275 ) {
276 let flush_result = stash_proxy.flush().await;
277 match flush_result {
278 Ok(Err(err)) => {
279 Self::handle_flush_failure(inspect_handle, setting_key, format!("{err:?}")).await;
280 }
281 Err(err) => {
282 Self::handle_flush_failure(inspect_handle, setting_key, format!("{err:?}")).await;
283 }
284 _ => {}
285 }
286 }
287
    /// Logs a flush failure and records it against `setting_key` on the inspect logger.
    ///
    /// `err` is the already-formatted description of what went wrong.
    async fn handle_flush_failure(
        inspect_handle: Rc<Mutex<StashInspectLogger>>,
        setting_key: String,
        err: String,
    ) {
        log::error!("Failed to flush to stash: {:?}", err);

        // The lock guard is a temporary and is released at the end of this statement.
        inspect_handle.lock().await.record_flush_failure(setting_key);
    }
298
299 async fn inner_write(
300 &self,
301 key: &'static str,
302 new_value: String,
303 data_as_any: Box<TypeErasedData>,
304 mapping_fn: MappingFn,
305 ) -> Result<UpdateState, Error> {
306 let typed_storage = self
307 .typed_storage_map
308 .get(key)
309 .ok_or_else(|| format_err!("Invalid data keyed by {}", key))?;
310 let mut cached_storage = typed_storage.cached_storage.lock().await;
311 let mut maybe_init;
312 let cached_value = {
313 maybe_init = cached_storage
314 .current_data
315 .as_deref()
316 .map(mapping_fn);
318 if maybe_init.is_none() {
319 let stash_key = prefixed(key);
320 if let Some(stash_value) =
321 cached_storage.stash_proxy.get_value(&stash_key).await.unwrap_or_else(|_| {
322 panic!("failed to get value from stash for {stash_key:?}")
323 })
324 {
325 if let Value::Stringval(string_value) = &*stash_value {
326 maybe_init = Some(string_value.clone());
327 } else {
328 panic!("Unexpected type for key found in stash");
329 }
330 }
331 }
332 maybe_init.as_ref()
333 };
334
335 Ok(if cached_value != Some(&new_value) {
336 let serialized = Value::Stringval(new_value);
337 let key = prefixed(key);
338 cached_storage.stash_proxy.set_value(&key, serialized)?;
339 if !self.debounce_writes {
340 DeviceStorage::stash_flush(
342 &cached_storage.stash_proxy,
343 Rc::clone(&self.inspect_handle),
344 key,
345 )
346 .await;
347 } else {
348 typed_storage.flush_sender.unbounded_send(()).with_context(|| {
349 format!("flush_sender failed to send flush message, associated key is {key}")
350 })?;
351 }
352 cached_storage.current_data = Some(data_as_any);
353 UpdateState::Updated
354 } else {
355 UpdateState::Unchanged
356 })
357 }
358
359 pub async fn write<T>(&self, new_value: &T) -> Result<UpdateState, Error>
361 where
362 T: DeviceStorageConvertible,
363 {
364 let storable = new_value.get_storable();
365 self.inner_write(
366 T::Storable::KEY,
367 storable.serialize_to(),
368 Box::new(storable.into_owned()) as Box<TypeErasedData>,
369 Box::new(|any: &dyn Any| {
370 let value = any.downcast_ref::<T::Storable>().expect(
374 "Type mismatch even though keys match. Two different\
375 types have the same key value",
376 );
377 value.serialize_to()
378 }),
379 )
380 .await
381 }
382
383 pub async fn write_str(&self, key: &'static str, value: String) -> Result<(), Error> {
386 let typed_storage =
387 self.typed_storage_map.get(key).expect("Did not request an initialized key");
388 let cached_storage = typed_storage.cached_storage.lock().await;
389 cached_storage.stash_proxy.set_value(&prefixed(key), Value::Stringval(value))?;
390 typed_storage.flush_sender.unbounded_send(()).unwrap();
391 Ok(())
392 }
393
394 async fn get_inner(
395 &self,
396 key: &'static str,
397 ) -> (MutexGuard<'_, CachedStorage>, Option<Option<String>>) {
398 let typed_storage = self
399 .typed_storage_map
400 .get(key)
401 .unwrap_or_else(|| panic!("Invalid data keyed by {key}"));
403 let cached_storage = typed_storage.cached_storage.lock().await;
404 let new = if cached_storage.current_data.is_none() || !self.caching_enabled {
405 let stash_key = prefixed(key);
406 if let Some(stash_value) = cached_storage
407 .stash_proxy
408 .get_value(&stash_key)
409 .await
410 .unwrap_or_else(|_| panic!("failed to get value from stash for {stash_key:?}"))
411 {
412 if let Value::Stringval(string_value) = *stash_value {
413 Some(Some(string_value))
414 } else {
415 panic!("Unexpected type for key found in stash");
416 }
417 } else {
418 Some(None)
419 }
420 } else {
421 None
422 };
423
424 (cached_storage, new)
425 }
426
427 pub async fn get<T>(&self) -> T::Storable
430 where
431 T: DeviceStorageConvertible,
432 {
433 let (mut cached_storage, update) = self.get_inner(T::Storable::KEY).await;
434 if let Some(update) = update {
435 cached_storage.current_data = Some(update.and_then(|string_value| {
436 T::Storable::try_deserialize_from(&string_value).map(|val| Box::new(val) as Box<TypeErasedData>).map_err(|e| log::error!(
437 "Using default. Failed to deserialize type {}: {e:?}\nSource data: {string_value:?}",
438 T::Storable::KEY
439 )).ok()
440 }).unwrap_or_else(|| Box::new(<<T::Storable as DeviceStorageCompatible>::Loader as DefaultDispatcher<T::Storable>>::get_default(self)) as Box<TypeErasedData>));
441 };
442
443 cached_storage
444 .current_data
445 .as_ref()
446 .expect("should always have a value")
447 .downcast_ref::<T::Storable>()
448 .expect(
449 "Type mismatch even though keys match. Two different types have the same key\
450 value",
451 )
452 .clone()
453 }
454}
455
/// Produces the default value of a storable type `T`.
///
/// The trait is sealed (it requires `Sealed`). The implementations in this file cover `NoneT`
/// and every `DefaultLoader` whose `Result` is `T`.
pub trait DefaultDispatcher<T>: Sealed
where
    T: DeviceStorageCompatible,
{
    /// Returns the default value for `T`. The storage is passed so that implementations can
    /// consult the loaders registered on it.
    fn get_default(_: &DeviceStorage) -> T;
}
462
463impl<T> DefaultDispatcher<T> for NoneT
464where
465 T: DeviceStorageCompatible<Loader = Self> + Default,
466{
467 fn get_default(_: &DeviceStorage) -> T {
468 T::default()
469 }
470}
471
472impl<T, L> DefaultDispatcher<T> for L
473where
474 T: DeviceStorageCompatible<Loader = L>,
475 L: DefaultLoader<Result = T> + 'static,
476{
477 fn get_default(storage: &DeviceStorage) -> T {
478 match storage.typed_loader_map.get(T::KEY) {
479 Some(loader) => match loader.downcast_ref::<T::Loader>() {
480 Some(loader) => loader.default_value(),
481 None => {
482 panic!("Mismatch key and loader for key {}", T::KEY);
483 }
484 },
485 None => panic!("Missing loader for {}", T::KEY),
486 }
487 }
488}
489
490fn prefixed(input_string: &str) -> String {
491 format!("{SETTINGS_PREFIX}_{input_string}")
492}
493
494#[cfg(test)]
495mod tests {
496 use super::*;
497 use assert_matches::assert_matches;
498 use diagnostics_assertions::assert_data_tree;
499 use fidl_fuchsia_stash::{
500 FlushError, StoreAccessorMarker, StoreAccessorRequest, StoreAccessorRequestStream,
501 };
502 use fuchsia_async as fasync;
503 use fuchsia_async::TestExecutor;
504 use fuchsia_inspect::component;
505 use futures::prelude::*;
506 use serde::{Deserialize, Serialize};
507 use std::task::Poll;
508
    // Distinct sample values used throughout the tests. VALUE0 is also the value carried by
    // `TestStruct::default()`.
    const VALUE0: i32 = 3;
    const VALUE1: i32 = 33;
    const VALUE2: i32 = 128;
512
    /// Minimal storable type used by the tests.
    #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
    struct TestStruct {
        value: i32,
    }
517
    /// Stash key the fake stash expects for `TestStruct`: its `KEY` ("testkey") with the
    /// settings prefix applied.
    const STORE_KEY: &str = "settings_testkey";
519
    /// Stored under "testkey"; `NoneT` as loader means the default comes from `Default`.
    impl DeviceStorageCompatible for TestStruct {
        type Loader = NoneT;
        const KEY: &'static str = "testkey";
    }
524
525 impl Default for TestStruct {
526 fn default() -> Self {
527 TestStruct { value: VALUE0 }
528 }
529 }
530
531 #[track_caller]
533 fn advance_executor<F>(executor: &mut TestExecutor, future: &mut F)
534 where
535 F: Future + Unpin,
536 {
537 assert!(executor.run_until_stalled(future).is_ready(), "TestExecutor stalled!");
538 }
539
540 async fn verify_stash_set(stash_stream: &mut StoreAccessorRequestStream, expected_value: i32) {
542 match stash_stream.next().await.unwrap() {
543 Ok(StoreAccessorRequest::SetValue { key, val, control_handle: _ }) => {
544 assert_eq!(key, STORE_KEY);
545 if let Value::Stringval(string_value) = val {
546 let input_value = TestStruct::try_deserialize_from(&string_value)
547 .expect("deserialization should succeed");
548 assert_eq!(input_value.value, expected_value);
549 } else {
550 panic!("Unexpected type for key found in stash");
551 }
552 }
553 request => panic!("Unexpected request: {request:?}"),
554 }
555 }
556
557 async fn validate_stash_get_and_respond(
559 stash_stream: &mut StoreAccessorRequestStream,
560 response: String,
561 ) {
562 match stash_stream.next().await.unwrap() {
563 Ok(StoreAccessorRequest::GetValue { key, responder }) => {
564 assert_eq!(key, STORE_KEY);
565 responder.send(Some(Value::Stringval(response))).expect("unable to send response");
566 }
567 request => panic!("Unexpected request: {request:?}"),
568 }
569 }
570
571 async fn verify_stash_flush(stash_stream: &mut StoreAccessorRequestStream) {
573 match stash_stream.next().await.unwrap() {
574 Ok(StoreAccessorRequest::Flush { responder }) => {
575 let _ = responder.send(Ok(()));
576 } request => panic!("Unexpected request: {request:?}"),
578 }
579 }
580
581 async fn fail_stash_flush(stash_stream: &mut StoreAccessorRequestStream) {
583 match stash_stream.next().await.unwrap() {
584 Ok(StoreAccessorRequest::Flush { responder }) => {
585 let _ = responder.send(Err(FlushError::CommitFailed));
586 } request => panic!("Unexpected request: {request:?}"),
588 }
589 }
590
591 #[fuchsia::test(allow_stalls = false)]
592 async fn test_get() {
593 let (stash_proxy, mut stash_stream) =
594 fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
595
596 fasync::Task::local(async move {
597 let value_to_get = TestStruct { value: VALUE1 };
598
599 #[allow(clippy::single_match)]
600 while let Some(req) = stash_stream.try_next().await.unwrap() {
601 #[allow(unreachable_patterns)]
602 match req {
603 StoreAccessorRequest::GetValue { key, responder } => {
604 assert_eq!(key, STORE_KEY);
605 let response = Value::Stringval(value_to_get.serialize_to());
606
607 responder.send(Some(response)).unwrap();
608 }
609 _ => {}
610 }
611 }
612 })
613 .detach();
614
615 let storage = DeviceStorage::with_stash_proxy(
616 vec![(TestStruct::KEY, None)],
617 move || stash_proxy.clone(),
618 Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
619 );
620 let result = storage.get::<TestStruct>().await;
621
622 assert_eq!(result.value, VALUE1);
623 }
624
625 #[fuchsia::test(allow_stalls = false)]
626 async fn test_get_default() {
627 let (stash_proxy, mut stash_stream) =
628 fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
629
630 fasync::Task::local(async move {
631 #[allow(clippy::single_match)]
632 while let Some(req) = stash_stream.try_next().await.unwrap() {
633 #[allow(unreachable_patterns)]
634 match req {
635 StoreAccessorRequest::GetValue { key: _, responder } => {
636 responder.send(None).unwrap();
637 }
638 _ => {}
639 }
640 }
641 })
642 .detach();
643
644 let storage = DeviceStorage::with_stash_proxy(
645 vec![(TestStruct::KEY, None)],
646 move || stash_proxy.clone(),
647 Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
648 );
649 let result = storage.get::<TestStruct>().await;
650
651 assert_eq!(result.value, VALUE0);
652 }
653
654 #[fuchsia::test(allow_stalls = false)]
656 async fn test_invalid_stash() {
657 let (stash_proxy, mut stash_stream) =
658 fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
659
660 fasync::Task::local(async move {
661 #[allow(clippy::single_match)]
662 while let Some(req) = stash_stream.try_next().await.unwrap() {
663 #[allow(unreachable_patterns)]
664 match req {
665 StoreAccessorRequest::GetValue { key: _, responder } => {
666 let response = Value::Stringval("invalid value".to_string());
667 responder.send(Some(response)).unwrap();
668 }
669 _ => {}
670 }
671 }
672 })
673 .detach();
674
675 let storage = DeviceStorage::with_stash_proxy(
676 vec![(TestStruct::KEY, None)],
677 move || stash_proxy.clone(),
678 Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
679 );
680
681 let result = storage.get::<TestStruct>().await;
682
683 assert_eq!(result.value, VALUE0);
684 }
685
    /// A failed flush is recorded in inspect under the setting's key.
    ///
    /// The test steps a fake-time executor by hand, alternating between the write under test
    /// and the fake stash's side of the connection.
    #[fuchsia::test]
    fn test_flush_fail_writes_to_inspect() {
        let written_value = VALUE2;
        let mut executor = TestExecutor::new_with_fake_time();

        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();

        let inspector = component::inspector();
        let logger_handle = Rc::new(Mutex::new(StashInspectLogger::new(inspector.root())));
        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            logger_handle,
        );

        // Start the write.
        let value_to_write = TestStruct { value: written_value };
        let write_future = storage.write(&value_to_write);
        futures::pin_mut!(write_future);

        // The write stalls on reading the current value, since the fake stash has not answered.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);

        // Answer the read with the default value, which differs from the value being written.
        {
            let respond_future = validate_stash_get_and_respond(
                &mut stash_stream,
                serde_json::to_string(&TestStruct::default()).unwrap(),
            );
            futures::pin_mut!(respond_future);
            advance_executor(&mut executor, &mut respond_future);
        }

        // With the read answered, the write completes.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Ready(Ok(_)));

        // Stash receives the new value.
        {
            let set_value_future = verify_stash_set(&mut stash_stream, written_value);
            futures::pin_mut!(set_value_future);
            advance_executor(&mut executor, &mut set_value_future);
        }

        // The flush request arrives and is answered with a failure.
        let flush_future = fail_stash_flush(&mut stash_stream);
        futures::pin_mut!(flush_future);

        advance_executor(&mut executor, &mut flush_future);

        // A later write still completes; the value is served from the cache, so no read from
        // stash is needed.
        {
            let value_to_write = TestStruct { value: VALUE1 };
            let write_future = storage.write(&value_to_write);
            futures::pin_mut!(write_future);
            assert_matches!(
                executor.run_until_stalled(&mut write_future),
                Poll::Ready(Result::Ok(_))
            );
        }

        // Run the spawned tasks until nothing can make progress any more.
        // NOTE(review): presumably this lets the flush task finish recording the failure
        // before the inspect tree is checked - confirm.
        let _ = executor.run_until_stalled(&mut future::pending::<()>());

        // Exactly one failure is recorded, under the unprefixed setting key.
        assert_data_tree!(@executor executor, inspector, root: {
            stash_failures: {
                testkey: {
                    count: 1u64,
                }
            }
        });
    }
761
    /// The first write is flushed without any fake time having to pass.
    #[fuchsia::test]
    fn test_first_write_flushes_immediately() {
        let written_value = VALUE2;
        let mut executor = TestExecutor::new_with_fake_time();

        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();

        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
        );

        // Start the write.
        let value_to_write = TestStruct { value: written_value };
        let write_future = storage.write(&value_to_write);
        futures::pin_mut!(write_future);

        // The write stalls on reading the current value, since the fake stash has not answered.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);

        // Answer the read with the default value, which differs from the value being written.
        {
            let respond_future = validate_stash_get_and_respond(
                &mut stash_stream,
                serde_json::to_string(&TestStruct::default()).unwrap(),
            );
            futures::pin_mut!(respond_future);
            advance_executor(&mut executor, &mut respond_future);
        }

        // With the read answered, the write completes.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Ready(Ok(_)));

        // Stash receives the new value.
        {
            let set_value_future = verify_stash_set(&mut stash_stream, written_value);
            futures::pin_mut!(set_value_future);
            advance_executor(&mut executor, &mut set_value_future);
        }

        // The flush request must show up without the fake clock being advanced;
        // `advance_executor` panics if it does not.
        let flush_future = verify_stash_flush(&mut stash_stream);
        futures::pin_mut!(flush_future);

        advance_executor(&mut executor, &mut flush_future);
    }
813
    /// Storable type whose key is never registered with the storage under test.
    #[derive(Default, Copy, Clone, PartialEq, Serialize, Deserialize)]
    struct WrongStruct;
816
    /// Stored under "WRONG_STRUCT", a key none of the tests register.
    impl DeviceStorageCompatible for WrongStruct {
        type Loader = NoneT;
        const KEY: &'static str = "WRONG_STRUCT";
    }
821
822 #[fuchsia::test(allow_stalls = false)]
825 async fn test_write_with_mismatch_type_returns_error() {
826 let (stash_proxy, mut stream) =
827 fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
828
829 let spawned = fasync::Task::local(async move {
830 while let Some(request) = stream.next().await {
831 match request {
832 Ok(StoreAccessorRequest::GetValue { key, responder }) => {
833 assert_eq!(key, STORE_KEY);
834 let _ = responder.send(Some(Value::Stringval(
835 serde_json::to_string(&TestStruct { value: VALUE2 }).unwrap(),
836 )));
837 }
838 Ok(StoreAccessorRequest::SetValue { key, .. }) => {
839 assert_eq!(key, STORE_KEY);
840 }
841 _ => panic!("Unexpected request {request:?}"),
842 }
843 }
844 });
845
846 let storage = DeviceStorage::with_stash_proxy(
847 vec![(TestStruct::KEY, None)],
848 move || stash_proxy.clone(),
849 Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
850 );
851
852 let result = storage.write(&TestStruct { value: VALUE2 }).await;
854 assert!(result.is_ok());
855
856 let result = storage.write(&WrongStruct).await;
859 assert_matches!(result, Err(e) if e.to_string() == "Invalid data keyed by WRONG_STRUCT");
860
861 drop(storage);
862 spawned.await;
863 }
864
    /// A second write shortly after a flush is not flushed until `MIN_FLUSH_INTERVAL` has
    /// elapsed.
    ///
    /// The test steps a fake-time executor by hand and moves the fake clock explicitly.
    #[fuchsia::test]
    fn test_multiple_write_debounce() {
        let mut executor = TestExecutor::new_with_fake_time();
        // Pin the fake clock to a known start so the deadlines below can be computed.
        let start_time = MonotonicInstant::from_nanos(0);
        executor.set_fake_time(start_time);

        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();

        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
        );

        let first_value = VALUE1;
        let second_value = VALUE2;

        // First write: stalls on reading the current value, then completes once the fake stash
        // has answered with the default value.
        {
            let value_to_write = TestStruct { value: first_value };
            let write_future = storage.write(&value_to_write);
            futures::pin_mut!(write_future);

            assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);

            {
                let respond_future = validate_stash_get_and_respond(
                    &mut stash_stream,
                    serde_json::to_string(&TestStruct::default()).unwrap(),
                );
                futures::pin_mut!(respond_future);
                advance_executor(&mut executor, &mut respond_future);
            }

            assert_matches!(
                executor.run_until_stalled(&mut write_future),
                Poll::Ready(Result::Ok(_))
            );
        }

        // Stash receives the first value.
        {
            let set_value_future = verify_stash_set(&mut stash_stream, first_value);
            futures::pin_mut!(set_value_future);
            advance_executor(&mut executor, &mut set_value_future);
        }

        // The first flush arrives without the clock being advanced.
        {
            let flush_future = verify_stash_flush(&mut stash_stream);
            futures::pin_mut!(flush_future);
            advance_executor(&mut executor, &mut flush_future);
        }

        // Second write: completes right away because the current value is now cached, so no
        // read from stash is needed.
        {
            let value_to_write = TestStruct { value: second_value };
            let write_future = storage.write(&value_to_write);
            futures::pin_mut!(write_future);
            assert_matches!(
                executor.run_until_stalled(&mut write_future),
                Poll::Ready(Result::Ok(_))
            );
        }

        // Stash receives the second value.
        {
            let set_value_future = verify_stash_set(&mut stash_stream, second_value);
            futures::pin_mut!(set_value_future);
            advance_executor(&mut executor, &mut set_value_future);
        }

        let flush_future = verify_stash_flush(&mut stash_stream);
        futures::pin_mut!(flush_future);

        // No flush yet: the minimum interval since the previous flush has not elapsed.
        assert_matches!(executor.run_until_stalled(&mut flush_future), Poll::Pending);

        // Still no flush one millisecond before the interval ends.
        executor
            .set_fake_time(start_time + (MIN_FLUSH_INTERVAL - MonotonicDuration::from_millis(1)));

        assert_matches!(executor.run_until_stalled(&mut flush_future), Poll::Pending);

        // Once the full interval has elapsed, the flush goes through.
        executor.set_fake_time(start_time + MIN_FLUSH_INTERVAL);

        advance_executor(&mut executor, &mut flush_future);
    }
966
967 mod test_device_compatible_migration {
970 use super::*;
971 use serde::{Deserialize, Serialize};
972
973 pub(crate) const DEFAULT_V1_VALUE: i32 = 1;
974 pub(crate) const DEFAULT_CURRENT_VALUE: i32 = 2;
975 pub(crate) const DEFAULT_CURRENT_VALUE_2: i32 = 3;
976
977 #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
978 pub(crate) struct V1 {
979 pub value: i32,
980 }
981
982 impl DeviceStorageCompatible for V1 {
983 type Loader = NoneT;
984 const KEY: &'static str = "testkey";
985 }
986
987 impl Default for V1 {
988 fn default() -> Self {
989 Self { value: DEFAULT_V1_VALUE }
990 }
991 }
992
993 #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
994 pub(crate) struct Current {
995 pub value: i32,
996 pub value_2: i32,
997 }
998
999 impl From<V1> for Current {
1000 fn from(v1: V1) -> Self {
1001 Current { value: v1.value, value_2: DEFAULT_CURRENT_VALUE_2 }
1002 }
1003 }
1004
1005 impl DeviceStorageCompatible for Current {
1006 type Loader = NoneT;
1007 const KEY: &'static str = "testkey2";
1008
1009 fn try_deserialize_from(value: &str) -> Result<Self, Error> {
1010 Self::extract(value).or_else(|_| V1::extract(value).map(Self::from))
1011 }
1012 }
1013
1014 impl Default for Current {
1015 fn default() -> Self {
1016 Self { value: DEFAULT_CURRENT_VALUE, value_2: DEFAULT_CURRENT_VALUE_2 }
1017 }
1018 }
1019 }
1020
1021 #[fuchsia::test]
1022 fn test_device_compatible_custom_migration() {
1023 let initial = test_device_compatible_migration::V1::default();
1025 let initial_serialized = initial.serialize_to();
1027
1028 let current =
1030 test_device_compatible_migration::Current::try_deserialize_from(&initial_serialized)
1031 .expect("deserialization should succeed");
1032 assert_eq!(current.value, test_device_compatible_migration::DEFAULT_V1_VALUE);
1034 assert_eq!(current.value_2, test_device_compatible_migration::DEFAULT_CURRENT_VALUE_2);
1035 }
1036
1037 #[fuchsia::test(allow_stalls = false)]
1038 async fn test_corrupt_get_returns_default() {
1039 let (stash_proxy, mut stash_stream) =
1040 fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>();
1041
1042 fasync::Task::local(async move {
1043 #[allow(clippy::single_match)]
1044 while let Some(req) = stash_stream.try_next().await.unwrap() {
1045 #[allow(unreachable_patterns)]
1046 match req {
1047 StoreAccessorRequest::GetValue { key, responder } => {
1048 assert_eq!(
1049 key,
1050 format!("settings_{}", test_device_compatible_migration::Current::KEY)
1051 );
1052 let response = Value::Stringval("bad json".to_string());
1053 responder.send(Some(response)).unwrap();
1054 }
1055 _ => {}
1056 }
1057 }
1058 })
1059 .detach();
1060
1061 let storage = DeviceStorage::with_stash_proxy(
1062 vec![(test_device_compatible_migration::Current::KEY, None)],
1063 move || stash_proxy.clone(),
1064 Rc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
1065 );
1066 let current = storage.get::<test_device_compatible_migration::Current>().await;
1067
1068 assert_eq!(current.value, test_device_compatible_migration::DEFAULT_CURRENT_VALUE);
1069 assert_eq!(current.value_2, test_device_compatible_migration::DEFAULT_CURRENT_VALUE_2);
1070 }
1071}