settings_storage/
fidl_storage.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::private::Sealed;
6use crate::storage_factory::{DefaultLoader, NoneT};
7use crate::UpdateState;
8use anyhow::{bail, format_err, Context, Error};
9use fidl::{persist, unpersist, Persistable, Status};
10use fidl_fuchsia_io::DirectoryProxy;
11use fuchsia_async::{MonotonicInstant, Task, Timer};
12use fuchsia_fs::file::ReadError;
13use fuchsia_fs::node::OpenError;
14use fuchsia_fs::Flags;
15
16use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
17use futures::future::OptionFuture;
18use futures::lock::{Mutex, MutexGuard};
19use futures::{FutureExt, StreamExt};
20use std::any::Any;
21use std::collections::HashMap;
22use std::pin::pin;
23use std::rc::Rc;
24use zx::MonotonicDuration;
25
/// Minimum amount of time between flushing to disk, in milliseconds. The flush call triggers
/// file I/O which is slow.
const MIN_FLUSH_INTERVAL_MS: i64 = 500;
/// Upper bound, in milliseconds, on the delay between retries after a failed flush. The retry
/// delay grows exponentially from [`MIN_FLUSH_INTERVAL_MS`] and is capped at this value.
const MAX_FLUSH_INTERVAL_MS: i64 = 1_800_000; // 30 minutes
/// [`MIN_FLUSH_INTERVAL_MS`] expressed as a [`MonotonicDuration`].
const MIN_FLUSH_DURATION: MonotonicDuration = MonotonicDuration::from_millis(MIN_FLUSH_INTERVAL_MS);
31
/// Describes how a type is converted to and from the representation that [`FidlStorage`]
/// persists, and under which key it is stored.
pub trait FidlStorageConvertible {
    /// The representation that is encoded and written to storage. [`FidlStorage::write`] and
    /// [`FidlStorage::get`] require it to be `Persistable`.
    type Storable;
    /// The type used to produce a default value when nothing is stored. [`FidlStorage::get`]
    /// requires it to implement [`DefaultDispatcher`].
    type Loader;

    /// Key identifying this type's entry in [`FidlStorage`].
    const KEY: &'static str;

    /// Converts this value into its storable representation.
    fn to_storable(self) -> Self::Storable;
    /// Reconstructs a value from its storable representation.
    fn from_storable(storable: Self::Storable) -> Self;
}
41
/// Stores device level settings in persistent storage.
/// User level settings should not use this.
pub struct FidlStorage {
    /// Map of [`FidlStorageConvertible`] keys to their typed storage.
    typed_storage_map: HashMap<&'static str, TypedStorage>,

    /// Map of [`FidlStorageConvertible`] keys to their type-erased default-value loaders. Only
    /// keys that were registered together with a loader have an entry.
    typed_loader_map: HashMap<&'static str, Box<dyn Any>>,

    /// If true, reads will be returned from the data in memory rather than reading from storage.
    caching_enabled: bool,

    /// If true, writes to the underlying storage will only occur at most every
    /// [`MIN_FLUSH_INTERVAL_MS`] milliseconds.
    debounce_writes: bool,

    /// Directory in which the temporary and permanent files of every key are opened.
    storage_dir: DirectoryProxy,
}
59
/// A wrapper for managing all communication and caching for one particular type of data being
/// stored. The actual types are erased.
struct TypedStorage {
    /// Sender to communicate with task loop that handles flushes. Each `()` message requests
    /// that the cached data be flushed to disk.
    flush_sender: UnboundedSender<()>,

    /// Cached storage managed through interior mutability. The same `Rc` is shared with the
    /// background task that performs the flushes.
    cached_storage: Rc<Mutex<CachedStorage>>,
}
69
/// `CachedStorage` abstracts over a cached value that's read from and written
/// to some backing store.
struct CachedStorage {
    /// Cache for the most recently read or written value. The value is stored as the encoded bytes
    /// of the persistent fidl. `None` until a value has been read from disk or written.
    current_data: Option<Vec<u8>>,

    /// File path that will be used to write a temporary file when syncing to disk.
    ///
    /// The approach used for syncing is:
    /// * Write data to temp file
    /// * Rename temp file to permanent file
    /// * Sync the containing directory
    ///
    /// Because the temp file is renamed onto the permanent path, no temp file is left behind
    /// after a successful sync.
    ///
    /// This ensures that even if there's a power cut, the data in the permanent file is never
    /// partially written.
    temp_file_path: String,

