1use crate::private::Sealed;
6use crate::storage_factory::{DefaultLoader, NoneT};
7use crate::UpdateState;
8use anyhow::{bail, format_err, Context, Error};
9use fidl::{persist, unpersist, Persistable, Status};
10use fidl_fuchsia_io::DirectoryProxy;
11use fuchsia_async::{MonotonicInstant, Task, Timer};
12use fuchsia_fs::file::ReadError;
13use fuchsia_fs::node::OpenError;
14use fuchsia_fs::Flags;
15
16use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
17use futures::future::OptionFuture;
18use futures::lock::{Mutex, MutexGuard};
19use futures::{FutureExt, StreamExt};
20use std::any::Any;
21use std::collections::HashMap;
22use std::pin::pin;
23use std::rc::Rc;
24use zx::MonotonicDuration;
25
26const MIN_FLUSH_INTERVAL_MS: i64 = 500;
29const MAX_FLUSH_INTERVAL_MS: i64 = 1_800_000; const MIN_FLUSH_DURATION: MonotonicDuration = MonotonicDuration::from_millis(MIN_FLUSH_INTERVAL_MS);
31
32pub trait FidlStorageConvertible {
33 type Storable;
34 type Loader;
35
36 const KEY: &'static str;
37
38 fn to_storable(self) -> Self::Storable;
39 fn from_storable(storable: Self::Storable) -> Self;
40}
41
42pub struct FidlStorage {
45 typed_storage_map: HashMap<&'static str, TypedStorage>,
47
48 typed_loader_map: HashMap<&'static str, Box<dyn Any>>,
49
50 caching_enabled: bool,
52
53 debounce_writes: bool,
56
57 storage_dir: DirectoryProxy,
58}
59
60struct TypedStorage {
63 flush_sender: UnboundedSender<()>,
65
66 cached_storage: Rc<Mutex<CachedStorage>>,
68}
69
70struct CachedStorage {
73 current_data: Option<Vec<u8>>,
76
77 temp_file_path: String,
88
89 file_path: String,
91}
92
93impl CachedStorage {
94 async fn sync(&mut self, storage_dir: &DirectoryProxy) -> Result<(), Error> {
96 {
98 let file_proxy = fuchsia_fs::directory::open_file(
99 storage_dir,
100 &self.temp_file_path,
101 Flags::FLAG_MUST_CREATE
102 | Flags::FILE_TRUNCATE
103 | fuchsia_fs::PERM_READABLE
104 | fuchsia_fs::PERM_WRITABLE,
105 )
106 .await
107 .with_context(|| format!("unable to open {:?} for writing", self.temp_file_path))?;
108 fuchsia_fs::file::write(&file_proxy, self.current_data.as_ref().unwrap())
109 .await
110 .context("failed to write data to file")?;
111 file_proxy
112 .close()
113 .await
114 .context("failed to call close on temp file")?
115 .map_err(zx::Status::from_raw)?;
116 }
117 fuchsia_fs::directory::rename(storage_dir, &self.temp_file_path, &self.file_path)
118 .await
119 .context("failed to rename temp file to permanent file")?;
120 storage_dir
121 .sync()
122 .await
123 .context("failed to call sync on directory after rename")?
124 .map_err(zx::Status::from_raw)
125 .or_else(|e| if let zx::Status::NOT_SUPPORTED = e { Ok(()) } else { Err(e) })
128 .context("failed to sync rename to directory")
129 }
130}
131
132impl FidlStorage {
133 pub(crate) async fn with_file_proxy<I, G>(
141 iter: I,
142 storage_dir: DirectoryProxy,
143 files_generator: G,
144 ) -> Result<(Self, Vec<Task<()>>), Error>
145 where
146 I: IntoIterator<Item = (&'static str, Option<Box<dyn Any>>)>,
147 G: Fn(&'static str) -> Result<(String, String), Error>,
148 {
149 let mut typed_storage_map = HashMap::new();
150 let iter = iter.into_iter();
151 typed_storage_map.reserve(iter.size_hint().0);
152 let mut typed_loader_map = HashMap::new();
153 let mut sync_tasks = Vec::with_capacity(iter.size_hint().0);
154 for (key, loader) in iter {
155 let (flush_sender, flush_receiver) = futures::channel::mpsc::unbounded::<()>();
157 let (temp_file_path, file_path) =
158 files_generator(key).context("failed to generate file")?;
159
160 let cached_storage = Rc::new(Mutex::new(CachedStorage {
161 current_data: None,
162 temp_file_path,
163 file_path,
164 }));
165 let storage = TypedStorage { flush_sender, cached_storage: Rc::clone(&cached_storage) };
166
167 let sync_task = Task::local(Self::synchronize_task(
169 Clone::clone(&storage_dir),
170 cached_storage,
171 flush_receiver,
172 ));
173 sync_tasks.push(sync_task);
174 let _ = typed_storage_map.insert(key, storage);
175 if let Some(loader) = loader {
176 let _ = typed_loader_map.insert(key, loader);
177 }
178 }
179 Ok((
180 FidlStorage {
181 caching_enabled: true,
182 debounce_writes: true,
183 typed_storage_map,
184 typed_loader_map,
185 storage_dir,
186 },
187 sync_tasks,
188 ))
189 }
190
191 async fn synchronize_task(
192 storage_dir: DirectoryProxy,
193 cached_storage: Rc<Mutex<CachedStorage>>,
194 flush_receiver: UnboundedReceiver<()>,
195 ) {
196 let mut has_pending_flush = false;
197
198 let mut last_flush: MonotonicInstant = MonotonicInstant::now() - MIN_FLUSH_DURATION;
202
203 let mut next_flush_timer = pin!(OptionFuture::<Timer>::from(None).fuse());
206 let mut retries = 0;
207 let mut retrying = false;
208
209 let flush_fuse = flush_receiver.fuse();
210
211 futures::pin_mut!(flush_fuse);
212 loop {
213 futures::select! {
214 _ = flush_fuse.select_next_some() => {
215 if retrying {
218 continue;
219 }
220
221 let now = MonotonicInstant::now();
223 let next_flush_time = if now - last_flush > MIN_FLUSH_DURATION {
224 now
227 } else {
228 last_flush + MIN_FLUSH_DURATION
232 };
233
234 has_pending_flush = true;
235 next_flush_timer.set(OptionFuture::from(Some(Timer::new(next_flush_time))).fuse());
236 }
237
238 _ = next_flush_timer => {
239 if has_pending_flush {
241 let mut cached_storage = cached_storage.lock().await;
242
243 if let Err(e) = cached_storage.sync(&storage_dir).await {
246 retrying = true;
247 let flush_duration = MonotonicDuration::from_millis(
248 2_i64.saturating_pow(retries)
249 .saturating_mul(MIN_FLUSH_INTERVAL_MS)
250 .min(MAX_FLUSH_INTERVAL_MS)
251 );
252 let next_flush_time = MonotonicInstant::now() + flush_duration;
253 log::error!(
254 "Failed to sync write to disk for {:?}, delaying by {:?}, \
255 caused by: {:?}",
256 cached_storage.file_path,
257 flush_duration,
258 e
259 );
260
261 next_flush_timer.set(OptionFuture::from(Some(Timer::new(next_flush_time))).fuse());
263 retries += 1;
264 continue;
265 }
266 last_flush = MonotonicInstant::now();
267 has_pending_flush = false;
268 retrying = false;
269 retries = 0;
270 }
271 }
272
273 complete => break,
274 }
275 }
276 }
277
278 #[cfg(test)]
279 #[allow(dead_code)]
281 fn set_caching_enabled(&mut self, enabled: bool) {
282 self.caching_enabled = enabled;
283 }
284
285 #[cfg(test)]
286 #[allow(dead_code)]
288 fn set_debounce_writes(&mut self, debounce: bool) {
289 self.debounce_writes = debounce;
290 }
291
292 async fn inner_write(
293 &self,
294 key: &'static str,
295 new_value: Vec<u8>,
296 ) -> Result<UpdateState, Error> {
297 let typed_storage = self
298 .typed_storage_map
299 .get(key)
300 .ok_or_else(|| format_err!("Invalid data keyed by {}", key))?;
301 let mut cached_storage = typed_storage.cached_storage.lock().await;
302 let bytes;
303 let cached_value = match cached_storage.current_data.as_ref() {
304 Some(cached_value) => Some(cached_value),
305 None => {
306 let file_proxy = fuchsia_fs::directory::open_file(
307 &self.storage_dir,
308 &cached_storage.file_path,
309 fuchsia_fs::PERM_READABLE,
310 )
311 .await;
312 bytes = match file_proxy {
313 Ok(file_proxy) => match fuchsia_fs::file::read(&file_proxy).await {
314 Ok(bytes) => Some(bytes),
315 Err(ReadError::Open(OpenError::OpenError(e))) if e == Status::NOT_FOUND => {
316 None
317 }
318 Err(e) => {
319 bail!("failed to get value from fidl storage for {:?}: {:?}", key, e)
320 }
321 },
322 Err(OpenError::OpenError(Status::NOT_FOUND)) => None,
323 Err(e) => bail!("unable to read data on disk for {:?}: {:?}", key, e),
324 };
325 bytes.as_ref()
326 }
327 };
328
329 Ok(if cached_value.map(|c| *c != new_value).unwrap_or(true) {
330 cached_storage.current_data = Some(new_value);
331 if !self.debounce_writes {
332 cached_storage
334 .sync(&self.storage_dir)
335 .await
336 .with_context(|| format!("Failed to sync data for key {key:?}"))?;
337 } else {
338 typed_storage.flush_sender.unbounded_send(()).with_context(|| {
339 format!("flush_sender failed to send flush message, associated key is {key}")
340 })?;
341 }
342 UpdateState::Updated
343 } else {
344 UpdateState::Unchanged
345 })
346 }
347
348 pub async fn write<T>(&self, new_value: T) -> Result<UpdateState, Error>
350 where
351 T: FidlStorageConvertible,
352 <T as FidlStorageConvertible>::Storable: Persistable,
353 {
354 let new_value = persist(&new_value.to_storable())?;
355 self.inner_write(T::KEY, new_value).await
356 }
357
358 async fn get_inner(&self, key: &'static str) -> MutexGuard<'_, CachedStorage> {
359 let typed_storage = self
360 .typed_storage_map
361 .get(key)
362 .unwrap_or_else(|| panic!("Invalid data keyed by {key}"));
364 let mut cached_storage = typed_storage.cached_storage.lock().await;
365 if cached_storage.current_data.is_none() || !self.caching_enabled {
366 if let Some(file_proxy) = match fuchsia_fs::directory::open_file(
367 &self.storage_dir,
368 &cached_storage.file_path,
369 fuchsia_fs::PERM_READABLE,
370 )
371 .await
372 {
373 Ok(file_proxy) => Some(file_proxy),
374 Err(OpenError::OpenError(Status::NOT_FOUND)) => None,
375 Err(e) => panic!("failed to open file for {key:?}: {e:?}"),
377 } {
378 let data = match fuchsia_fs::file::read(&file_proxy).await {
379 Ok(data) => Some(data),
380 Err(ReadError::ReadError(Status::NOT_FOUND)) => None,
381 Err(e) => panic!("failed to get fidl data from disk for {key:?}: {e:?}"),
383 };
384
385 cached_storage.current_data = data;
386 }
387 }
388
389 cached_storage
390 }
391
392 pub async fn get<T>(&self) -> T
395 where
396 T: FidlStorageConvertible,
397 T::Storable: Persistable,
398 T::Loader: DefaultDispatcher<T>,
399 {
400 match self.get_inner(T::KEY).await.current_data.as_ref().map(|data| {
401 T::from_storable(
402 unpersist(data).expect("Should not be able to save mismatching types in file"),
403 )
404 }) {
405 Some(data) => data,
406 None => <T::Loader as DefaultDispatcher<T>>::get_default(self),
407 }
408 }
409}
410
411pub trait DefaultDispatcher<T>: Sealed {
412 fn get_default(_: &FidlStorage) -> T;
413}
414
415impl<T> DefaultDispatcher<T> for NoneT
416where
417 T: Default,
418{
419 fn get_default(_: &FidlStorage) -> T {
420 T::default()
421 }
422}
423
424impl<T, L> DefaultDispatcher<T> for L
425where
426 T: FidlStorageConvertible<Loader = L>,
427 L: DefaultLoader<Result = T> + 'static,
428{
429 fn get_default(storage: &FidlStorage) -> T {
430 match storage.typed_loader_map.get(T::KEY) {
431 Some(loader) => match loader.downcast_ref::<T::Loader>() {
432 Some(loader) => loader.default_value(),
433 None => {
434 panic!("Mismatch key and loader for key {}", T::KEY);
435 }
436 },
437 None => panic!("Missing loader for {}", T::KEY),
438 }
439 }
440}
441
442#[cfg(test)]
443mod tests {
444 use super::*;
445 use assert_matches::assert_matches;
446 use fasync::TestExecutor;
447 use fidl::epitaph::ChannelEpitaphExt;
448 use fidl_test_storage::{TestStruct, WrongStruct};
449 use futures::TryStreamExt;
450 use std::sync::Arc;
451 use std::task::Poll;
452 use test_case::test_case;
453 use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
454
455 const VALUE0: i32 = 3;
456 const VALUE1: i32 = 33;
457 const VALUE2: i32 = 128;
458
459 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
460 struct LibTestStruct {
461 value: i32,
462 }
463
464 impl FidlStorageConvertible for LibTestStruct {
465 type Storable = TestStruct;
466 type Loader = NoneT;
467 const KEY: &'static str = "testkey";
468
469 fn to_storable(self) -> Self::Storable {
470 TestStruct { value: self.value }
471 }
472
473 fn from_storable(storable: Self::Storable) -> Self {
474 Self { value: storable.value }
475 }
476 }
477
478 impl Default for LibTestStruct {
479 fn default() -> Self {
480 Self { value: VALUE0 }
481 }
482 }
483
484 fn open_tempdir(tempdir: &tempfile::TempDir) -> fio::DirectoryProxy {
485 fuchsia_fs::directory::open_in_namespace(
486 tempdir.path().to_str().expect("tempdir path is not valid UTF-8"),
487 fuchsia_fs::PERM_READABLE | fuchsia_fs::PERM_WRITABLE,
488 )
489 .expect("failed to open connection to tempdir")
490 }
491
492 #[fuchsia::test]
493 async fn test_get() {
494 let value_to_get = LibTestStruct { value: VALUE1 };
495 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
496 let content = persist(&value_to_get.to_storable()).unwrap();
497 std::fs::write(tempdir.path().join("xyz.pfidl"), content).expect("failed to write file");
498 let storage_dir = open_tempdir(&tempdir);
499
500 let (storage, sync_tasks) = FidlStorage::with_file_proxy(
501 vec![(LibTestStruct::KEY, None)],
502 storage_dir,
503 move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
504 )
505 .await
506 .expect("should be able to generate file");
507 for task in sync_tasks {
508 task.detach();
509 }
510 let result = storage.get::<LibTestStruct>().await;
511
512 assert_eq!(result.value, VALUE1);
513 }
514
515 #[fuchsia::test]
516 async fn test_get_default() {
517 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
518 let storage_dir = open_tempdir(&tempdir);
519
520 let (storage, sync_tasks) = FidlStorage::with_file_proxy(
521 vec![(LibTestStruct::KEY, None)],
522 storage_dir,
523 move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
524 )
525 .await
526 .expect("file proxy should be created");
527 for task in sync_tasks {
528 task.detach();
529 }
530 let result = storage.get::<LibTestStruct>().await;
531
532 assert_eq!(result.value, VALUE0);
533 }
534
535 struct DirectoryInterceptor {
538 real_dir: fio::DirectoryProxy,
539 inner: std::sync::Mutex<DirectoryInterceptorInner>,
540 }
541
542 struct DirectoryInterceptorInner {
543 sync_notifier: Option<futures::channel::mpsc::UnboundedSender<()>>,
544 #[allow(clippy::type_complexity)]
545 open_interceptor: Box<dyn Fn(&str, bool) -> Option<Status>>,
546 }
547
548 impl DirectoryInterceptor {
549 #[allow(clippy::arc_with_non_send_sync)]
551 fn new(real_dir: fio::DirectoryProxy) -> (Arc<Self>, fio::DirectoryProxy) {
552 let (proxy, requests) =
553 fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
554 let this = Arc::new(Self {
555 real_dir,
556 inner: std::sync::Mutex::new(DirectoryInterceptorInner {
557 sync_notifier: None,
558 open_interceptor: Box::new(|_, _| None),
559 }),
560 });
561 fasync::Task::local(this.clone().run(requests)).detach();
562 (this.clone(), proxy)
563 }
564
565 fn install_sync_notifier(&self) -> futures::channel::mpsc::UnboundedReceiver<()> {
568 let (sender, receiver) = futures::channel::mpsc::unbounded();
569 self.inner.lock().unwrap().sync_notifier = Some(sender);
570 receiver
571 }
572
573 #[allow(clippy::type_complexity)]
577 fn set_open_interceptor(&self, interceptor: Box<dyn Fn(&str, bool) -> Option<Status>>) {
578 self.inner.lock().unwrap().open_interceptor = interceptor;
579 }
580
581 async fn run(self: Arc<Self>, mut requests: fio::DirectoryRequestStream) {
582 while let Ok(Some(request)) = requests.try_next().await {
583 match request {
584 fio::DirectoryRequest::Open {
585 path,
586 flags,
587 options,
588 object,
589 control_handle: _,
590 } => {
591 let create = flags.intersects(fio::Flags::FLAG_MUST_CREATE);
592 match (self.inner.lock().unwrap().open_interceptor)(&path, create) {
593 Some(status) => {
594 object.close_with_epitaph(status).expect("failed to send epitaph");
595 }
596 None => {
597 self.real_dir
598 .open(&path, flags, &options, object)
599 .expect("failed to forward Open3 request");
600 }
601 }
602 }
603 fio::DirectoryRequest::Sync { responder } => {
604 let response =
605 self.real_dir.sync().await.expect("failed to forward Sync request");
606 responder.send(response).expect("failed to respond to Sync request");
607 if let Some(sender) = &self.inner.lock().unwrap().sync_notifier {
608 sender.unbounded_send(()).unwrap();
609 }
610 }
611 fio::DirectoryRequest::Rename { src, dst_parent_token, dst, responder } => {
612 let response = self
613 .real_dir
614 .rename(&src, dst_parent_token, &dst)
615 .await
616 .expect("failed to forward Rename request");
617 responder.send(response).expect("failed to respond to Rename request");
618 }
619 fio::DirectoryRequest::GetToken { responder } => {
620 let response = self
621 .real_dir
622 .get_token()
623 .await
624 .expect("failed to forward GetToken request");
625 responder
626 .send(response.0, response.1)
627 .expect("failed to respond to GetToken request");
628 }
629 request => unimplemented!("request: {:?}", request),
630 }
631 }
632 }
633 }
634
635 fn run_until_ready<F>(executor: &mut TestExecutor, fut: F) -> F::Output
640 where
641 F: std::future::Future,
642 {
643 let mut fut = std::pin::pin!(fut);
644 loop {
645 match executor.run_until_stalled(&mut fut) {
646 Poll::Ready(result) => return result,
647 Poll::Pending => std::thread::yield_now(),
648 }
649 }
650 }
651
652 fn assert_file_not_found(
654 executor: &mut TestExecutor,
655 directory: &fio::DirectoryProxy,
656 file_name: &str,
657 ) {
658 let open_fut =
659 fuchsia_fs::directory::open_file(directory, file_name, fuchsia_fs::PERM_READABLE);
660 let result = run_until_ready(executor, open_fut);
661 assert_matches!(result, Result::Err(e) if e.is_not_found_error());
662 }
663
664 fn assert_file_contents(
666 executor: &mut TestExecutor,
667 directory: &fio::DirectoryProxy,
668 file_name: &str,
669 expected_contents: TestStruct,
670 ) {
671 let read_fut = fuchsia_fs::directory::read_file(directory, file_name);
672 let data = run_until_ready(executor, read_fut).expect("reading file");
673 let data = fidl::unpersist::<TestStruct>(&data).expect("failed to read file as TestStruct");
674 assert_eq!(data, expected_contents);
675 }
676
677 #[fuchsia::test]
678 fn test_first_write_syncs_immediately() {
679 let written_value = VALUE1;
680 let mut executor = TestExecutor::new_with_fake_time();
681 executor.set_fake_time(MonotonicInstant::from_nanos(0));
682
683 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
684 let storage_dir = open_tempdir(&tempdir);
685 let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
686 let mut sync_receiver = interceptor.install_sync_notifier();
687
688 let storage_fut = FidlStorage::with_file_proxy(
689 vec![(LibTestStruct::KEY, None)],
690 Clone::clone(&storage_dir),
691 move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
692 );
693 futures::pin_mut!(storage_fut);
694
695 let (storage, _sync_tasks) =
696 if let Poll::Ready(storage) = executor.run_until_stalled(&mut storage_fut) {
697 storage.expect("file proxy should be created")
698 } else {
699 panic!("storage creation stalled");
700 };
701
702 let value_to_write = LibTestStruct { value: written_value };
704 let write_future = storage.write(value_to_write);
705 futures::pin_mut!(write_future);
706
707 assert_matches!(
709 run_until_ready(&mut executor, &mut write_future),
710 Result::Ok(UpdateState::Updated)
711 );
712
713 assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");
715
716 run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
718
719 assert_file_contents(
721 &mut executor,
722 &storage_dir,
723 "xyz.pfidl",
724 value_to_write.to_storable(),
725 );
726 }
727
728 #[fuchsia::test]
729 fn test_second_write_syncs_after_interval() {
730 let written_value = VALUE1;
731 let second_value = VALUE2;
732 let mut executor = TestExecutor::new_with_fake_time();
733 executor.set_fake_time(MonotonicInstant::from_nanos(0));
734
735 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
736 let storage_dir = open_tempdir(&tempdir);
737 let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
738 let mut sync_receiver = interceptor.install_sync_notifier();
739
740 let storage_fut = FidlStorage::with_file_proxy(
741 vec![(LibTestStruct::KEY, None)],
742 Clone::clone(&storage_dir),
743 move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
744 );
745 futures::pin_mut!(storage_fut);
746
747 let (storage, _sync_tasks) =
748 if let Poll::Ready(storage) = executor.run_until_stalled(&mut storage_fut) {
749 storage.expect("file proxy should be created")
750 } else {
751 panic!("storage creation stalled");
752 };
753
754 let value_to_write = LibTestStruct { value: written_value };
756 let write_future = storage.write(value_to_write);
757 futures::pin_mut!(write_future);
758
759 assert_matches!(
761 run_until_ready(&mut executor, &mut write_future),
762 Result::Ok(UpdateState::Updated)
763 );
764
765 assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");
767
768 run_until_ready(&mut executor, &mut sync_receiver.next()).expect("directory never synced");
770
771 assert_file_contents(
773 &mut executor,
774 &storage_dir,
775 "xyz.pfidl",
776 value_to_write.to_storable(),
777 );
778
779 let value_to_write2 = LibTestStruct { value: second_value };
781 let write_future = storage.write(value_to_write2);
782 futures::pin_mut!(write_future);
783
784 assert_matches!(
786 run_until_ready(&mut executor, &mut write_future),
787 Result::Ok(UpdateState::Updated)
788 );
789
790 assert_file_contents(
792 &mut executor,
793 &storage_dir,
794 "xyz.pfidl",
795 value_to_write.to_storable(),
796 );
797
798 executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000 - 1));
800 assert!(!executor.wake_expired_timers());
801
802 executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000));
804 run_until_ready(&mut executor, &mut sync_receiver.next()).expect("directory never synced");
805
806 assert_file_contents(
809 &mut executor,
810 &storage_dir,
811 "xyz.pfidl",
812 value_to_write2.to_storable(),
813 );
814 }
815
816 #[derive(Copy, Clone, Default, Debug)]
817 struct LibWrongStruct;
818
819 impl FidlStorageConvertible for LibWrongStruct {
820 type Storable = WrongStruct;
821 type Loader = NoneT;
822 const KEY: &'static str = "WRONG_STRUCT";
823
824 fn to_storable(self) -> Self::Storable {
825 WrongStruct
826 }
827
828 fn from_storable(_: Self::Storable) -> Self {
829 LibWrongStruct
830 }
831 }
832
833 #[fuchsia::test]
836 async fn test_write_with_mismatch_type_returns_error() {
837 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
838 let storage_dir = open_tempdir(&tempdir);
839
840 let (storage, sync_tasks) = FidlStorage::with_file_proxy(
841 vec![(LibTestStruct::KEY, None)],
842 storage_dir,
843 move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
844 )
845 .await
846 .expect("file proxy should be created");
847 for task in sync_tasks {
848 task.detach();
849 }
850
851 let result = storage.write(LibTestStruct { value: VALUE2 }).await;
853 assert!(result.is_ok());
854
855 let result = storage.write(LibWrongStruct).await;
858 assert_matches!(result, Err(e) if e.to_string() == "Invalid data keyed by WRONG_STRUCT");
859 }
860
861 #[fuchsia::test]
864 fn test_multiple_write_debounce() {
865 let mut executor = TestExecutor::new_with_fake_time();
868 executor.set_fake_time(MonotonicInstant::from_nanos(0));
869
870 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
871 let storage_dir = open_tempdir(&tempdir);
872 let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
873 let mut sync_receiver = interceptor.install_sync_notifier();
874
875 let storage_fut = FidlStorage::with_file_proxy(
876 vec![(LibTestStruct::KEY, None)],
877 Clone::clone(&storage_dir),
878 move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
879 );
880 let (storage, _sync_tasks) =
881 run_until_ready(&mut executor, storage_fut).expect("file proxy should be created");
882
883 let first_value = VALUE1;
884 let second_value = VALUE2;
885 let third_value = VALUE0;
886
887 let value_to_write = LibTestStruct { value: first_value };
889 let result = run_until_ready(&mut executor, storage.write(value_to_write));
891 assert_matches!(result, Result::Ok(UpdateState::Updated));
892
893 assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");
895
896 run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
899
900 assert_file_contents(
902 &mut executor,
903 &storage_dir,
904 "xyz.pfidl",
905 value_to_write.to_storable(),
906 );
907
908 let value_to_write2 = LibTestStruct { value: second_value };
910 let result = run_until_ready(&mut executor, storage.write(value_to_write2));
911 assert_matches!(result, Result::Ok(UpdateState::Updated));
913
914 let data = run_until_ready(&mut executor, storage.get::<LibTestStruct>());
916 assert_eq!(data, value_to_write2);
917
918 assert_file_contents(
920 &mut executor,
921 &storage_dir,
922 "xyz.pfidl",
923 value_to_write.to_storable(),
924 );
925
926 let value_to_write3 = LibTestStruct { value: third_value };
928 let result = run_until_ready(&mut executor, storage.write(value_to_write3));
929 assert_matches!(result, Result::Ok(UpdateState::Updated));
931
932 let data = run_until_ready(&mut executor, storage.get::<LibTestStruct>());
935 assert_eq!(data, value_to_write3);
936
937 assert_file_contents(
939 &mut executor,
940 &storage_dir,
941 "xyz.pfidl",
942 value_to_write.to_storable(),
943 );
944
945 executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000 - 1));
947 assert!(!executor.wake_expired_timers());
948
949 assert_file_contents(
951 &mut executor,
952 &storage_dir,
953 "xyz.pfidl",
954 value_to_write.to_storable(),
955 );
956
957 executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000));
959 run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
960
961 assert_file_contents(
963 &mut executor,
964 &storage_dir,
965 "xyz.pfidl",
966 value_to_write3.to_storable(),
967 );
968 }
969
970 #[allow(clippy::unused_unit)]
973 #[test_case(1, 500)]
974 #[test_case(2, 1_000)]
975 #[test_case(3, 2_000)]
976 #[test_case(4, 4_000)]
977 #[test_case(5, 8_000)]
978 #[test_case(6, 16_000)]
979 #[test_case(7, 32_000)]
980 #[test_case(8, 64_000)]
981 #[test_case(9, 128_000)]
982 #[test_case(10, 256_000)]
983 #[test_case(11, 512_000)]
984 #[test_case(12, 1_024_000)]
985 #[test_case(13, 1_800_000)]
986 #[test_case(14, 1_800_000)]
987 fn test_exponential_backoff(retry_count: usize, max_wait_time: usize) {
988 let mut executor = TestExecutor::new_with_fake_time();
989 executor.set_fake_time(MonotonicInstant::from_nanos(0));
990
991 let tempdir = tempfile::tempdir().expect("failed to create tempdir");
992 let storage_dir = open_tempdir(&tempdir);
993 let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
994 let attempts = std::sync::Mutex::new(0);
995 interceptor.set_open_interceptor(Box::new(move |path, create| {
996 let mut attempts_guard = attempts.lock().unwrap();
997 if path == "abc_tmp.pfidl" && create && *attempts_guard < retry_count {
998 *attempts_guard += 1;
999 Some(Status::NO_SPACE)
1000 } else {
1001 None
1002 }
1003 }));
1004 let mut sync_receiver = interceptor.install_sync_notifier();
1005
1006 let expected_data = vec![1];
1007 let cached_storage = Rc::new(Mutex::new(CachedStorage {
1008 current_data: Some(expected_data.clone()),
1009 temp_file_path: "abc_tmp.pfidl".to_owned(),
1010 file_path: "abc.pfidl".to_owned(),
1011 }));
1012
1013 let (sender, receiver) = futures::channel::mpsc::unbounded();
1014
1015 let task = fasync::Task::local(FidlStorage::synchronize_task(
1017 Clone::clone(&storage_dir),
1018 Rc::clone(&cached_storage),
1019 receiver,
1020 ));
1021 futures::pin_mut!(task);
1022
1023 executor.set_fake_time(MonotonicInstant::from_nanos(0));
1024 sender.unbounded_send(()).expect("can send flush signal");
1025 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1026
1027 let mut clock_nanos = 0;
1028 for new_duration in (0..retry_count).map(|i| {
1031 (2_i64.pow(i as u32) * MIN_FLUSH_INTERVAL_MS).min(max_wait_time as i64) * 1_000_000
1032 - (i == retry_count - 1) as i64
1033 }) {
1034 executor.set_fake_time(MonotonicInstant::from_nanos(clock_nanos));
1035 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1037
1038 assert_file_not_found(&mut executor, &storage_dir, "abc_tmp.pfidl");
1040 assert_file_not_found(&mut executor, &storage_dir, "abc.pfidl");
1041
1042 clock_nanos += new_duration;
1043 }
1044
1045 executor.set_fake_time(MonotonicInstant::from_nanos(clock_nanos));
1046 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1048
1049 assert_file_not_found(&mut executor, &storage_dir, "abc_tmp.pfidl");
1051 assert_file_not_found(&mut executor, &storage_dir, "abc.pfidl");
1052
1053 clock_nanos += 1;
1055 executor.set_fake_time(MonotonicInstant::from_nanos(clock_nanos));
1056 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1057 run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
1058
1059 let read_fut = fuchsia_fs::directory::read_file(&storage_dir, "abc.pfidl");
1061 let data = run_until_ready(&mut executor, read_fut).expect("reading file");
1062 assert_eq!(data, expected_data);
1063
1064 drop(sender);
1065 run_until_ready(&mut executor, task);
1067 }
1068}