1use crate::private::Sealed;
6use crate::storage_factory::{DefaultLoader, NoneT};
7use crate::UpdateState;
8use anyhow::{bail, format_err, Context, Error};
9use fidl::{persist, unpersist, Persistable, Status};
10use fidl_fuchsia_io::DirectoryProxy;
11use fuchsia_async::{MonotonicInstant, Task, Timer};
12use fuchsia_fs::file::ReadError;
13use fuchsia_fs::node::OpenError;
14use fuchsia_fs::Flags;
15
16use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
17use futures::future::OptionFuture;
18use futures::lock::{Mutex, MutexGuard};
19use futures::{FutureExt, StreamExt};
20use std::any::Any;
21use std::collections::HashMap;
22use std::pin::pin;
23use std::rc::Rc;
24use zx::MonotonicDuration;
25
26const MIN_FLUSH_INTERVAL_MS: i64 = 500;
29const MAX_FLUSH_INTERVAL_MS: i64 = 1_800_000; const MIN_FLUSH_DURATION: MonotonicDuration = MonotonicDuration::from_millis(MIN_FLUSH_INTERVAL_MS);
31
32pub trait FidlStorageConvertible {
33 type Storable;
34 type Loader;
35
36 const KEY: &'static str;
37
38 fn to_storable(self) -> Self::Storable;
39 fn from_storable(storable: Self::Storable) -> Self;
40}
41
42pub struct FidlStorage {
45 typed_storage_map: HashMap<&'static str, TypedStorage>,
47
48 typed_loader_map: HashMap<&'static str, Box<dyn Any>>,
49
50 caching_enabled: bool,
52
53 debounce_writes: bool,
56
57 storage_dir: DirectoryProxy,
58}
59
60struct TypedStorage {
63 flush_sender: UnboundedSender<()>,
65
66 cached_storage: Rc<Mutex<CachedStorage>>,
68}
69
70struct CachedStorage {
73 current_data: Option<Vec<u8>>,
76
77 temp_file_path: String,
88
89 file_path: String,
91}
92
93impl CachedStorage {
94 async fn sync(&mut self, storage_dir: &DirectoryProxy) -> Result<(), Error> {
96 {
98 let file_proxy = fuchsia_fs::directory::open_file(
99 storage_dir,
100 &self.temp_file_path,
101 Flags::FLAG_MUST_CREATE
102 | Flags::FILE_TRUNCATE
103 | fuchsia_fs::PERM_READABLE
104 | fuchsia_fs::PERM_WRITABLE,
105 )
106 .await
107 .with_context(|| format!("unable to open {:?} for writing", self.temp_file_path))?;
108 fuchsia_fs::file::write(&file_proxy, self.current_data.as_ref().unwrap())
109 .await
110 .context("failed to write data to file")?;
111 file_proxy
112 .close()
113 .await
114 .context("failed to call close on temp file")?
115 .map_err(zx::Status::from_raw)?;
116 }
117 fuchsia_fs::directory::rename(storage_dir, &self.temp_file_path, &self.file_path)
118 .await
119 .context("failed to rename temp file to permanent file")?;
120 storage_dir
121 .sync()
122 .await
123 .context("failed to call sync on directory after rename")?
124 .map_err(zx::Status::from_raw)
125 .or_else(|e| if let zx::Status::NOT_SUPPORTED = e { Ok(()) } else { Err(e) })
128 .context("failed to sync rename to directory")
129 }
130}
131
132impl FidlStorage {
133 pub(crate) async fn with_file_proxy<I, G>(
141 iter: I,
142 storage_dir: DirectoryProxy,
143 files_generator: G,
144 ) -> Result<(Self, Vec<Task<()>>), Error>
145 where
146 I: IntoIterator<Item = (&'static str, Option<Box<dyn Any>>)>,
147 G: Fn(&'static str) -> Result<(String, String), Error>,
148 {
149 let mut typed_storage_map = HashMap::new();
150 let iter = iter.into_iter();
151 typed_storage_map.reserve(iter.size_hint().0);
152 let mut typed_loader_map = HashMap::new();
153 let mut sync_tasks = Vec::with_capacity(iter.size_hint().0);
154 for (key, loader) in iter {
155 let (flush_sender, flush_receiver) = futures::channel::mpsc::unbounded::<()>();
157 let (temp_file_path, file_path) =
158 files_generator(key).context("failed to generate file")?;
159
160 let cached_storage = Rc::new(Mutex::new(CachedStorage {
161 current_data: None,
162 temp_file_path,
163 file_path,
164 }));
165 let storage = TypedStorage { flush_sender, cached_storage: Rc::clone(&cached_storage) };
166
167 let sync_task = Task::local(Self::synchronize_task(
169 Clone::clone(&storage_dir),
170 cached_storage,
171 flush_receiver,
172 ));
173 sync_tasks.push(sync_task);
174 let _ = typed_storage_map.insert(key, storage);
175 if let Some(loader) = loader {
176 let _ = typed_loader_map.insert(key, loader);
177 }
178 }
179 Ok((
180 FidlStorage {
181 caching_enabled: true,
182 debounce_writes: true,
183 typed_storage_map,
184 typed_loader_map,
185 storage_dir,
186 },
187 sync_tasks,
188 ))
189 }
190
191 async fn synchronize_task(
192 storage_dir: DirectoryProxy,
193 cached_storage: Rc<Mutex<CachedStorage>>,
194 flush_receiver: UnboundedReceiver<()>,
195 ) {
196 let mut has_pending_flush = false;
197
198 let mut last_flush: MonotonicInstant = MonotonicInstant::now() - MIN_FLUSH_DURATION;
202
203 let mut next_flush_timer = pin!(OptionFuture::<Timer>::from(None).fuse());
206 let mut retries = 0;
207 let mut retrying = false;
208
209 let flush_fuse = flush_receiver.fuse();
210
211 futures::pin_mut!(flush_fuse);
212 loop {
213 futures::select! {
214 _ = flush_fuse.select_next_some() => {
215 if retrying {
218 continue;
219 }
220
221 let now = MonotonicInstant::now();
223 let next_flush_time = if now - last_flush > MIN_FLUSH_DURATION {
224 now
227 } else {
228 last_flush + MIN_FLUSH_DURATION
232 };
233
234 has_pending_flush = true;
235 next_flush_timer.set(OptionFuture::from(Some(Timer::new(next_flush_time))).fuse());
236 }
237
238 _ = next_flush_timer => {
239 if has_pending_flush {
241 let mut cached_storage = cached_storage.lock().await;
242
243 if let Err(e) = cached_storage.sync(&storage_dir).await {
246 retrying = true;
247 let flush_duration = MonotonicDuration::from_millis(
248 2_i64.saturating_pow(retries)
249 .saturating_mul(MIN_FLUSH_INTERVAL_MS)
250 .min(MAX_FLUSH_INTERVAL_MS)
251 );
252 let next_flush_time = MonotonicInstant::now() + flush_duration;
253 log::error!(
254 "Failed to sync write to disk for {:?}, delaying by {:?}, \
255 caused by: {:?}",
256 cached_storage.file_path,
257 flush_duration,
258 e
259 );
260
261 next_flush_timer.set(OptionFuture::from(Some(Timer::new(next_flush_time))).fuse());
263 retries += 1;
264 continue;
265 }
266 last_flush = MonotonicInstant::now();
267 has_pending_flush = false;
268 retrying = false;
269 retries = 0;
270 }
271 }
272
273 complete => break,
274 }
275 }
276 }
277
278 #[cfg(test)]
279 #[allow(dead_code)]
281 fn set_caching_enabled(&mut self, enabled: bool) {
282 self.caching_enabled = enabled;
283 }
284
285 #[cfg(test)]
286 #[allow(dead_code)]
288 fn set_debounce_writes(&mut self, debounce: bool) {
289 self.debounce_writes = debounce;
290 }
291
292 async fn inner_write(
293 &self,
294 key: &'static str,
295 new_value: Vec<u8>,
296 ) -> Result<UpdateState, Error> {
297 let typed_storage = self
298 .typed_storage_map
299 .get(key)
300 .ok_or_else(|| format_err!("Invalid data keyed by {}", key))?;
301 let mut cached_storage = typed_storage.cached_storage.lock().await;
302 let bytes;
303 let cached_value = match cached_storage.current_data.as_ref() {
304 Some(cached_value) => Some(cached_value),
305 None => {
306 let file_proxy = fuchsia_fs::directory::open_file(
307 &self.storage_dir,
308 &cached_storage.file_path,
309 fuchsia_fs::PERM_READABLE,
310 )
311 .await;
312 bytes = match file_proxy {
313 Ok(file_proxy) => match fuchsia_fs::file::read(&file_proxy).await {
314 Ok(bytes) => Some(bytes),
315 Err(ReadError::Open(OpenError::OpenError(e))) if e == Status::NOT_FOUND => {
316 None
317 }
318 Err(e) => {
319 bail!("failed to get value from fidl storage for {:?}: {:?}", key, e)
320 }
321 },
322 Err(OpenError::OpenError(Status::NOT_FOUND)) => None,
323 Err(e) => bail!("unable to read data on disk for {:?}: {:?}", key, e),
324 };
325 bytes.as_ref()
326 }
327 };
328
329 Ok(if cached_value.map(|c| *c != new_value).unwrap_or(true) {
330 cached_storage.current_data = Some(new_value);
331 if !self.debounce_writes {
332 cached_storage
334 .sync(&self.storage_dir)
335 .await
336 .with_context(|| format!("Failed to sync data for key {key:?}"))?;
337 } else {
338 typed_storage.flush_sender.unbounded_send(()).with_context(|| {
339 format!("flush_sender failed to send flush message, associated key is {key}")
340 })?;
341 }
342 UpdateState::Updated
343 } else {
344 UpdateState::Unchanged
345 })
346 }
347
348 pub async fn write<T>(&self, new_value: T) -> Result<UpdateState, Error>
350 where
351 T: FidlStorageConvertible,
352 <T as FidlStorageConvertible>::Storable: Persistable,
353 {
354 let new_value = persist(&new_value.to_storable())?;
355 self.inner_write(T::KEY, new_value).await
356 }
357
358 async fn get_inner(&self, key: &'static str) -> MutexGuard<'_, CachedStorage> {
359 let typed_storage = self
360 .typed_storage_map
361 .get(key)
362 .unwrap_or_else(|| panic!("Invalid data keyed by {key}"));
364 let mut cached_storage = typed_storage.cached_storage.lock().await;
365 if cached_storage.current_data.is_none() || !self.caching_enabled {
366 if let Some(file_proxy) = match fuchsia_fs::directory::open_file(
367 &self.storage_dir,
368 &cached_storage.file_path,
369 fuchsia_fs::PERM_READABLE,
370 )
371 .await
372 {
373 Ok(file_proxy) => Some(file_proxy),
374 Err(OpenError::OpenError(Status::NOT_FOUND)) => None,
375 Err(e) => panic!("failed to open file for {key:?}: {e:?}"),
377 } {
378 let data = match fuchsia_fs::file::read(&file_proxy).await {
379 Ok(data) => Some(data),
380 Err(ReadError::ReadError(Status::NOT_FOUND)) => None,
381 Err(e) => panic!("failed to get fidl data from disk for {key:?}: {e:?}"),
383 };
384
385 cached_storage.current_data = data;
386 }
387 }
388
389 cached_storage
390 }
391
392 pub async fn get<T>(&self) -> T
395 where
396 T: FidlStorageConvertible,
397 T::Storable: Persistable,
398 T::Loader: DefaultDispatcher<T>,
399 {
400 match self.get_inner(T::KEY).await.current_data.as_ref().map(|data| {
401 T::from_storable(
402 unpersist(data).expect("Should not be able to save mismatching types in file"),
403 )
404 }) {
405 Some(data) => data,
406 None => <T::Loader as DefaultDispatcher<T>>::get_default(self),
407 }
408 }
409}
410
411pub trait DefaultDispatcher<T>: Sealed {
412 fn get_default(_: &FidlStorage) -> T;
413}
414
415impl<T> DefaultDispatcher<T> for NoneT
416where
417 T: Default,
418{
419 fn get_default(_: &FidlStorage) -> T {
420 T::default()
421 }
422}
423
424impl<T, L> DefaultDispatcher<T> for L
425where
426 T: FidlStorageConvertible<Loader = L>,
427 L: DefaultLoader<Result = T> + 'static,
428{
429 fn get_default(storage: &FidlStorage) -> T {
430 match storage.typed_loader_map.get(T::KEY) {
431 Some(loader) => match loader.downcast_ref::<T::Loader>() {
432 Some(loader) => loader.default_value(),
433 None => {
434 panic!("Mismatch key and loader for key {}", T::KEY);
435 }
436 },
437 None => panic!("Missing loader for {}", T::KEY),
438 }
439 }
440}
441
442#[cfg(test)]
443mod tests {
444 use super::*;
445 use assert_matches::assert_matches;
446 use fasync::TestExecutor;
447 use fidl::endpoints::ControlHandle;
448 use fidl::epitaph::ChannelEpitaphExt;
449 use fidl_test_storage::{TestStruct, WrongStruct};
450 use futures::TryStreamExt;
451 use std::sync::Arc;
452 use std::task::Poll;
453 use test_case::test_case;
454 use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
455
456 const VALUE0: i32 = 3;
457 const VALUE1: i32 = 33;
458 const VALUE2: i32 = 128;
459
460 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
461 struct LibTestStruct {
462 value: i32,
463 }
464
465 impl FidlStorageConvertible for LibTestStruct {
466 type Storable = TestStruct;
467 type Loader = NoneT;
468 const KEY: &'static str = "testkey";
469
470 fn to_storable(self) -> Self::Storable {
471 TestStruct { value: self.value }
472 }
473
474 fn from_storable(storable: Self::Storable) -> Self {
475 Self { value: storable.value }
476 }
477 }
478
479 impl Default for LibTestStruct {
480 fn default() -> Self {
481 Self { value: VALUE0 }
482 }
483 }
484
485 fn open_tempdir(tempdir: &tempfile::TempDir) -> fio::DirectoryProxy {
486 fuchsia_fs::directory::open_in_namespace(
487 tempdir.path().to_str().expect("tempdir path is not valid UTF-8"),
488 fuchsia_fs::PERM_READABLE | fuchsia_fs::PERM_WRITABLE,
489 )
490 .expect("failed to open connection to tempdir")
491 }
492
493 #[fuchsia::test]
494 async fn test_get() {
495 let value_to_get = LibTestStruct { value: VALUE1 };
496 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
497 let content = persist(&value_to_get.to_storable()).unwrap();
498 std::fs::write(tempdir.path().join("xyz.pfidl"), content).expect("failed to write file");
499 let storage_dir = open_tempdir(&tempdir);
500
501 let (storage, sync_tasks) = FidlStorage::with_file_proxy(
502 vec![(LibTestStruct::KEY, None)],
503 storage_dir,
504 move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
505 )
506 .await
507 .expect("should be able to generate file");
508 for task in sync_tasks {
509 task.detach();
510 }
511 let result = storage.get::<LibTestStruct>().await;
512
513 assert_eq!(result.value, VALUE1);
514 }
515
516 #[fuchsia::test]
517 async fn test_get_default() {
518 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
519 let storage_dir = open_tempdir(&tempdir);
520
521 let (storage, sync_tasks) = FidlStorage::with_file_proxy(
522 vec![(LibTestStruct::KEY, None)],
523 storage_dir,
524 move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
525 )
526 .await
527 .expect("file proxy should be created");
528 for task in sync_tasks {
529 task.detach();
530 }
531 let result = storage.get::<LibTestStruct>().await;
532
533 assert_eq!(result.value, VALUE0);
534 }
535
536 struct DirectoryInterceptor {
539 real_dir: fio::DirectoryProxy,
540 inner: std::sync::Mutex<DirectoryInterceptorInner>,
541 }
542
543 struct DirectoryInterceptorInner {
544 sync_notifier: Option<futures::channel::mpsc::UnboundedSender<()>>,
545 #[allow(clippy::type_complexity)]
546 open_interceptor: Box<dyn Fn(&str, bool) -> Option<Status>>,
547 }
548
549 impl DirectoryInterceptor {
550 #[allow(clippy::arc_with_non_send_sync)]
552 fn new(real_dir: fio::DirectoryProxy) -> (Arc<Self>, fio::DirectoryProxy) {
553 let (proxy, requests) =
554 fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
555 let this = Arc::new(Self {
556 real_dir,
557 inner: std::sync::Mutex::new(DirectoryInterceptorInner {
558 sync_notifier: None,
559 open_interceptor: Box::new(|_, _| None),
560 }),
561 });
562 fasync::Task::local(this.clone().run(requests)).detach();
563 (this.clone(), proxy)
564 }
565
566 fn install_sync_notifier(&self) -> futures::channel::mpsc::UnboundedReceiver<()> {
569 let (sender, receiver) = futures::channel::mpsc::unbounded();
570 self.inner.lock().unwrap().sync_notifier = Some(sender);
571 receiver
572 }
573
574 #[allow(clippy::type_complexity)]
578 fn set_open_interceptor(&self, interceptor: Box<dyn Fn(&str, bool) -> Option<Status>>) {
579 self.inner.lock().unwrap().open_interceptor = interceptor;
580 }
581
582 async fn run(self: Arc<Self>, mut requests: fio::DirectoryRequestStream) {
583 while let Ok(Some(request)) = requests.try_next().await {
584 match request {
585 fio::DirectoryRequest::DeprecatedOpen {
586 flags,
587 mode,
588 path,
589 object,
590 control_handle: _,
591 } => {
592 match (self.inner.lock().unwrap().open_interceptor)(
593 &path,
594 flags.intersects(fio::OpenFlags::CREATE),
595 ) {
596 Some(status) => {
597 let (_, control_handle) = object.into_stream_and_control_handle();
598 control_handle
599 .send_on_open_(status.into_raw(), None)
600 .expect("failed to send OnOpen event");
601 control_handle.shutdown_with_epitaph(status);
602 }
603 None => {
604 self.real_dir
605 .deprecated_open(flags, mode, &path, object)
606 .expect("failed to forward Open request");
607 }
608 }
609 }
610 fio::DirectoryRequest::Open {
611 path,
612 flags,
613 options,
614 object,
615 control_handle: _,
616 } => {
617 let create = flags.intersects(fio::Flags::FLAG_MUST_CREATE);
618 match (self.inner.lock().unwrap().open_interceptor)(&path, create) {
619 Some(status) => {
620 object.close_with_epitaph(status).expect("failed to send epitaph");
621 }
622 None => {
623 self.real_dir
624 .open(&path, flags, &options, object)
625 .expect("failed to forward Open3 request");
626 }
627 }
628 }
629 fio::DirectoryRequest::Sync { responder } => {
630 let response =
631 self.real_dir.sync().await.expect("failed to forward Sync request");
632 responder.send(response).expect("failed to respond to Sync request");
633 if let Some(sender) = &self.inner.lock().unwrap().sync_notifier {
634 sender.unbounded_send(()).unwrap();
635 }
636 }
637 fio::DirectoryRequest::Rename { src, dst_parent_token, dst, responder } => {
638 let response = self
639 .real_dir
640 .rename(&src, dst_parent_token, &dst)
641 .await
642 .expect("failed to forward Rename request");
643 responder.send(response).expect("failed to respond to Rename request");
644 }
645 fio::DirectoryRequest::GetToken { responder } => {
646 let response = self
647 .real_dir
648 .get_token()
649 .await
650 .expect("failed to forward GetToken request");
651 responder
652 .send(response.0, response.1)
653 .expect("failed to respond to GetToken request");
654 }
655 request => unimplemented!("request: {:?}", request),
656 }
657 }
658 }
659 }
660
661 fn run_until_ready<F>(executor: &mut TestExecutor, fut: F) -> F::Output
666 where
667 F: std::future::Future,
668 {
669 let mut fut = std::pin::pin!(fut);
670 loop {
671 match executor.run_until_stalled(&mut fut) {
672 Poll::Ready(result) => return result,
673 Poll::Pending => std::thread::yield_now(),
674 }
675 }
676 }
677
678 fn assert_file_not_found(
680 executor: &mut TestExecutor,
681 directory: &fio::DirectoryProxy,
682 file_name: &str,
683 ) {
684 let open_fut =
685 fuchsia_fs::directory::open_file(directory, file_name, fuchsia_fs::PERM_READABLE);
686 let result = run_until_ready(executor, open_fut);
687 assert_matches!(result, Result::Err(e) if e.is_not_found_error());
688 }
689
690 fn assert_file_contents(
692 executor: &mut TestExecutor,
693 directory: &fio::DirectoryProxy,
694 file_name: &str,
695 expected_contents: TestStruct,
696 ) {
697 let read_fut = fuchsia_fs::directory::read_file(directory, file_name);
698 let data = run_until_ready(executor, read_fut).expect("reading file");
699 let data = fidl::unpersist::<TestStruct>(&data).expect("failed to read file as TestStruct");
700 assert_eq!(data, expected_contents);
701 }
702
703 #[fuchsia::test]
704 fn test_first_write_syncs_immediately() {
705 let written_value = VALUE1;
706 let mut executor = TestExecutor::new_with_fake_time();
707 executor.set_fake_time(MonotonicInstant::from_nanos(0));
708
709 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
710 let storage_dir = open_tempdir(&tempdir);
711 let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
712 let mut sync_receiver = interceptor.install_sync_notifier();
713
714 let storage_fut = FidlStorage::with_file_proxy(
715 vec![(LibTestStruct::KEY, None)],
716 Clone::clone(&storage_dir),
717 move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
718 );
719 futures::pin_mut!(storage_fut);
720
721 let (storage, _sync_tasks) =
722 if let Poll::Ready(storage) = executor.run_until_stalled(&mut storage_fut) {
723 storage.expect("file proxy should be created")
724 } else {
725 panic!("storage creation stalled");
726 };
727
728 let value_to_write = LibTestStruct { value: written_value };
730 let write_future = storage.write(value_to_write);
731 futures::pin_mut!(write_future);
732
733 assert_matches!(
735 run_until_ready(&mut executor, &mut write_future),
736 Result::Ok(UpdateState::Updated)
737 );
738
739 assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");
741
742 run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
744
745 assert_file_contents(
747 &mut executor,
748 &storage_dir,
749 "xyz.pfidl",
750 value_to_write.to_storable(),
751 );
752 }
753
754 #[fuchsia::test]
755 fn test_second_write_syncs_after_interval() {
756 let written_value = VALUE1;
757 let second_value = VALUE2;
758 let mut executor = TestExecutor::new_with_fake_time();
759 executor.set_fake_time(MonotonicInstant::from_nanos(0));
760
761 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
762 let storage_dir = open_tempdir(&tempdir);
763 let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
764 let mut sync_receiver = interceptor.install_sync_notifier();
765
766 let storage_fut = FidlStorage::with_file_proxy(
767 vec![(LibTestStruct::KEY, None)],
768 Clone::clone(&storage_dir),
769 move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
770 );
771 futures::pin_mut!(storage_fut);
772
773 let (storage, _sync_tasks) =
774 if let Poll::Ready(storage) = executor.run_until_stalled(&mut storage_fut) {
775 storage.expect("file proxy should be created")
776 } else {
777 panic!("storage creation stalled");
778 };
779
780 let value_to_write = LibTestStruct { value: written_value };
782 let write_future = storage.write(value_to_write);
783 futures::pin_mut!(write_future);
784
785 assert_matches!(
787 run_until_ready(&mut executor, &mut write_future),
788 Result::Ok(UpdateState::Updated)
789 );
790
791 assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");
793
794 run_until_ready(&mut executor, &mut sync_receiver.next()).expect("directory never synced");
796
797 assert_file_contents(
799 &mut executor,
800 &storage_dir,
801 "xyz.pfidl",
802 value_to_write.to_storable(),
803 );
804
805 let value_to_write2 = LibTestStruct { value: second_value };
807 let write_future = storage.write(value_to_write2);
808 futures::pin_mut!(write_future);
809
810 assert_matches!(
812 run_until_ready(&mut executor, &mut write_future),
813 Result::Ok(UpdateState::Updated)
814 );
815
816 assert_file_contents(
818 &mut executor,
819 &storage_dir,
820 "xyz.pfidl",
821 value_to_write.to_storable(),
822 );
823
824 executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000 - 1));
826 assert!(!executor.wake_expired_timers());
827
828 executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000));
830 run_until_ready(&mut executor, &mut sync_receiver.next()).expect("directory never synced");
831
832 assert_file_contents(
835 &mut executor,
836 &storage_dir,
837 "xyz.pfidl",
838 value_to_write2.to_storable(),
839 );
840 }
841
842 #[derive(Copy, Clone, Default, Debug)]
843 struct LibWrongStruct;
844
845 impl FidlStorageConvertible for LibWrongStruct {
846 type Storable = WrongStruct;
847 type Loader = NoneT;
848 const KEY: &'static str = "WRONG_STRUCT";
849
850 fn to_storable(self) -> Self::Storable {
851 WrongStruct
852 }
853
854 fn from_storable(_: Self::Storable) -> Self {
855 LibWrongStruct
856 }
857 }
858
859 #[fuchsia::test]
862 async fn test_write_with_mismatch_type_returns_error() {
863 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
864 let storage_dir = open_tempdir(&tempdir);
865
866 let (storage, sync_tasks) = FidlStorage::with_file_proxy(
867 vec![(LibTestStruct::KEY, None)],
868 storage_dir,
869 move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
870 )
871 .await
872 .expect("file proxy should be created");
873 for task in sync_tasks {
874 task.detach();
875 }
876
877 let result = storage.write(LibTestStruct { value: VALUE2 }).await;
879 assert!(result.is_ok());
880
881 let result = storage.write(LibWrongStruct).await;
884 assert_matches!(result, Err(e) if e.to_string() == "Invalid data keyed by WRONG_STRUCT");
885 }
886
887 #[fuchsia::test]
890 fn test_multiple_write_debounce() {
891 let mut executor = TestExecutor::new_with_fake_time();
894 executor.set_fake_time(MonotonicInstant::from_nanos(0));
895
896 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
897 let storage_dir = open_tempdir(&tempdir);
898 let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
899 let mut sync_receiver = interceptor.install_sync_notifier();
900
901 let storage_fut = FidlStorage::with_file_proxy(
902 vec![(LibTestStruct::KEY, None)],
903 Clone::clone(&storage_dir),
904 move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
905 );
906 let (storage, _sync_tasks) =
907 run_until_ready(&mut executor, storage_fut).expect("file proxy should be created");
908
909 let first_value = VALUE1;
910 let second_value = VALUE2;
911 let third_value = VALUE0;
912
913 let value_to_write = LibTestStruct { value: first_value };
915 let result = run_until_ready(&mut executor, storage.write(value_to_write));
917 assert_matches!(result, Result::Ok(UpdateState::Updated));
918
919 assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");
921
922 run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
925
926 assert_file_contents(
928 &mut executor,
929 &storage_dir,
930 "xyz.pfidl",
931 value_to_write.to_storable(),
932 );
933
934 let value_to_write2 = LibTestStruct { value: second_value };
936 let result = run_until_ready(&mut executor, storage.write(value_to_write2));
937 assert_matches!(result, Result::Ok(UpdateState::Updated));
939
940 let data = run_until_ready(&mut executor, storage.get::<LibTestStruct>());
942 assert_eq!(data, value_to_write2);
943
944 assert_file_contents(
946 &mut executor,
947 &storage_dir,
948 "xyz.pfidl",
949 value_to_write.to_storable(),
950 );
951
952 let value_to_write3 = LibTestStruct { value: third_value };
954 let result = run_until_ready(&mut executor, storage.write(value_to_write3));
955 assert_matches!(result, Result::Ok(UpdateState::Updated));
957
958 let data = run_until_ready(&mut executor, storage.get::<LibTestStruct>());
961 assert_eq!(data, value_to_write3);
962
963 assert_file_contents(
965 &mut executor,
966 &storage_dir,
967 "xyz.pfidl",
968 value_to_write.to_storable(),
969 );
970
971 executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000 - 1));
973 assert!(!executor.wake_expired_timers());
974
975 assert_file_contents(
977 &mut executor,
978 &storage_dir,
979 "xyz.pfidl",
980 value_to_write.to_storable(),
981 );
982
983 executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000));
985 run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
986
987 assert_file_contents(
989 &mut executor,
990 &storage_dir,
991 "xyz.pfidl",
992 value_to_write3.to_storable(),
993 );
994 }
995
996 #[allow(clippy::unused_unit)]
999 #[test_case(1, 500)]
1000 #[test_case(2, 1_000)]
1001 #[test_case(3, 2_000)]
1002 #[test_case(4, 4_000)]
1003 #[test_case(5, 8_000)]
1004 #[test_case(6, 16_000)]
1005 #[test_case(7, 32_000)]
1006 #[test_case(8, 64_000)]
1007 #[test_case(9, 128_000)]
1008 #[test_case(10, 256_000)]
1009 #[test_case(11, 512_000)]
1010 #[test_case(12, 1_024_000)]
1011 #[test_case(13, 1_800_000)]
1012 #[test_case(14, 1_800_000)]
1013 fn test_exponential_backoff(retry_count: usize, max_wait_time: usize) {
1014 let mut executor = TestExecutor::new_with_fake_time();
1015 executor.set_fake_time(MonotonicInstant::from_nanos(0));
1016
1017 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
1018 let storage_dir = open_tempdir(&tempdir);
1019 let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
1020 let attempts = std::sync::Mutex::new(0);
1021 interceptor.set_open_interceptor(Box::new(move |path, create| {
1022 let mut attempts_guard = attempts.lock().unwrap();
1023 if path == "abc_tmp.pfidl" && create && *attempts_guard < retry_count {
1024 *attempts_guard += 1;
1025 Some(Status::NO_SPACE)
1026 } else {
1027 None
1028 }
1029 }));
1030 let mut sync_receiver = interceptor.install_sync_notifier();
1031
1032 let expected_data = vec![1];
1033 let cached_storage = Rc::new(Mutex::new(CachedStorage {
1034 current_data: Some(expected_data.clone()),
1035 temp_file_path: "abc_tmp.pfidl".to_owned(),
1036 file_path: "abc.pfidl".to_owned(),
1037 }));
1038
1039 let (sender, receiver) = futures::channel::mpsc::unbounded();
1040
1041 let task = fasync::Task::local(FidlStorage::synchronize_task(
1043 Clone::clone(&storage_dir),
1044 Rc::clone(&cached_storage),
1045 receiver,
1046 ));
1047 futures::pin_mut!(task);
1048
1049 executor.set_fake_time(MonotonicInstant::from_nanos(0));
1050 sender.unbounded_send(()).expect("can send flush signal");
1051 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1052
1053 let mut clock_nanos = 0;
1054 for new_duration in (0..retry_count).map(|i| {
1057 (2_i64.pow(i as u32) * MIN_FLUSH_INTERVAL_MS).min(max_wait_time as i64) * 1_000_000
1058 - (i == retry_count - 1) as i64
1059 }) {
1060 executor.set_fake_time(MonotonicInstant::from_nanos(clock_nanos));
1061 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1063
1064 assert_file_not_found(&mut executor, &storage_dir, "abc_tmp.pfidl");
1066 assert_file_not_found(&mut executor, &storage_dir, "abc.pfidl");
1067
1068 clock_nanos += new_duration;
1069 }
1070
1071 executor.set_fake_time(MonotonicInstant::from_nanos(clock_nanos));
1072 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1074
1075 assert_file_not_found(&mut executor, &storage_dir, "abc_tmp.pfidl");
1077 assert_file_not_found(&mut executor, &storage_dir, "abc.pfidl");
1078
1079 clock_nanos += 1;
1081 executor.set_fake_time(MonotonicInstant::from_nanos(clock_nanos));
1082 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1083 run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
1084
1085 let read_fut = fuchsia_fs::directory::read_file(&storage_dir, "abc.pfidl");
1087 let data = run_until_ready(&mut executor, read_fut).expect("reading file");
1088 assert_eq!(data, expected_data);
1089
1090 drop(sender);
1091 run_until_ready(&mut executor, task);
1093 }
1094}