    /// File path used for permanent file storage on disk.
    file_path: String,
}
92
93impl CachedStorage {
94    /// Triggers a sync on the file proxy.
95    async fn sync(&mut self, storage_dir: &DirectoryProxy) -> Result<(), Error> {
96        // Scope is important. file_proxy needs to be out-of-scope when the directory is renamed.
97        {
98            let file_proxy = fuchsia_fs::directory::open_file(
99                storage_dir,
100                &self.temp_file_path,
101                Flags::FLAG_MUST_CREATE
102                    | Flags::FILE_TRUNCATE
103                    | fuchsia_fs::PERM_READABLE
104                    | fuchsia_fs::PERM_WRITABLE,
105            )
106            .await
107            .with_context(|| format!("unable to open {:?} for writing", self.temp_file_path))?;
108            fuchsia_fs::file::write(&file_proxy, self.current_data.as_ref().unwrap())
109                .await
110                .context("failed to write data to file")?;
111            file_proxy
112                .close()
113                .await
114                .context("failed to call close on temp file")?
115                .map_err(zx::Status::from_raw)?;
116        }
117        fuchsia_fs::directory::rename(storage_dir, &self.temp_file_path, &self.file_path)
118            .await
119            .context("failed to rename temp file to permanent file")?;
120        storage_dir
121            .sync()
122            .await
123            .context("failed to call sync on directory after rename")?
124            .map_err(zx::Status::from_raw)
125            // This is only returned when the directory is backed by a VFS, so this is fine to
126            // ignore.
127            .or_else(|e| if let zx::Status::NOT_SUPPORTED = e { Ok(()) } else { Err(e) })
128            .context("failed to sync rename to directory")
129    }
130}
131
132impl FidlStorage {
133    /// Construct a fidl storage from:
134    /// * The iterable item, which will produce the keys for storage
135    /// * A generator function that will produce a file proxy for each key. It will return the temp
136    ///   file path and final file path for storing the data for this key.
137    ///
138    /// On success, returns the FidlStorage as well as the list of background synchronizing tasks.
139    /// The background tasks can be awaited or detached.
140    pub(crate) async fn with_file_proxy<I, G>(
141        iter: I,
142        storage_dir: DirectoryProxy,
143        files_generator: G,
144    ) -> Result<(Self, Vec<Task<()>>), Error>
145    where
146        I: IntoIterator<Item = (&'static str, Option<Box<dyn Any>>)>,
147        G: Fn(&'static str) -> Result<(String, String), Error>,
148    {
149        let mut typed_storage_map = HashMap::new();
150        let iter = iter.into_iter();
151        typed_storage_map.reserve(iter.size_hint().0);
152        let mut typed_loader_map = HashMap::new();
153        let mut sync_tasks = Vec::with_capacity(iter.size_hint().0);
154        for (key, loader) in iter {
155            // Generate a separate file proxy for each key.
156            let (flush_sender, flush_receiver) = futures::channel::mpsc::unbounded::<()>();
157            let (temp_file_path, file_path) =
158                files_generator(key).context("failed to generate file")?;
159
160            let cached_storage = Rc::new(Mutex::new(CachedStorage {
161                current_data: None,
162                temp_file_path,
163                file_path,
164            }));
165            let storage = TypedStorage { flush_sender, cached_storage: Rc::clone(&cached_storage) };
166
167            // Each key has an independent flush queue.
168            let sync_task = Task::local(Self::synchronize_task(
169                Clone::clone(&storage_dir),
170                cached_storage,
171                flush_receiver,
172            ));
173            sync_tasks.push(sync_task);
174            let _ = typed_storage_map.insert(key, storage);
175            if let Some(loader) = loader {
176                let _ = typed_loader_map.insert(key, loader);
177            }
178        }
179        Ok((
180            FidlStorage {
181                caching_enabled: true,
182                debounce_writes: true,
183                typed_storage_map,
184                typed_loader_map,
185                storage_dir,
186            },
187            sync_tasks,
188        ))
189    }
190
    /// Background loop that flushes one key's cached data to disk.
    ///
    /// Each message on `flush_receiver` requests a flush. Flushes are spaced at least
    /// [`MIN_FLUSH_DURATION`] apart: a request arriving sooner schedules a timer for the earliest
    /// allowed time instead of flushing right away. When a flush fails, it is retried with an
    /// exponentially growing delay capped at [`MAX_FLUSH_INTERVAL_MS`]; flush requests received
    /// while retrying do not reschedule the timer. The loop ends when `select!` reports
    /// `complete`.
    async fn synchronize_task(
        storage_dir: DirectoryProxy,
        cached_storage: Rc<Mutex<CachedStorage>>,
        flush_receiver: UnboundedReceiver<()>,
    ) {
        // True from the moment a flush is requested until a sync succeeds.
        let mut has_pending_flush = false;

        // The time of the last flush. Initialized to MIN_FLUSH_INTERVAL_MS before the
        // current time so that the first flush always goes through, no matter the
        // timing.
        let mut last_flush: MonotonicInstant = MonotonicInstant::now() - MIN_FLUSH_DURATION;

        // Timer for flush cooldown. OptionFuture allows us to wait on the future even
        // if it's None.
        let mut next_flush_timer = pin!(OptionFuture::<Timer>::from(None).fuse());
        // Number of consecutive failed syncs; used as the exponent of the backoff delay.
        let mut retries = 0;
        // True while a failed sync is waiting to be retried.
        let mut retrying = false;

        let flush_fuse = flush_receiver.fuse();

        futures::pin_mut!(flush_fuse);
        loop {
            futures::select! {
                _ = flush_fuse.select_next_some() => {
                    // Flush currently unable to complete. Don't prevent exponential
                    // backoff from occurring.
                    if retrying {
                        continue;
                    }

                    // Received a request to do a flush.
                    let now = MonotonicInstant::now();
                    let next_flush_time = if now - last_flush > MIN_FLUSH_DURATION {
                        // Last flush happened more than MIN_FLUSH_INTERVAL_MS ago,
                        // flush immediately in next iteration of loop.
                        now
                    } else {
                        // Last flush was less than MIN_FLUSH_INTERVAL_MS ago, schedule
                        // it accordingly. It's okay if the time is in the past, Timer
                        // will still trigger on the next loop iteration.
                        last_flush + MIN_FLUSH_DURATION
                    };

                    has_pending_flush = true;
                    next_flush_timer.set(OptionFuture::from(Some(Timer::new(next_flush_time))).fuse());
                }

                _ = next_flush_timer => {
                    // Timer triggered, check for pending syncs.
                    if has_pending_flush {
                        let mut cached_storage = cached_storage.lock().await;

                        // If the sync fails, exponentially backoff the syncs until a
                        // maximum wait time.
                        if let Err(e) = cached_storage.sync(&storage_dir).await {
                            retrying = true;
                            // Delay is MIN_FLUSH_INTERVAL_MS * 2^retries, saturating and capped
                            // at MAX_FLUSH_INTERVAL_MS.
                            let flush_duration = MonotonicDuration::from_millis(
                                2_i64.saturating_pow(retries)
                                    .saturating_mul(MIN_FLUSH_INTERVAL_MS)
                                    .min(MAX_FLUSH_INTERVAL_MS)
                            );
                            let next_flush_time = MonotonicInstant::now() + flush_duration;
                            log::error!(
                                "Failed to sync write to disk for {:?}, delaying by {:?}, \
                                    caused by: {:?}",
                                cached_storage.file_path,
                                flush_duration,
                                e
                            );

                            // Reset the timer so we can try again in the future
                            next_flush_timer.set(OptionFuture::from(Some(Timer::new(next_flush_time))).fuse());
                            retries += 1;
                            continue;
                        }
                        // Sync succeeded: record the time and clear all retry state.
                        last_flush = MonotonicInstant::now();
                        has_pending_flush = false;
                        retrying = false;
                        retries = 0;
                    }
                }

                complete => break,
            }
        }
    }
277
    /// Test-only switch for `caching_enabled`. When disabled, reads go to storage even if a value
    /// is already cached.
    #[cfg(test)]
    // TODO(https://fxbug.dev/42172967) Remove allow once all tests have been migrated to fidl storage.
    #[allow(dead_code)]
    fn set_caching_enabled(&mut self, enabled: bool) {
        self.caching_enabled = enabled;
    }
284
    /// Test-only switch for `debounce_writes`. When disabled, every changed write is synced to
    /// disk immediately instead of being handed to the background flush task.
    #[cfg(test)]
    // TODO(https://fxbug.dev/42172967) Remove allow once all tests have been migrated to fidl storage.
    #[allow(dead_code)]
    fn set_debounce_writes(&mut self, debounce: bool) {
        self.debounce_writes = debounce;
    }
291
292    async fn inner_write(
293        &self,
294        key: &'static str,
295        new_value: Vec<u8>,
296    ) -> Result<UpdateState, Error> {
297        let typed_storage = self
298            .typed_storage_map
299            .get(key)
300            .ok_or_else(|| format_err!("Invalid data keyed by {}", key))?;
301        let mut cached_storage = typed_storage.cached_storage.lock().await;
302        let bytes;
303        let cached_value = match cached_storage.current_data.as_ref() {
304            Some(cached_value) => Some(cached_value),
305            None => {
306                let file_proxy = fuchsia_fs::directory::open_file(
307                    &self.storage_dir,
308                    &cached_storage.file_path,
309                    fuchsia_fs::PERM_READABLE,
310                )
311                .await;
312                bytes = match file_proxy {
313                    Ok(file_proxy) => match fuchsia_fs::file::read(&file_proxy).await {
314                        Ok(bytes) => Some(bytes),
315                        Err(ReadError::Open(OpenError::OpenError(e))) if e == Status::NOT_FOUND => {
316                            None
317                        }
318                        Err(e) => {
319                            bail!("failed to get value from fidl storage for {:?}: {:?}", key, e)
320                        }
321                    },
322                    Err(OpenError::OpenError(Status::NOT_FOUND)) => None,
323                    Err(e) => bail!("unable to read data on disk for {:?}: {:?}", key, e),
324                };
325                bytes.as_ref()
326            }
327        };
328
329        Ok(if cached_value.map(|c| *c != new_value).unwrap_or(true) {
330            cached_storage.current_data = Some(new_value);
331            if !self.debounce_writes {
332                // Not debouncing writes for testing, just sync immediately.
333                cached_storage
334                    .sync(&self.storage_dir)
335                    .await
336                    .with_context(|| format!("Failed to sync data for key {key:?}"))?;
337            } else {
338                typed_storage.flush_sender.unbounded_send(()).with_context(|| {
339                    format!("flush_sender failed to send flush message, associated key is {key}")
340                })?;
341            }
342            UpdateState::Updated
343        } else {
344            UpdateState::Unchanged
345        })
346    }
347
348    /// Write `new_value` to storage. The write will be persisted to disk at a set interval.
349    pub async fn write<T>(&self, new_value: T) -> Result<UpdateState, Error>
350    where
351        T: FidlStorageConvertible,
352        <T as FidlStorageConvertible>::Storable: Persistable,
353    {
354        let new_value = persist(&new_value.to_storable())?;
355        self.inner_write(T::KEY, new_value).await
356    }
357
358    async fn get_inner(&self, key: &'static str) -> MutexGuard<'_, CachedStorage> {
359        let typed_storage = self
360            .typed_storage_map
361            .get(key)
362            // TODO(https://fxbug.dev/42064613) Replace this with an error result.
363            .unwrap_or_else(|| panic!("Invalid data keyed by {key}"));
364        let mut cached_storage = typed_storage.cached_storage.lock().await;
365        if cached_storage.current_data.is_none() || !self.caching_enabled {
366            if let Some(file_proxy) = match fuchsia_fs::directory::open_file(
367                &self.storage_dir,
368                &cached_storage.file_path,
369                fuchsia_fs::PERM_READABLE,
370            )
371            .await
372            {
373                Ok(file_proxy) => Some(file_proxy),
374                Err(OpenError::OpenError(Status::NOT_FOUND)) => None,
375                // TODO(https://fxbug.dev/42064613) Replace this with an error result.
376                Err(e) => panic!("failed to open file for {key:?}: {e:?}"),
377            } {
378                let data = match fuchsia_fs::file::read(&file_proxy).await {
379                    Ok(data) => Some(data),
380                    Err(ReadError::ReadError(Status::NOT_FOUND)) => None,
381                    // TODO(https://fxbug.dev/42064613) Replace this with an error result.
382                    Err(e) => panic!("failed to get fidl data from disk for {key:?}: {e:?}"),
383                };
384
385                cached_storage.current_data = data;
386            }
387        }
388
389        cached_storage
390    }
391
392    /// Gets the latest value cached locally, or loads the value from storage.
393    /// Doesn't support multiple concurrent callers of the same struct.
394    pub async fn get<T>(&self) -> T
395    where
396        T: FidlStorageConvertible,
397        T::Storable: Persistable,
398        T::Loader: DefaultDispatcher<T>,
399    {
400        match self.get_inner(T::KEY).await.current_data.as_ref().map(|data| {
401            T::from_storable(
402                unpersist(data).expect("Should not be able to save mismatching types in file"),
403            )
404        }) {
405            Some(data) => data,
406            None => <T::Loader as DefaultDispatcher<T>>::get_default(self),
407        }
408    }
409}
410
/// Produces the value that [`FidlStorage::get`] returns when no stored data is available for
/// `T`. Sealed, so it cannot be implemented outside this crate.
pub trait DefaultDispatcher<T>: Sealed {
    /// Returns the default value for `T`, optionally consulting `storage`.
    fn get_default(_: &FidlStorage) -> T;
}
414
415impl<T> DefaultDispatcher<T> for NoneT
416where
417    T: Default,
418{
419    fn get_default(_: &FidlStorage) -> T {
420        T::default()
421    }
422}
423
424impl<T, L> DefaultDispatcher<T> for L
425where
426    T: FidlStorageConvertible<Loader = L>,
427    L: DefaultLoader<Result = T> + 'static,
428{
429    fn get_default(storage: &FidlStorage) -> T {
430        match storage.typed_loader_map.get(T::KEY) {
431            Some(loader) => match loader.downcast_ref::<T::Loader>() {
432                Some(loader) => loader.default_value(),
433                None => {
434                    panic!("Mismatch key and loader for key {}", T::KEY);
435                }
436            },
437            None => panic!("Missing loader for {}", T::KEY),
438        }
439    }
440}
441
442#[cfg(test)]
443mod tests {
444    use super::*;
445    use assert_matches::assert_matches;
446    use fasync::TestExecutor;
447    use fidl::endpoints::ControlHandle;
448    use fidl::epitaph::ChannelEpitaphExt;
449    use fidl_test_storage::{TestStruct, WrongStruct};
450    use futures::TryStreamExt;
451    use std::sync::Arc;
452    use std::task::Poll;
453    use test_case::test_case;
454    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
455
    // VALUE0 is the value of `LibTestStruct::default()`; VALUE1 and VALUE2 are the values the
    // tests store.
    const VALUE0: i32 = 3;
    const VALUE1: i32 = 33;
    const VALUE2: i32 = 128;
459
    /// Test type that is stored through [`FidlStorage`] as a `TestStruct`.
    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
    struct LibTestStruct {
        value: i32,
    }
464
465    impl FidlStorageConvertible for LibTestStruct {
466        type Storable = TestStruct;
467        type Loader = NoneT;
468        const KEY: &'static str = "testkey";
469
470        fn to_storable(self) -> Self::Storable {
471            TestStruct { value: self.value }
472        }
473
474        fn from_storable(storable: Self::Storable) -> Self {
475            Self { value: storable.value }
476        }
477    }
478
    /// The default carries `VALUE0`, which lets tests tell a default apart from a stored value.
    impl Default for LibTestStruct {
        fn default() -> Self {
            Self { value: VALUE0 }
        }
    }
484
485    fn open_tempdir(tempdir: &tempfile::TempDir) -> fio::DirectoryProxy {
486        fuchsia_fs::directory::open_in_namespace(
487            tempdir.path().to_str().expect("tempdir path is not valid UTF-8"),
488            fuchsia_fs::PERM_READABLE | fuchsia_fs::PERM_WRITABLE,
489        )
490        .expect("failed to open connection to tempdir")
491    }
492
493    #[fuchsia::test]
494    async fn test_get() {
495        let value_to_get = LibTestStruct { value: VALUE1 };
496        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
497        let content = persist(&value_to_get.to_storable()).unwrap();
498        std::fs::write(tempdir.path().join("xyz.pfidl"), content).expect("failed to write file");
499        let storage_dir = open_tempdir(&tempdir);
500
501        let (storage, sync_tasks) = FidlStorage::with_file_proxy(
502            vec![(LibTestStruct::KEY, None)],
503            storage_dir,
504            move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
505        )
506        .await
507        .expect("should be able to generate file");
508        for task in sync_tasks {
509            task.detach();
510        }
511        let result = storage.get::<LibTestStruct>().await;
512
513        assert_eq!(result.value, VALUE1);
514    }
515
516    #[fuchsia::test]
517    async fn test_get_default() {
518        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
519        let storage_dir = open_tempdir(&tempdir);
520
521        let (storage, sync_tasks) = FidlStorage::with_file_proxy(
522            vec![(LibTestStruct::KEY, None)],
523            storage_dir,
524            move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
525        )
526        .await
527        .expect("file proxy should be created");
528        for task in sync_tasks {
529            task.detach();
530        }
531        let result = storage.get::<LibTestStruct>().await;
532
533        assert_eq!(result.value, VALUE0);
534    }
535
    /// Proxies directory request to a real directory while allowing for some of the requests to be
    /// intercepted.
    struct DirectoryInterceptor {
        /// The directory that non-intercepted requests are forwarded to.
        real_dir: fio::DirectoryProxy,
        /// Hooks that tests can install or replace while requests are being served.
        inner: std::sync::Mutex<DirectoryInterceptorInner>,
    }
542
    /// Mutable hooks of a [`DirectoryInterceptor`].
    struct DirectoryInterceptorInner {
        /// When set, receives a message after each forwarded Sync request has been answered.
        sync_notifier: Option<futures::channel::mpsc::UnboundedSender<()>>,
        /// Called for every open request with the path and whether the request asked for
        /// creation. Returning `Some(status)` fails the request with that status instead of
        /// forwarding it.
        #[allow(clippy::type_complexity)]
        open_interceptor: Box<dyn Fn(&str, bool) -> Option<Status>>,
    }
548
549    impl DirectoryInterceptor {
550        // TODO(b/356474618): re-enable and fix existing occurance
551        #[allow(clippy::arc_with_non_send_sync)]
552        fn new(real_dir: fio::DirectoryProxy) -> (Arc<Self>, fio::DirectoryProxy) {
553            let (proxy, requests) =
554                fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>();
555            let this = Arc::new(Self {
556                real_dir,
557                inner: std::sync::Mutex::new(DirectoryInterceptorInner {
558                    sync_notifier: None,
559                    open_interceptor: Box::new(|_, _| None),
560                }),
561            });
562            fasync::Task::local(this.clone().run(requests)).detach();
563            (this.clone(), proxy)
564        }
565
566        /// Returns a receiver that will be notified after each Sync request to the real directory
567        /// has completed.
568        fn install_sync_notifier(&self) -> futures::channel::mpsc::UnboundedReceiver<()> {
569            let (sender, receiver) = futures::channel::mpsc::unbounded();
570            self.inner.lock().unwrap().sync_notifier = Some(sender);
571            receiver
572        }
573
        /// Sets a callback to be called on every Open request, replacing the previous one. The
        /// callback receives the requested path and whether the request asked for the file to be
        /// created. If the callback returns `Some(status)`, then the request will be failed with
        /// that status instead of being forwarded to the real directory.
        #[allow(clippy::type_complexity)]
        fn set_open_interceptor(&self, interceptor: Box<dyn Fn(&str, bool) -> Option<Status>>) {
            self.inner.lock().unwrap().open_interceptor = interceptor;
        }
581
        /// Serves `requests` until the stream ends or yields an error.
        ///
        /// Open requests are first offered to the open interceptor; Sync, Rename and GetToken
        /// are forwarded to the real directory and their responses relayed. Any other request
        /// hits `unimplemented!`.
        async fn run(self: Arc<Self>, mut requests: fio::DirectoryRequestStream) {
            while let Ok(Some(request)) = requests.try_next().await {
                match request {
                    fio::DirectoryRequest::DeprecatedOpen {
                        flags,
                        mode,
                        path,
                        object,
                        control_handle: _,
                    } => {
                        match (self.inner.lock().unwrap().open_interceptor)(
                            &path,
                            flags.intersects(fio::OpenFlags::CREATE),
                        ) {
                            // Intercepted: report the status via OnOpen and close the channel
                            // with the same status as epitaph.
                            Some(status) => {
                                let (_, control_handle) = object.into_stream_and_control_handle();
                                control_handle
                                    .send_on_open_(status.into_raw(), None)
                                    .expect("failed to send OnOpen event");
                                control_handle.shutdown_with_epitaph(status);
                            }
                            None => {
                                self.real_dir
                                    .deprecated_open(flags, mode, &path, object)
                                    .expect("failed to forward Open request");
                            }
                        }
                    }
                    fio::DirectoryRequest::Open {
                        path,
                        flags,
                        options,
                        object,
                        control_handle: _,
                    } => {
                        // Only FLAG_MUST_CREATE is reported to the interceptor as a create.
                        let create = flags.intersects(fio::Flags::FLAG_MUST_CREATE);
                        match (self.inner.lock().unwrap().open_interceptor)(&path, create) {
                            Some(status) => {
                                object.close_with_epitaph(status).expect("failed to send epitaph");
                            }
                            None => {
                                self.real_dir
                                    .open(&path, flags, &options, object)
                                    .expect("failed to forward Open3 request");
                            }
                        }
                    }
                    fio::DirectoryRequest::Sync { responder } => {
                        let response =
                            self.real_dir.sync().await.expect("failed to forward Sync request");
                        responder.send(response).expect("failed to respond to Sync request");
                        // Notify only after the response has been sent to the requester.
                        if let Some(sender) = &self.inner.lock().unwrap().sync_notifier {
                            sender.unbounded_send(()).unwrap();
                        }
                    }
                    fio::DirectoryRequest::Rename { src, dst_parent_token, dst, responder } => {
                        let response = self
                            .real_dir
                            .rename(&src, dst_parent_token, &dst)
                            .await
                            .expect("failed to forward Rename request");
                        responder.send(response).expect("failed to respond to Rename request");
                    }
                    fio::DirectoryRequest::GetToken { responder } => {
                        let response = self
                            .real_dir
                            .get_token()
                            .await
                            .expect("failed to forward GetToken request");
                        responder
                            .send(response.0, response.1)
                            .expect("failed to respond to GetToken request");
                    }
                    // Requests the tests are not expected to issue.
                    request => unimplemented!("request: {:?}", request),
                }
            }
        }
659    }
660
661    /// Repeatedly polls `fut` until it returns `Poll::Ready`. When using a `TestExecutor` with fake
662    /// time, only `run_until_stalled` can be used but `run_until_stalled` is incompatible with
663    /// external filesystems. This function bridges the gap by continuously polling the future until
664    /// the filesystem responds.
665    fn run_until_ready<F>(executor: &mut TestExecutor, fut: F) -> F::Output
666    where
667        F: std::future::Future,
668    {
669        let mut fut = std::pin::pin!(fut);
670        loop {
671            match executor.run_until_stalled(&mut fut) {
672                Poll::Ready(result) => return result,
673                Poll::Pending => std::thread::yield_now(),
674            }
675        }
676    }
677
678    /// Asserts that a file doesn't exist.
679    fn assert_file_not_found(
680        executor: &mut TestExecutor,
681        directory: &fio::DirectoryProxy,
682        file_name: &str,
683    ) {
684        let open_fut =
685            fuchsia_fs::directory::open_file(directory, file_name, fuchsia_fs::PERM_READABLE);
686        let result = run_until_ready(executor, open_fut);
687        assert_matches!(result, Result::Err(e) if e.is_not_found_error());
688    }
689
690    /// Verifies the contents of a file.
691    fn assert_file_contents(
692        executor: &mut TestExecutor,
693        directory: &fio::DirectoryProxy,
694        file_name: &str,
695        expected_contents: TestStruct,
696    ) {
697        let read_fut = fuchsia_fs::directory::read_file(directory, file_name);
698        let data = run_until_ready(executor, read_fut).expect("reading file");
699        let data = fidl::unpersist::<TestStruct>(&data).expect("failed to read file as TestStruct");
700        assert_eq!(data, expected_contents);
701    }
702
    /// The first write after creation reaches disk without the fake clock being advanced.
    #[fuchsia::test]
    fn test_first_write_syncs_immediately() {
        let written_value = VALUE1;
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        // Route all directory traffic through the interceptor so the test can observe when the
        // directory has been synced.
        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
        let storage_dir = open_tempdir(&tempdir);
        let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
        let mut sync_receiver = interceptor.install_sync_notifier();

        let storage_fut = FidlStorage::with_file_proxy(
            vec![(LibTestStruct::KEY, None)],
            Clone::clone(&storage_dir),
            move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
        );
        futures::pin_mut!(storage_fut);

        // `_sync_tasks` is kept bound (not detached) for the rest of the test.
        let (storage, _sync_tasks) =
            if let Poll::Ready(storage) = executor.run_until_stalled(&mut storage_fut) {
                storage.expect("file proxy should be created")
            } else {
                panic!("storage creation stalled");
            };

        // Write to device storage.
        let value_to_write = LibTestStruct { value: written_value };
        let write_future = storage.write(value_to_write);
        futures::pin_mut!(write_future);

        // Initial cache check is done if no read was ever performed.
        assert_matches!(
            run_until_ready(&mut executor, &mut write_future),
            Result::Ok(UpdateState::Updated)
        );

        // Storage is not yet ready.
        assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");

        // Wait for the sync task to complete.
        run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");

        // Validate the value matches what was set.
        assert_file_contents(
            &mut executor,
            &storage_dir,
            "xyz.pfidl",
            value_to_write.to_storable(),
        );
    }
753
754    #[fuchsia::test]
755    fn test_second_write_syncs_after_interval() {
756        let written_value = VALUE1;
757        let second_value = VALUE2;
758        let mut executor = TestExecutor::new_with_fake_time();
759        executor.set_fake_time(MonotonicInstant::from_nanos(0));
760
761        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
762        let storage_dir = open_tempdir(&tempdir);
763        let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
764        let mut sync_receiver = interceptor.install_sync_notifier();
765
766        let storage_fut = FidlStorage::with_file_proxy(
767            vec![(LibTestStruct::KEY, None)],
768            Clone::clone(&storage_dir),
769            move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
770        );
771        futures::pin_mut!(storage_fut);
772
773        let (storage, _sync_tasks) =
774            if let Poll::Ready(storage) = executor.run_until_stalled(&mut storage_fut) {
775                storage.expect("file proxy should be created")
776            } else {
777                panic!("storage creation stalled");
778            };
779
780        // Write to device storage.
781        let value_to_write = LibTestStruct { value: written_value };
782        let write_future = storage.write(value_to_write);
783        futures::pin_mut!(write_future);
784
785        // Initial cache check is done if no read was ever performed.
786        assert_matches!(
787            run_until_ready(&mut executor, &mut write_future),
788            Result::Ok(UpdateState::Updated)
789        );
790
791        // Storage is not yet ready.
792        assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");
793
794        // Wait for the sync task to complete.
795        run_until_ready(&mut executor, &mut sync_receiver.next()).expect("directory never synced");
796
797        // Validate that the file has been synced.
798        assert_file_contents(
799            &mut executor,
800            &storage_dir,
801            "xyz.pfidl",
802            value_to_write.to_storable(),
803        );
804
805        // Write second time to device storage.
806        let value_to_write2 = LibTestStruct { value: second_value };
807        let write_future = storage.write(value_to_write2);
808        futures::pin_mut!(write_future);
809
810        // Initial cache check is done if no read was ever performed.
811        assert_matches!(
812            run_until_ready(&mut executor, &mut write_future),
813            Result::Ok(UpdateState::Updated)
814        );
815
816        // Storage is not yet ready, should still equal old value.
817        assert_file_contents(
818            &mut executor,
819            &storage_dir,
820            "xyz.pfidl",
821            value_to_write.to_storable(),
822        );
823
824        // Move executor to just before sync interval.
825        executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000 - 1));
826        assert!(!executor.wake_expired_timers());
827
828        // Move executor to just after sync interval. It should run now.
829        executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000));
830        run_until_ready(&mut executor, &mut sync_receiver.next()).expect("directory never synced");
831
832        // Validate that the file has been synced.
833
834        assert_file_contents(
835            &mut executor,
836            &storage_dir,
837            "xyz.pfidl",
838            value_to_write2.to_storable(),
839        );
840    }
841
842    #[derive(Copy, Clone, Default, Debug)]
843    struct LibWrongStruct;
844
845    impl FidlStorageConvertible for LibWrongStruct {
846        type Storable = WrongStruct;
847        type Loader = NoneT;
848        const KEY: &'static str = "WRONG_STRUCT";
849
850        fn to_storable(self) -> Self::Storable {
851            WrongStruct
852        }
853
854        fn from_storable(_: Self::Storable) -> Self {
855            LibWrongStruct
856        }
857    }
858
859    // Test that attempting to write two kinds of structs to a storage instance that only supports
860    // one results in a failure.
861    #[fuchsia::test]
862    async fn test_write_with_mismatch_type_returns_error() {
863        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
864        let storage_dir = open_tempdir(&tempdir);
865
866        let (storage, sync_tasks) = FidlStorage::with_file_proxy(
867            vec![(LibTestStruct::KEY, None)],
868            storage_dir,
869            move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
870        )
871        .await
872        .expect("file proxy should be created");
873        for task in sync_tasks {
874            task.detach();
875        }
876
877        // Write successfully to storage once.
878        let result = storage.write(LibTestStruct { value: VALUE2 }).await;
879        assert!(result.is_ok());
880
881        // Write to device storage again with a different type to validate that the type can't
882        // be changed.
883        let result = storage.write(LibWrongStruct).await;
884        assert_matches!(result, Err(e) if e.to_string() == "Invalid data keyed by WRONG_STRUCT");
885    }
886
887    // Test that multiple writes to FidlStorage will cause a write each time, but will only
888    // sync to the fs at an interval.
889    #[fuchsia::test]
890    fn test_multiple_write_debounce() {
891        // Custom executor for this test so that we can advance the clock arbitrarily and verify the
892        // state of the executor at any given point.
893        let mut executor = TestExecutor::new_with_fake_time();
894        executor.set_fake_time(MonotonicInstant::from_nanos(0));
895
896        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
897        let storage_dir = open_tempdir(&tempdir);
898        let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
899        let mut sync_receiver = interceptor.install_sync_notifier();
900
901        let storage_fut = FidlStorage::with_file_proxy(
902            vec![(LibTestStruct::KEY, None)],
903            Clone::clone(&storage_dir),
904            move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
905        );
906        let (storage, _sync_tasks) =
907            run_until_ready(&mut executor, storage_fut).expect("file proxy should be created");
908
909        let first_value = VALUE1;
910        let second_value = VALUE2;
911        let third_value = VALUE0;
912
913        // First write finishes immediately.
914        let value_to_write = LibTestStruct { value: first_value };
915        // Initial cache check is done if no read was ever performed.
916        let result = run_until_ready(&mut executor, storage.write(value_to_write));
917        assert_matches!(result, Result::Ok(UpdateState::Updated));
918
919        // Storage is not yet ready.
920        assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");
921
922        // Wake the initial time without advancing the clock. Confirms that the first write is
923        // "immediate".
924        run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
925
926        // Validate that the file has been synced.
927        assert_file_contents(
928            &mut executor,
929            &storage_dir,
930            "xyz.pfidl",
931            value_to_write.to_storable(),
932        );
933
934        // Write second time to device storage.
935        let value_to_write2 = LibTestStruct { value: second_value };
936        let result = run_until_ready(&mut executor, storage.write(value_to_write2));
937        // Value is marked as updated after the write.
938        assert_matches!(result, Result::Ok(UpdateState::Updated));
939
940        // Validate the updated values are still returned from the storage cache.
941        let data = run_until_ready(&mut executor, storage.get::<LibTestStruct>());
942        assert_eq!(data, value_to_write2);
943
944        // But the data has not been persisted to disk.
945        assert_file_contents(
946            &mut executor,
947            &storage_dir,
948            "xyz.pfidl",
949            value_to_write.to_storable(),
950        );
951
952        // Now write a third time before advancing the clock.
953        let value_to_write3 = LibTestStruct { value: third_value };
954        let result = run_until_ready(&mut executor, storage.write(value_to_write3));
955        // Value is marked as updated after the write.
956        assert_matches!(result, Result::Ok(UpdateState::Updated));
957
958        // Validate the updated values are still returned from the storage cache.
959
960        let data = run_until_ready(&mut executor, storage.get::<LibTestStruct>());
961        assert_eq!(data, value_to_write3);
962
963        // But the data has still not been persisted to disk.
964        assert_file_contents(
965            &mut executor,
966            &storage_dir,
967            "xyz.pfidl",
968            value_to_write.to_storable(),
969        );
970
971        // Move clock to just before sync interval.
972        executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000 - 1));
973        assert!(!executor.wake_expired_timers());
974
975        // And validate that the data has still not been synced to disk.
976        assert_file_contents(
977            &mut executor,
978            &storage_dir,
979            "xyz.pfidl",
980            value_to_write.to_storable(),
981        );
982
983        // Move executor to just after sync interval.
984        executor.set_fake_time(MonotonicInstant::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000));
985        run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
986
987        // Validate that the file has finally been synced.
988        assert_file_contents(
989            &mut executor,
990            &storage_dir,
991            "xyz.pfidl",
992            value_to_write3.to_storable(),
993        );
994    }
995
996    // Tests that syncing can recover after a failed write. The test cases list the number of failed
997    // attempts and the maximum amount of time waited from the previous write.
998    #[allow(clippy::unused_unit)]
999    #[test_case(1, 500)]
1000    #[test_case(2, 1_000)]
1001    #[test_case(3, 2_000)]
1002    #[test_case(4, 4_000)]
1003    #[test_case(5, 8_000)]
1004    #[test_case(6, 16_000)]
1005    #[test_case(7, 32_000)]
1006    #[test_case(8, 64_000)]
1007    #[test_case(9, 128_000)]
1008    #[test_case(10, 256_000)]
1009    #[test_case(11, 512_000)]
1010    #[test_case(12, 1_024_000)]
1011    #[test_case(13, 1_800_000)]
1012    #[test_case(14, 1_800_000)]
1013    fn test_exponential_backoff(retry_count: usize, max_wait_time: usize) {
1014        let mut executor = TestExecutor::new_with_fake_time();
1015        executor.set_fake_time(MonotonicInstant::from_nanos(0));
1016
1017        let tempdir = tempfile::tempdir().expect("failed to create tempdir");
1018        let storage_dir = open_tempdir(&tempdir);
1019        let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
1020        let attempts = std::sync::Mutex::new(0);
1021        interceptor.set_open_interceptor(Box::new(move |path, create| {
1022            let mut attempts_guard = attempts.lock().unwrap();
1023            if path == "abc_tmp.pfidl" && create && *attempts_guard < retry_count {
1024                *attempts_guard += 1;
1025                Some(Status::NO_SPACE)
1026            } else {
1027                None
1028            }
1029        }));
1030        let mut sync_receiver = interceptor.install_sync_notifier();
1031
1032        let expected_data = vec![1];
1033        let cached_storage = Rc::new(Mutex::new(CachedStorage {
1034            current_data: Some(expected_data.clone()),
1035            temp_file_path: "abc_tmp.pfidl".to_owned(),
1036            file_path: "abc.pfidl".to_owned(),
1037        }));
1038
1039        let (sender, receiver) = futures::channel::mpsc::unbounded();
1040
1041        // Call spawn in a future since we have to be in an executor context to call spawn.
1042        let task = fasync::Task::local(FidlStorage::synchronize_task(
1043            Clone::clone(&storage_dir),
1044            Rc::clone(&cached_storage),
1045            receiver,
1046        ));
1047        futures::pin_mut!(task);
1048
1049        executor.set_fake_time(MonotonicInstant::from_nanos(0));
1050        sender.unbounded_send(()).expect("can send flush signal");
1051        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1052
1053        let mut clock_nanos = 0;
1054        // (2^i) * 500 = exponential backoff.
1055        // 1,000,000 = convert ms to ns.
1056        for new_duration in (0..retry_count).map(|i| {
1057            (2_i64.pow(i as u32) * MIN_FLUSH_INTERVAL_MS).min(max_wait_time as i64) * 1_000_000
1058                - (i == retry_count - 1) as i64
1059        }) {
1060            executor.set_fake_time(MonotonicInstant::from_nanos(clock_nanos));
1061            // Task should not complete while retrying.
1062            assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1063
1064            // Check that files don't exist.
1065            assert_file_not_found(&mut executor, &storage_dir, "abc_tmp.pfidl");
1066            assert_file_not_found(&mut executor, &storage_dir, "abc.pfidl");
1067
1068            clock_nanos += new_duration;
1069        }
1070
1071        executor.set_fake_time(MonotonicInstant::from_nanos(clock_nanos));
1072        // At this point the clock should be 1ns before the timer, so it shouldn't wake.
1073        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1074
1075        // Check that files don't exist.
1076        assert_file_not_found(&mut executor, &storage_dir, "abc_tmp.pfidl");
1077        assert_file_not_found(&mut executor, &storage_dir, "abc.pfidl");
1078
1079        // Now pass the timer where we can read the result.
1080        clock_nanos += 1;
1081        executor.set_fake_time(MonotonicInstant::from_nanos(clock_nanos));
1082        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1083        run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
1084
1085        // Check that the file now matches what was in the cache.
1086        let read_fut = fuchsia_fs::directory::read_file(&storage_dir, "abc.pfidl");
1087        let data = run_until_ready(&mut executor, read_fut).expect("reading file");
1088        assert_eq!(data, expected_data);
1089
1090        drop(sender);
1091        // Ensure the task can properly exit.
1092        run_until_ready(&mut executor, task);
1093    }
1094}