settings_storage/fidl_storage.rs

// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::private::Sealed;
use crate::storage_factory::{DefaultLoader, NoneT};
use crate::UpdateState;
use anyhow::{bail, format_err, Context, Error};
use fidl::{persist, unpersist, Persistable, Status};
use fidl_fuchsia_io::DirectoryProxy;
use fuchsia_async::{MonotonicInstant, Task, Timer};
use fuchsia_fs::file::ReadError;
use fuchsia_fs::node::OpenError;
use fuchsia_fs::Flags;

use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::future::OptionFuture;
use futures::lock::{Mutex, MutexGuard};
use futures::{FutureExt, StreamExt};
use std::any::Any;
use std::collections::HashMap;
use std::pin::pin;
use std::rc::Rc;
use zx::MonotonicDuration;

/// Minimum amount of time between flushing to disk, in milliseconds. The flush call triggers
/// file I/O which is slow.
const MIN_FLUSH_INTERVAL_MS: i64 = 500;
const MAX_FLUSH_INTERVAL_MS: i64 = 1_800_000; // 30 minutes
const MIN_FLUSH_DURATION: MonotonicDuration = MonotonicDuration::from_millis(MIN_FLUSH_INTERVAL_MS);

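/// Converts a type to and from its persistable FIDL representation for use with [`FidlStorage`].
/// `Storable` is the persistable FIDL type, `Loader` supplies the default value when nothing is
/// on disk, and `KEY` names the file the data is stored under.
///
/// A minimal sketch of an implementation, mirroring the test types below; `MyData` and
/// `MyFidlData` are hypothetical:
///
/// ```ignore
/// impl FidlStorageConvertible for MyData {
///     type Storable = MyFidlData;
///     type Loader = NoneT;
///     const KEY: &'static str = "my_data";
///
///     fn to_storable(self) -> Self::Storable {
///         MyFidlData { value: self.value }
///     }
///
///     fn from_storable(storable: Self::Storable) -> Self {
///         Self { value: storable.value }
///     }
/// }
/// ```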
pub trait FidlStorageConvertible {
    type Storable;
    type Loader;

    const KEY: &'static str;

    fn to_storable(self) -> Self::Storable;
    fn from_storable(storable: Self::Storable) -> Self;
}

/// Stores device level settings in persistent storage.
/// User level settings should not use this.
pub struct FidlStorage {
    /// Map of [`FidlStorageConvertible`] keys to their typed storage.
    typed_storage_map: HashMap<&'static str, TypedStorage>,

    typed_loader_map: HashMap<&'static str, Box<dyn Any>>,

    /// If true, reads will be returned from the data in memory rather than reading from storage.
    caching_enabled: bool,

    /// If true, writes to the underlying storage will only occur at most every
    /// [MIN_FLUSH_INTERVAL_MS].
    debounce_writes: bool,

    storage_dir: DirectoryProxy,
}

/// A wrapper for managing all communication and caching for one particular type of data being
/// stored. The actual types are erased.
struct TypedStorage {
    /// Sender to communicate with task loop that handles flushes.
    flush_sender: UnboundedSender<()>,

    /// Cached storage managed through interior mutability.
    cached_storage: Rc<Mutex<CachedStorage>>,
}

/// `CachedStorage` abstracts over a cached value that's read from and written
/// to some backing store.
struct CachedStorage {
    /// Cache for the most recently read or written value. The value is stored as the encoded bytes
    /// of the persistent fidl.
    current_data: Option<Vec<u8>>,

    /// File path that will be used to write a temporary file when syncing to disk. After syncing,
    /// this file no longer exists since the rename consumes it.
    ///
    /// The approach used for syncing is:
    /// * Write data to the temp file
    /// * Rename the temp file onto the permanent file, which atomically replaces it
    ///
    /// This ensures that even if there's a power cut, the data in the permanent file is never
    /// partially written.
    temp_file_path: String,

    /// File path used for permanent file storage on disk.
    file_path: String,
}

impl CachedStorage {
    /// Triggers a sync on the file proxy.
    async fn sync(&mut self, storage_dir: &DirectoryProxy) -> Result<(), Error> {
        // Scope is important. file_proxy needs to be out of scope when the file is renamed.
        {
            let file_proxy = fuchsia_fs::directory::open_file(
                storage_dir,
                &self.temp_file_path,
                Flags::FLAG_MUST_CREATE
                    | Flags::FILE_TRUNCATE
                    | fuchsia_fs::PERM_READABLE
                    | fuchsia_fs::PERM_WRITABLE,
            )
            .await
            .with_context(|| format!("unable to open {:?} for writing", self.temp_file_path))?;
            fuchsia_fs::file::write(&file_proxy, self.current_data.as_ref().unwrap())
                .await
                .context("failed to write data to file")?;
            file_proxy
                .close()
                .await
                .context("failed to call close on temp file")?
                .map_err(zx::Status::from_raw)?;
        }
        fuchsia_fs::directory::rename(storage_dir, &self.temp_file_path, &self.file_path)
            .await
            .context("failed to rename temp file to permanent file")?;
        storage_dir
            .sync()
            .await
            .context("failed to call sync on directory after rename")?
            .map_err(zx::Status::from_raw)
            // This is only returned when the directory is backed by a VFS, so this is fine to
            // ignore.
            .or_else(|e| if let zx::Status::NOT_SUPPORTED = e { Ok(()) } else { Err(e) })
            .context("failed to sync rename to directory")
    }
}

impl FidlStorage {
    /// Construct a fidl storage from:
    /// * The iterable item, which will produce the keys for storage
    /// * A generator function that will produce the file paths for each key. It returns the temp
    ///   file path and final file path for storing the data for this key.
    ///
    /// On success, returns the FidlStorage as well as the list of background synchronizing tasks.
    /// The background tasks can be awaited or detached.
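    ///
    /// A minimal usage sketch, mirroring the tests below (illustrative; `MyData` stands in for
    /// any [`FidlStorageConvertible`] type and `directory_proxy` for an open directory):
    ///
    /// ```ignore
    /// let (storage, sync_tasks) = FidlStorage::with_file_proxy(
    ///     vec![(MyData::KEY, None)],
    ///     directory_proxy,
    ///     |key| Ok((format!("{key}_tmp.pfidl"), format!("{key}.pfidl"))),
    /// )
    /// .await?;
    /// for task in sync_tasks {
    ///     task.detach();
    /// }
    /// ```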
    pub(crate) async fn with_file_proxy<I, G>(
        iter: I,
        storage_dir: DirectoryProxy,
        files_generator: G,
    ) -> Result<(Self, Vec<Task<()>>), Error>
    where
        I: IntoIterator<Item = (&'static str, Option<Box<dyn Any>>)>,
        G: Fn(&'static str) -> Result<(String, String), Error>,
    {
        let mut typed_storage_map = HashMap::new();
        let iter = iter.into_iter();
        typed_storage_map.reserve(iter.size_hint().0);
        let mut typed_loader_map = HashMap::new();
        let mut sync_tasks = Vec::with_capacity(iter.size_hint().0);
        for (key, loader) in iter {
            // Generate a separate file proxy for each key.
            let (flush_sender, flush_receiver) = futures::channel::mpsc::unbounded::<()>();
            let (temp_file_path, file_path) =
                files_generator(key).context("failed to generate file")?;

            let cached_storage = Rc::new(Mutex::new(CachedStorage {
                current_data: None,
                temp_file_path,
                file_path,
            }));
            let storage = TypedStorage { flush_sender, cached_storage: Rc::clone(&cached_storage) };

            // Each key has an independent flush queue.
            let sync_task = Task::local(Self::synchronize_task(
                Clone::clone(&storage_dir),
                cached_storage,
                flush_receiver,
            ));
            sync_tasks.push(sync_task);
            let _ = typed_storage_map.insert(key, storage);
            if let Some(loader) = loader {
                let _ = typed_loader_map.insert(key, loader);
            }
        }
        Ok((
            FidlStorage {
                caching_enabled: true,
                debounce_writes: true,
                typed_storage_map,
                typed_loader_map,
                storage_dir,
            },
            sync_tasks,
        ))
    }

    async fn synchronize_task(
        storage_dir: DirectoryProxy,
        cached_storage: Rc<Mutex<CachedStorage>>,
        flush_receiver: UnboundedReceiver<()>,
    ) {
        let mut has_pending_flush = false;

        // The time of the last flush. Initialized to MIN_FLUSH_INTERVAL_MS before the
        // current time so that the first flush always goes through, no matter the
        // timing.
        let mut last_flush: MonotonicInstant = MonotonicInstant::now() - MIN_FLUSH_DURATION;

        // Timer for flush cooldown. OptionFuture allows us to wait on the future even
        // if it's None.
        let mut next_flush_timer = pin!(OptionFuture::<Timer>::from(None).fuse());
        let mut retries = 0;
        let mut retrying = false;

        let flush_fuse = flush_receiver.fuse();

        futures::pin_mut!(flush_fuse);
        loop {
            futures::select! {
                _ = flush_fuse.select_next_some() => {
                    // Flush currently unable to complete. Don't prevent exponential
                    // backoff from occurring.
                    if retrying {
                        continue;
                    }

                    // Received a request to do a flush.
                    let now = MonotonicInstant::now();
                    let next_flush_time = if now - last_flush > MIN_FLUSH_DURATION {
                        // Last flush happened more than MIN_FLUSH_INTERVAL_MS ago,
                        // flush immediately in next iteration of loop.
                        now
                    } else {
                        // Last flush was less than MIN_FLUSH_INTERVAL_MS ago, schedule
                        // it accordingly. It's okay if the time is in the past, Timer
                        // will still trigger on the next loop iteration.
                        last_flush + MIN_FLUSH_DURATION
                    };

                    has_pending_flush = true;
                    next_flush_timer.set(OptionFuture::from(Some(Timer::new(next_flush_time))).fuse());
                }

                _ = next_flush_timer => {
                    // Timer triggered, check for pending syncs.
                    if has_pending_flush {
                        let mut cached_storage = cached_storage.lock().await;

                        // If the sync fails, exponentially back off the syncs until a
                        // maximum wait time.
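                        // The delay doubles on each retry: 500ms, 1s, 2s, 4s, ...,
                        // capped at MAX_FLUSH_INTERVAL_MS (30 minutes).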
                        if let Err(e) = cached_storage.sync(&storage_dir).await {
                            retrying = true;
                            let flush_duration = MonotonicDuration::from_millis(
                                2_i64.saturating_pow(retries)
                                    .saturating_mul(MIN_FLUSH_INTERVAL_MS)
                                    .min(MAX_FLUSH_INTERVAL_MS)
                            );
                            let next_flush_time = MonotonicInstant::now() + flush_duration;
                            log::error!(
                                "Failed to sync write to disk for {:?}, delaying by {:?}, \
                                    caused by: {:?}",
                                cached_storage.file_path,
                                flush_duration,
                                e
                            );

                            // Reset the timer so we can try again in the future.
                            next_flush_timer.set(OptionFuture::from(Some(Timer::new(next_flush_time))).fuse());
                            retries += 1;
                            continue;
                        }
                        last_flush = MonotonicInstant::now();
                        has_pending_flush = false;
                        retrying = false;
                        retries = 0;
                    }
                }

                complete => break,
            }
        }
    }

    #[cfg(test)]
    // TODO(https://fxbug.dev/42172967) Remove allow once all tests have been migrated to fidl storage.
    #[allow(dead_code)]
    fn set_caching_enabled(&mut self, enabled: bool) {
        self.caching_enabled = enabled;
    }

    #[cfg(test)]
    // TODO(https://fxbug.dev/42172967) Remove allow once all tests have been migrated to fidl storage.
    #[allow(dead_code)]
    fn set_debounce_writes(&mut self, debounce: bool) {
        self.debounce_writes = debounce;
    }

    async fn inner_write(
        &self,
        key: &'static str,
        new_value: Vec<u8>,
    ) -> Result<UpdateState, Error> {
        let typed_storage = self
            .typed_storage_map
            .get(key)
            .ok_or_else(|| format_err!("Invalid data keyed by {}", key))?;
        let mut cached_storage = typed_storage.cached_storage.lock().await;
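        // `bytes` is declared outside the match below so that data read from disk can be
        // borrowed by `cached_value` beyond the match arm.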
        let bytes;
        let cached_value = match cached_storage.current_data.as_ref() {
            Some(cached_value) => Some(cached_value),
            None => {
                let file_proxy = fuchsia_fs::directory::open_file(
                    &self.storage_dir,
                    &cached_storage.file_path,
                    fuchsia_fs::PERM_READABLE,
                )
                .await;
                bytes = match file_proxy {
                    Ok(file_proxy) => match fuchsia_fs::file::read(&file_proxy).await {
                        Ok(bytes) => Some(bytes),
                        Err(ReadError::Open(OpenError::OpenError(e))) if e == Status::NOT_FOUND => {
                            None
                        }
                        Err(e) => {
                            bail!("failed to get value from fidl storage for {:?}: {:?}", key, e)
                        }
                    },
                    Err(OpenError::OpenError(Status::NOT_FOUND)) => None,
                    Err(e) => bail!("unable to read data on disk for {:?}: {:?}", key, e),
                };
                bytes.as_ref()
            }
        };

        Ok(if cached_value.map(|c| *c != new_value).unwrap_or(true) {
            cached_storage.current_data = Some(new_value);
            if !self.debounce_writes {
                // Not debouncing writes for testing, just sync immediately.
                cached_storage
                    .sync(&self.storage_dir)
                    .await
                    .with_context(|| format!("Failed to sync data for key {key:?}"))?;
            } else {
                typed_storage.flush_sender.unbounded_send(()).with_context(|| {
                    format!("flush_sender failed to send flush message, associated key is {key}")
                })?;
            }
            UpdateState::Updated
        } else {
            UpdateState::Unchanged
        })
    }

    /// Write `new_value` to storage. The write will be persisted to disk at a set interval.
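    ///
    /// A minimal usage sketch (illustrative; `my_data` is any value whose type implements
    /// [`FidlStorageConvertible`]):
    ///
    /// ```ignore
    /// let state = storage.write(my_data).await?;
    /// assert_eq!(state, UpdateState::Updated);
    /// ```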
    pub async fn write<T>(&self, new_value: T) -> Result<UpdateState, Error>
    where
        T: FidlStorageConvertible,
        <T as FidlStorageConvertible>::Storable: Persistable,
    {
        let new_value = persist(&new_value.to_storable())?;
        self.inner_write(T::KEY, new_value).await
    }

    async fn get_inner(&self, key: &'static str) -> MutexGuard<'_, CachedStorage> {
        let typed_storage = self
            .typed_storage_map
            .get(key)
            // TODO(https://fxbug.dev/42064613) Replace this with an error result.
            .unwrap_or_else(|| panic!("Invalid data keyed by {key}"));
        let mut cached_storage = typed_storage.cached_storage.lock().await;
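        // Read from disk on a cache miss, or on every call when caching is disabled
        // (only possible in tests via `set_caching_enabled`).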
        if cached_storage.current_data.is_none() || !self.caching_enabled {
            if let Some(file_proxy) = match fuchsia_fs::directory::open_file(
                &self.storage_dir,
                &cached_storage.file_path,
                fuchsia_fs::PERM_READABLE,
            )
            .await
            {
                Ok(file_proxy) => Some(file_proxy),
                Err(OpenError::OpenError(Status::NOT_FOUND)) => None,
                // TODO(https://fxbug.dev/42064613) Replace this with an error result.
                Err(e) => panic!("failed to open file for {key:?}: {e:?}"),
            } {
                let data = match fuchsia_fs::file::read(&file_proxy).await {
                    Ok(data) => Some(data),
                    Err(ReadError::ReadError(Status::NOT_FOUND)) => None,
                    // TODO(https://fxbug.dev/42064613) Replace this with an error result.
                    Err(e) => panic!("failed to get fidl data from disk for {key:?}: {e:?}"),
                };

                cached_storage.current_data = data;
            }
        }

        cached_storage
    }

    /// Gets the latest value cached locally, or loads the value from storage.
    /// Doesn't support multiple concurrent callers of the same struct.
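    ///
    /// A minimal usage sketch (illustrative; `MyData` is any [`FidlStorageConvertible`] type):
    ///
    /// ```ignore
    /// let my_data = storage.get::<MyData>().await;
    /// ```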
    pub async fn get<T>(&self) -> T
    where
        T: FidlStorageConvertible,
        T::Storable: Persistable,
        T::Loader: DefaultDispatcher<T>,
    {
        match self.get_inner(T::KEY).await.current_data.as_ref().map(|data| {
            T::from_storable(
                unpersist(data).expect("Should not be able to save mismatching types in file"),
            )
        }) {
            Some(data) => data,
            None => <T::Loader as DefaultDispatcher<T>>::get_default(self),
        }
    }
}

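/// Dispatches to the source of default values when storage has no data for a key: either
/// [`Default`] (via [`NoneT`]) or a [`DefaultLoader`] registered in `typed_loader_map`.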
pub trait DefaultDispatcher<T>: Sealed {
    fn get_default(_: &FidlStorage) -> T;
}

impl<T> DefaultDispatcher<T> for NoneT
where
    T: Default,
{
    fn get_default(_: &FidlStorage) -> T {
        T::default()
    }
}

impl<T, L> DefaultDispatcher<T> for L
where
    T: FidlStorageConvertible<Loader = L>,
    L: DefaultLoader<Result = T> + 'static,
{
    fn get_default(storage: &FidlStorage) -> T {
        match storage.typed_loader_map.get(T::KEY) {
            Some(loader) => match loader.downcast_ref::<T::Loader>() {
                Some(loader) => loader.default_value(),
                None => {
                    panic!("Mismatch key and loader for key {}", T::KEY);
                }
            },
            None => panic!("Missing loader for {}", T::KEY),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fasync::TestExecutor;
    use fidl::epitaph::ChannelEpitaphExt;
    use fidl_test_storage::{TestStruct, WrongStruct};
    use futures::TryStreamExt;
    use std::sync::Arc;
    use std::task::Poll;
    use test_case::test_case;
    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};

    const VALUE0: i32 = 3;
    const VALUE1: i32 = 33;
    const VALUE2: i32 = 128;

    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
    struct LibTestStruct {
        value: i32,
    }

    impl FidlStorageConvertible for LibTestStruct {
        type Storable = TestStruct;
        type Loader = NoneT;
        const KEY: &'static str = "testkey";

        fn to_storable(self) -> Self::Storable {
            TestStruct { value: self.value }
        }

        fn from_storable(storable: Self::Storable) -> Self {
            Self { value: storable.value }
        }
    }

    impl Default for LibTestStruct {
        fn default() -> Self {
            Self { value: VALUE0 }
        }
    }

    fn open_tempdir(tempdir: &tempfile::TempDir) -> fio::DirectoryProxy {
        fuchsia_fs::directory::open_in_namespace(
            tempdir.path().to_str().expect("tempdir path is not valid UTF-8"),
            fuchsia_fs::PERM_READABLE | fuchsia_fs::PERM_WRITABLE,
        )
        .expect("failed to open connection to tempdir")
    }

    #[fuchsia::test]
    async fn test_get() {
        let value_to_get = LibTestStruct { value: VALUE1 };
        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
        let content = persist(&value_to_get.to_storable()).unwrap();
        std::fs::write(tempdir.path().join("xyz.pfidl"), content).expect("failed to write file");
        let storage_dir = open_tempdir(&tempdir);

        let (storage, sync_tasks) = FidlStorage::with_file_proxy(
            vec![(LibTestStruct::KEY, None)],
            storage_dir,
            move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
        )
        .await
        .expect("should be able to generate file");
        for task in sync_tasks {
            task.detach();
        }
        let result = storage.get::<LibTestStruct>().await;

        assert_eq!(result.value, VALUE1);
    }

    #[fuchsia::test]
    async fn test_get_default() {
        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
        let storage_dir = open_tempdir(&tempdir);

        let (storage, sync_tasks) = FidlStorage::with_file_proxy(
            vec![(LibTestStruct::KEY, None)],
            storage_dir,
            move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
        )
        .await
        .expect("file proxy should be created");
        for task in sync_tasks {
            task.detach();
        }
        let result = storage.get::<LibTestStruct>().await;

        assert_eq!(result.value, VALUE0);
    }

    /// Proxies directory requests to a real directory while allowing some of the requests to be
    /// intercepted.
    struct DirectoryInterceptor {
        real_dir: fio::DirectoryProxy,
        inner: std::sync::Mutex<DirectoryInterceptorInner>,
    }

    struct DirectoryInterceptorInner {
        sync_notifier: Option<futures::channel::mpsc::UnboundedSender<()>>,
        #[allow(clippy::type_complexity)]
        open_interceptor: Box<dyn Fn(&str, bool) -> Option<Status>>,
    }

    impl DirectoryInterceptor {
        // TODO(b/356474618): re-enable and fix existing occurrence
        #[allow(clippy::arc_with_non_send_sync)]
        fn new(real_dir: fio::DirectoryProxy) -> (Arc<Self>, fio::DirectoryProxy) {
            let (proxy, requests) =
                fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
            let this = Arc::new(Self {
                real_dir,
                inner: std::sync::Mutex::new(DirectoryInterceptorInner {
                    sync_notifier: None,
                    open_interceptor: Box::new(|_, _| None),
                }),
            });
            fasync::Task::local(this.clone().run(requests)).detach();
            (this.clone(), proxy)
        }

        /// Returns a receiver that will be notified after each Sync request to the real directory
        /// has completed.
        fn install_sync_notifier(&self) -> futures::channel::mpsc::UnboundedReceiver<()> {
            let (sender, receiver) = futures::channel::mpsc::unbounded();
            self.inner.lock().unwrap().sync_notifier = Some(sender);
            receiver
        }

        /// Sets a callback to be called on every Open request. If the callback returns an error,
        /// then the request will be failed with that error instead of being forwarded to the real
        /// directory.
        #[allow(clippy::type_complexity)]
        fn set_open_interceptor(&self, interceptor: Box<dyn Fn(&str, bool) -> Option<Status>>) {
            self.inner.lock().unwrap().open_interceptor = interceptor;
        }

        async fn run(self: Arc<Self>, mut requests: fio::DirectoryRequestStream) {
            while let Ok(Some(request)) = requests.try_next().await {
                match request {
                    fio::DirectoryRequest::Open {
                        path,
                        flags,
                        options,
                        object,
                        control_handle: _,
                    } => {
                        let create = flags.intersects(fio::Flags::FLAG_MUST_CREATE);
                        match (self.inner.lock().unwrap().open_interceptor)(&path, create) {
                            Some(status) => {
                                object.close_with_epitaph(status).expect("failed to send epitaph");
                            }
                            None => {
                                self.real_dir
                                    .open(&path, flags, &options, object)
                                    .expect("failed to forward Open request");
                            }
                        }
                    }
                    fio::DirectoryRequest::Sync { responder } => {
                        let response =
                            self.real_dir.sync().await.expect("failed to forward Sync request");
                        responder.send(response).expect("failed to respond to Sync request");
                        if let Some(sender) = &self.inner.lock().unwrap().sync_notifier {
                            sender.unbounded_send(()).unwrap();
                        }
                    }
                    fio::DirectoryRequest::Rename { src, dst_parent_token, dst, responder } => {
                        let response = self
                            .real_dir
                            .rename(&src, dst_parent_token, &dst)
                            .await
                            .expect("failed to forward Rename request");
                        responder.send(response).expect("failed to respond to Rename request");
                    }
                    fio::DirectoryRequest::GetToken { responder } => {
                        let response = self
                            .real_dir
                            .get_token()
                            .await
                            .expect("failed to forward GetToken request");
                        responder
                            .send(response.0, response.1)
                            .expect("failed to respond to GetToken request");
                    }
                    request => unimplemented!("request: {:?}", request),
                }
            }
        }
    }

    /// Repeatedly polls `fut` until it returns `Poll::Ready`. When using a `TestExecutor` with fake
    /// time, only `run_until_stalled` can be used but `run_until_stalled` is incompatible with
    /// external filesystems. This function bridges the gap by continuously polling the future until
    /// the filesystem responds.
    fn run_until_ready<F>(executor: &mut TestExecutor, fut: F) -> F::Output
    where
        F: std::future::Future,
    {
        let mut fut = std::pin::pin!(fut);
        loop {
            match executor.run_until_stalled(&mut fut) {
                Poll::Ready(result) => return result,
                Poll::Pending => std::thread::yield_now(),
            }
        }
    }

    /// Asserts that a file doesn't exist.
    fn assert_file_not_found(
        executor: &mut TestExecutor,
        directory: &fio::DirectoryProxy,
        file_name: &str,
    ) {
        let open_fut =
            fuchsia_fs::directory::open_file(directory, file_name, fuchsia_fs::PERM_READABLE);
        let result = run_until_ready(executor, open_fut);
        assert_matches!(result, Result::Err(e) if e.is_not_found_error());
    }

    /// Verifies the contents of a file.
    fn assert_file_contents(
        executor: &mut TestExecutor,
        directory: &fio::DirectoryProxy,
        file_name: &str,
        expected_contents: TestStruct,
    ) {
        let read_fut = fuchsia_fs::directory::read_file(directory, file_name);
        let data = run_until_ready(executor, read_fut).expect("reading file");
        let data = fidl::unpersist::<TestStruct>(&data).expect("failed to read file as TestStruct");
        assert_eq!(data, expected_contents);
    }

    #[fuchsia::test]
    fn test_first_write_syncs_immediately() {
        let written_value = VALUE1;
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
        let storage_dir = open_tempdir(&tempdir);
        let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
        let mut sync_receiver = interceptor.install_sync_notifier();

        let storage_fut = FidlStorage::with_file_proxy(
            vec![(LibTestStruct::KEY, None)],
            Clone::clone(&storage_dir),
            move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
        );
        futures::pin_mut!(storage_fut);

        let (storage, _sync_tasks) =
            if let Poll::Ready(storage) = executor.run_until_stalled(&mut storage_fut) {
                storage.expect("file proxy should be created")
            } else {
                panic!("storage creation stalled");
            };

        // Write to device storage.
        let value_to_write = LibTestStruct { value: written_value };
        let write_future = storage.write(value_to_write);
        futures::pin_mut!(write_future);

        // The write performs the initial cache check since no read was ever performed.
        assert_matches!(
            run_until_ready(&mut executor, &mut write_future),
            Result::Ok(UpdateState::Updated)
        );

        // Storage is not yet ready.
        assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");

        // Wait for the sync task to complete.
        run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");

        // Validate the value matches what was set.
        assert_file_contents(
            &mut executor,
            &storage_dir,
            "xyz.pfidl",
            value_to_write.to_storable(),
        );
    }

    #[fuchsia::test]
    fn test_second_write_syncs_after_interval() {
        let written_value = VALUE1;
        let second_value = VALUE2;
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
        let storage_dir = open_tempdir(&tempdir);
        let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
        let mut sync_receiver = interceptor.install_sync_notifier();

        let storage_fut = FidlStorage::with_file_proxy(
            vec![(LibTestStruct::KEY, None)],
            Clone::clone(&storage_dir),
            move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
        );
        futures::pin_mut!(storage_fut);

        let (storage, _sync_tasks) =
            if let Poll::Ready(storage) = executor.run_until_stalled(&mut storage_fut) {
                storage.expect("file proxy should be created")
            } else {
                panic!("storage creation stalled");
            };

        // Write to device storage.
        let value_to_write = LibTestStruct { value: written_value };
        let write_future = storage.write(value_to_write);
        futures::pin_mut!(write_future);

        // The write performs the initial cache check since no read was ever performed.
        assert_matches!(
            run_until_ready(&mut executor, &mut write_future),
            Result::Ok(UpdateState::Updated)
        );

        // Storage is not yet ready.
        assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");

        // Wait for the sync task to complete.
        run_until_ready(&mut executor, &mut sync_receiver.next()).expect("directory never synced");

        // Validate that the file has been synced.
        assert_file_contents(
            &mut executor,
            &storage_dir,
            "xyz.pfidl",
            value_to_write.to_storable(),
        );

        // Write second time to device storage.
        let value_to_write2 = LibTestStruct { value: second_value };
        let write_future = storage.write(value_to_write2);
        futures::pin_mut!(write_future);

        // The write performs the initial cache check since no read was ever performed.
        assert_matches!(
            run_until_ready(&mut executor, &mut write_future),
            Result::Ok(UpdateState::Updated)
        );

        // Storage is not yet ready, should still equal old value.
        assert_file_contents(
            &mut executor,
            &storage_dir,
            "xyz.pfidl",
            value_to_write.to_storable(),
        );

        // Move executor to just before sync interval.
        executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000 - 1));
        assert!(!executor.wake_expired_timers());

        // Move executor to just after sync interval. It should run now.
        executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000));
        run_until_ready(&mut executor, &mut sync_receiver.next()).expect("directory never synced");

        // Validate that the file has been synced.
        assert_file_contents(
            &mut executor,
            &storage_dir,
            "xyz.pfidl",
            value_to_write2.to_storable(),
        );
    }

    #[derive(Copy, Clone, Default, Debug)]
    struct LibWrongStruct;

    impl FidlStorageConvertible for LibWrongStruct {
        type Storable = WrongStruct;
        type Loader = NoneT;
        const KEY: &'static str = "WRONG_STRUCT";

        fn to_storable(self) -> Self::Storable {
            WrongStruct
        }

        fn from_storable(_: Self::Storable) -> Self {
            LibWrongStruct
        }
    }

    // Test that attempting to write two kinds of structs to a storage instance that only supports
    // one results in a failure.
    #[fuchsia::test]
    async fn test_write_with_mismatch_type_returns_error() {
        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
        let storage_dir = open_tempdir(&tempdir);

        let (storage, sync_tasks) = FidlStorage::with_file_proxy(
            vec![(LibTestStruct::KEY, None)],
            storage_dir,
            move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
        )
        .await
        .expect("file proxy should be created");
        for task in sync_tasks {
            task.detach();
        }

        // Write successfully to storage once.
        let result = storage.write(LibTestStruct { value: VALUE2 }).await;
        assert!(result.is_ok());

        // Write to device storage again with a different type to validate that the type can't
        // be changed.
        let result = storage.write(LibWrongStruct).await;
        assert_matches!(result, Err(e) if e.to_string() == "Invalid data keyed by WRONG_STRUCT");
    }

    // Test that multiple writes to FidlStorage will cause a write each time, but will only
    // sync to the fs at an interval.
    #[fuchsia::test]
    fn test_multiple_write_debounce() {
        // Custom executor for this test so that we can advance the clock arbitrarily and verify the
        // state of the executor at any given point.
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
        let storage_dir = open_tempdir(&tempdir);
        let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
        let mut sync_receiver = interceptor.install_sync_notifier();

        let storage_fut = FidlStorage::with_file_proxy(
            vec![(LibTestStruct::KEY, None)],
            Clone::clone(&storage_dir),
            move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
        );
        let (storage, _sync_tasks) =
            run_until_ready(&mut executor, storage_fut).expect("file proxy should be created");

        let first_value = VALUE1;
        let second_value = VALUE2;
        let third_value = VALUE0;

        // First write finishes immediately.
        let value_to_write = LibTestStruct { value: first_value };
        // The write performs the initial cache check since no read was ever performed.
        let result = run_until_ready(&mut executor, storage.write(value_to_write));
        assert_matches!(result, Result::Ok(UpdateState::Updated));

        // Storage is not yet ready.
        assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");

        // Wake the initial timer without advancing the clock. Confirms that the first write is
        // "immediate".
        run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");

        // Validate that the file has been synced.
        assert_file_contents(
            &mut executor,
            &storage_dir,
            "xyz.pfidl",
            value_to_write.to_storable(),
        );

        // Write second time to device storage.
        let value_to_write2 = LibTestStruct { value: second_value };
        let result = run_until_ready(&mut executor, storage.write(value_to_write2));
        // Value is marked as updated after the write.
        assert_matches!(result, Result::Ok(UpdateState::Updated));

        // Validate the updated values are still returned from the storage cache.
        let data = run_until_ready(&mut executor, storage.get::<LibTestStruct>());
        assert_eq!(data, value_to_write2);

        // But the data has not been persisted to disk.
        assert_file_contents(
            &mut executor,
            &storage_dir,
            "xyz.pfidl",
            value_to_write.to_storable(),
        );

        // Now write a third time before advancing the clock.
        let value_to_write3 = LibTestStruct { value: third_value };
        let result = run_until_ready(&mut executor, storage.write(value_to_write3));
        // Value is marked as updated after the write.
        assert_matches!(result, Result::Ok(UpdateState::Updated));

        // Validate the updated values are still returned from the storage cache.
        let data = run_until_ready(&mut executor, storage.get::<LibTestStruct>());
        assert_eq!(data, value_to_write3);

        // But the data has still not been persisted to disk.
        assert_file_contents(
            &mut executor,
            &storage_dir,
            "xyz.pfidl",
            value_to_write.to_storable(),
        );

        // Move clock to just before sync interval.
        executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000 - 1));
        assert!(!executor.wake_expired_timers());

        // And validate that the data has still not been synced to disk.
        assert_file_contents(
            &mut executor,
            &storage_dir,
            "xyz.pfidl",
            value_to_write.to_storable(),
        );

        // Move executor to just after sync interval.
        executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000));
        run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");

        // Validate that the file has finally been synced.
        assert_file_contents(
            &mut executor,
            &storage_dir,
            "xyz.pfidl",
            value_to_write3.to_storable(),
        );
    }

    // Tests that syncing can recover after a failed write. The test cases list the number of failed
    // attempts and the maximum amount of time waited from the previous write.
    #[allow(clippy::unused_unit)]
    #[test_case(1, 500)]
    #[test_case(2, 1_000)]
    #[test_case(3, 2_000)]
    #[test_case(4, 4_000)]
    #[test_case(5, 8_000)]
    #[test_case(6, 16_000)]
    #[test_case(7, 32_000)]
    #[test_case(8, 64_000)]
    #[test_case(9, 128_000)]
    #[test_case(10, 256_000)]
    #[test_case(11, 512_000)]
    #[test_case(12, 1_024_000)]
    #[test_case(13, 1_800_000)]
    #[test_case(14, 1_800_000)]
    fn test_exponential_backoff(retry_count: usize, max_wait_time: usize) {
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
        let storage_dir = open_tempdir(&tempdir);
        let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
        let attempts = std::sync::Mutex::new(0);
        interceptor.set_open_interceptor(Box::new(move |path, create| {
            let mut attempts_guard = attempts.lock().unwrap();
            if path == "abc_tmp.pfidl" && create && *attempts_guard < retry_count {
                *attempts_guard += 1;
                Some(Status::NO_SPACE)
            } else {
                None
            }
        }));
        let mut sync_receiver = interceptor.install_sync_notifier();

        let expected_data = vec![1];
        let cached_storage = Rc::new(Mutex::new(CachedStorage {
            current_data: Some(expected_data.clone()),
            temp_file_path: "abc_tmp.pfidl".to_owned(),
            file_path: "abc.pfidl".to_owned(),
        }));

        let (sender, receiver) = futures::channel::mpsc::unbounded();

        // Task::local must be called from within an executor context, which TestExecutor provides.
        let task = fasync::Task::local(FidlStorage::synchronize_task(
            Clone::clone(&storage_dir),
            Rc::clone(&cached_storage),
            receiver,
        ));
        futures::pin_mut!(task);

        executor.set_fake_time(MonotonicInstant::from_nanos(0));
        sender.unbounded_send(()).expect("can send flush signal");
        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);

        let mut clock_nanos = 0;
        // (2^i) * 500 = exponential backoff.
        // 1,000,000 = convert ms to ns.
        for new_duration in (0..retry_count).map(|i| {
            (2_i64.pow(i as u32) * MIN_FLUSH_INTERVAL_MS).min(max_wait_time as i64) * 1_000_000
                - (i == retry_count - 1) as i64
        }) {
            executor.set_fake_time(MonotonicInstant::from_nanos(clock_nanos));
            // Task should not complete while retrying.
            assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);

            // Check that files don't exist.
            assert_file_not_found(&mut executor, &storage_dir, "abc_tmp.pfidl");
            assert_file_not_found(&mut executor, &storage_dir, "abc.pfidl");

            clock_nanos += new_duration;
        }

        executor.set_fake_time(MonotonicInstant::from_nanos(clock_nanos));
        // At this point the clock should be 1ns before the timer, so it shouldn't wake.
        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);

        // Check that files don't exist.
        assert_file_not_found(&mut executor, &storage_dir, "abc_tmp.pfidl");
        assert_file_not_found(&mut executor, &storage_dir, "abc.pfidl");

        // Now pass the timer where we can read the result.
        clock_nanos += 1;
        executor.set_fake_time(MonotonicInstant::from_nanos(clock_nanos));
        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
        run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");

        // Check that the file now matches what was in the cache.
        let read_fut = fuchsia_fs::directory::read_file(&storage_dir, "abc.pfidl");
        let data = run_until_ready(&mut executor, read_fut).expect("reading file");
        assert_eq!(data, expected_data);

        drop(sender);
        // Ensure the task can properly exit.
        run_until_ready(&mut executor, task);
    }
}
1068}