use crate::private::Sealed;
use crate::storage_factory::{DefaultLoader, NoneT};
use crate::UpdateState;
use anyhow::{bail, format_err, Context, Error};
use fidl::{persist, unpersist, Persistable, Status};
use fidl_fuchsia_io::DirectoryProxy;
use fuchsia_async::{Task, Time, Timer};
use fuchsia_fs::file::ReadError;
use fuchsia_fs::node::OpenError;
use fuchsia_fs::OpenFlags;
use fuchsia_zircon as zx;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::future::OptionFuture;
use futures::lock::{Mutex, MutexGuard};
use futures::{FutureExt, StreamExt};
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use zx::Duration;
// Shortest allowed gap, in milliseconds, between two flushes of the same file. It is also the
// base delay that the sync task's retry backoff is multiplied from.
const MIN_FLUSH_INTERVAL_MS: i64 = 500;
// `MAX_FLUSH_INTERVAL_MS` caps the retry backoff delay (1_800_000 ms is 30 minutes).
// `MIN_FLUSH_DURATION` is `MIN_FLUSH_INTERVAL_MS` expressed as a `Duration`.
const MAX_FLUSH_INTERVAL_MS: i64 = 1_800_000; const MIN_FLUSH_DURATION: Duration = Duration::from_millis(MIN_FLUSH_INTERVAL_MS);
/// Implemented by types that can be saved to and loaded from [`FidlStorage`] by converting to
/// and from a persistable FIDL type.
pub trait FidlStorageConvertible {
/// The persistable type that is actually serialized with `persist` and read back with
/// `unpersist`.
type Storable: Persistable + Any;
/// Dispatcher that `FidlStorage::get` uses to produce a default value when no data is stored.
type Loader: DefaultDispatcher<Self>
where
Self: Sized;
/// Key that identifies this type's entry in the storage and loader maps.
const KEY: &'static str;
/// Converts `self` into its storable representation.
fn to_storable(self) -> Self::Storable;
/// Rebuilds a value from its storable representation.
fn from_storable(storable: Self::Storable) -> Self;
}
/// Key-addressed storage that persists serialized FIDL values as files inside `storage_dir`.
pub struct FidlStorage {
// Per-key state: the in-memory cache and the channel used to request a flush.
typed_storage_map: HashMap<&'static str, TypedStorage>,
// Per-key, type-erased default loaders; downcast to the concrete loader type when a default
// value is requested.
typed_loader_map: HashMap<&'static str, Box<dyn Any + Send + Sync + 'static>>,
// When false, every read re-reads the file from `storage_dir` instead of trusting the cache.
caching_enabled: bool,
// When true, writes only signal the background sync task; when false, writes sync inline.
debounce_writes: bool,
// Directory in which the backing files are read and written.
storage_dir: DirectoryProxy,
}
/// Storage state for a single key.
struct TypedStorage {
// Sending `()` asks the key's `synchronize_task` to schedule a flush to disk.
flush_sender: UnboundedSender<()>,
// Cache shared between `FidlStorage` and the key's `synchronize_task`.
cached_storage: Arc<Mutex<CachedStorage>>,
}
/// In-memory copy of one key's serialized data plus the paths used to persist it.
struct CachedStorage {
// Serialized bytes most recently written or read; `None` until something is written or loaded.
current_data: Option<Vec<u8>>,
// Path the data is written to first, before being renamed onto `file_path`.
temp_file_path: String,
// Path of the permanent file, relative to the storage directory.
file_path: String,
}
impl CachedStorage {
    /// Persists `current_data` to `file_path` inside `storage_dir`.
    ///
    /// The bytes are written to `temp_file_path`, the temp file is closed and renamed onto
    /// `file_path`, and finally the directory itself is synced.
    ///
    /// # Errors
    ///
    /// Returns an error if there is no cached data to write, or if opening, writing, closing,
    /// renaming or syncing fails. A `NOT_SUPPORTED` status from the directory sync is treated
    /// as success.
    async fn sync(&mut self, storage_dir: &DirectoryProxy) -> Result<(), Error> {
        // Check for data before touching the disk so that a missing cache entry is reported as
        // an error instead of panicking after the temp file has already been truncated.
        let data = self
            .current_data
            .as_ref()
            .with_context(|| format!("no cached data to write for {:?}", self.file_path))?;

        // The inner scope limits the lifetime of the file proxy to the write itself, so it is
        // dropped before the rename below.
        {
            let file_proxy = fuchsia_fs::directory::open_file_deprecated(
                storage_dir,
                &self.temp_file_path,
                OpenFlags::CREATE
                    | OpenFlags::TRUNCATE
                    | OpenFlags::RIGHT_READABLE
                    | OpenFlags::RIGHT_WRITABLE,
            )
            .await
            .with_context(|| format!("unable to open {:?} for writing", self.temp_file_path))?;
            fuchsia_fs::file::write(&file_proxy, data)
                .await
                .context("failed to write data to file")?;
            file_proxy
                .close()
                .await
                .context("failed to call close on temp file")?
                .map_err(zx::Status::from_raw)?;
        }

        fuchsia_fs::directory::rename(storage_dir, &self.temp_file_path, &self.file_path)
            .await
            .context("failed to rename temp file to permanent file")?;

        storage_dir
            .sync()
            .await
            .context("failed to call sync on directory after rename")?
            .map_err(zx::Status::from_raw)
            // A directory that does not support sync is not treated as a failure.
            .or_else(|e| if let zx::Status::NOT_SUPPORTED = e { Ok(()) } else { Err(e) })
            .context("failed to sync rename to directory")
    }
}
impl FidlStorage {
/// Builds a [`FidlStorage`] backed by `storage_dir` and spawns one background sync task per key.
///
/// * `iter` yields each storage key together with an optional, type-erased default loader.
/// * `storage_dir` is the directory that holds the persisted files.
/// * `files_generator` maps a key to its `(temp_file_path, file_path)` pair.
///
/// Returns the storage together with the spawned sync tasks, one per key, in iteration order.
///
/// # Errors
///
/// Fails if `files_generator` returns an error for any key.
pub(crate) async fn with_file_proxy<I, G>(
    iter: I,
    storage_dir: DirectoryProxy,
    files_generator: G,
) -> Result<(Self, Vec<Task<()>>), Error>
where
    I: IntoIterator<Item = (&'static str, Option<Box<dyn Any + Send + Sync>>)>,
    G: Fn(&'static str) -> Result<(String, String), Error>,
{
    let entries = iter.into_iter();
    // The iterator's lower size bound is used to pre-size the per-key collections.
    let (expected_len, _) = entries.size_hint();
    let mut typed_storage_map = HashMap::with_capacity(expected_len);
    let mut typed_loader_map = HashMap::new();
    let mut sync_tasks = Vec::with_capacity(expected_len);

    for (key, loader) in entries {
        let (flush_sender, flush_receiver) = futures::channel::mpsc::unbounded::<()>();
        let (temp_file_path, file_path) =
            files_generator(key).context("failed to generate file")?;
        let shared_cache =
            Arc::new(Mutex::new(CachedStorage { current_data: None, temp_file_path, file_path }));

        // NOTE(review): `Clone::clone(&storage_dir)` is kept in its explicit form; presumably
        // `storage_dir.clone()` would resolve to a FIDL method on the proxy instead — confirm
        // before simplifying.
        sync_tasks.push(Task::spawn(Self::synchronize_task(
            Clone::clone(&storage_dir),
            Arc::clone(&shared_cache),
            flush_receiver,
        )));

        let _ = typed_storage_map
            .insert(key, TypedStorage { flush_sender, cached_storage: shared_cache });
        if let Some(loader) = loader {
            let _ = typed_loader_map.insert(key, loader);
        }
    }

    let storage = FidlStorage {
        typed_storage_map,
        typed_loader_map,
        caching_enabled: true,
        debounce_writes: true,
        storage_dir,
    };
    Ok((storage, sync_tasks))
}
/// Background loop that flushes one key's `cached_storage` to `storage_dir`.
///
/// Each message on `flush_receiver` schedules a flush no sooner than `MIN_FLUSH_DURATION` after
/// the previous successful flush. A failed flush is retried with an exponentially growing delay
/// (`2^retries * MIN_FLUSH_INTERVAL_MS`, capped at `MAX_FLUSH_INTERVAL_MS`). The loop ends once
/// the receiver stream has finished and no timer is outstanding.
async fn synchronize_task(
storage_dir: DirectoryProxy,
cached_storage: Arc<Mutex<CachedStorage>>,
flush_receiver: UnboundedReceiver<()>,
) {
// True while a flush has been requested but not yet written successfully.
let mut has_pending_flush = false;
// Back-dated by one interval so the first flush request is not delayed.
let mut last_flush: Time = Time::now() - MIN_FLUSH_DURATION;
// No timer is armed until the first flush request arrives.
let mut next_flush_timer: OptionFuture<Timer> = None.into();
let mut next_flush_timer_fuse = next_flush_timer.fuse();
// Number of consecutive failed flush attempts; drives the backoff exponent.
let mut retries = 0;
// True between a failed flush and the next successful one.
let mut retrying = false;
let flush_fuse = flush_receiver.fuse();
futures::pin_mut!(flush_fuse);
loop {
futures::select! {
_ = flush_fuse.select_next_some() => {
// While retrying, the retry timer is already armed; leave it untouched.
if retrying {
continue;
}
let now = Time::now();
// Flush right away if the minimum interval has elapsed, otherwise wait out the rest.
let next_flush_time = if now - last_flush > MIN_FLUSH_DURATION {
now
} else {
last_flush + MIN_FLUSH_DURATION
};
has_pending_flush = true;
next_flush_timer = Some(Timer::new(next_flush_time)).into();
next_flush_timer_fuse = next_flush_timer.fuse();
}
_ = next_flush_timer_fuse => {
if has_pending_flush {
let mut cached_storage = cached_storage.lock().await;
if let Err(e) = cached_storage.sync(&storage_dir).await {
retrying = true;
// Exponential backoff: double the delay per failed attempt, up to the cap.
let flush_duration = Duration::from_millis(
2_i64.saturating_pow(retries)
.saturating_mul(MIN_FLUSH_INTERVAL_MS)
.min(MAX_FLUSH_INTERVAL_MS)
);
let next_flush_time = Time::now() + flush_duration;
tracing::error!(
"Failed to sync write to disk for {:?}, delaying by {:?}, \
caused by: {:?}",
cached_storage.file_path,
flush_duration,
e
);
// Re-arm the timer for the retry; `has_pending_flush` stays set.
next_flush_timer = Some(Timer::new(next_flush_time)).into();
next_flush_timer_fuse = next_flush_timer.fuse();
retries += 1;
continue;
}
// Successful flush: record the time and reset the retry state.
last_flush = Time::now();
has_pending_flush = false;
retrying = false;
retries = 0;
}
}
// All futures have terminated: the sender side is gone and no timer remains.
complete => break,
}
}
}
/// Test-only switch for `caching_enabled`; when disabled, reads go back to disk every time.
#[cfg(test)]
#[allow(dead_code)]
fn set_caching_enabled(&mut self, enabled: bool) {
self.caching_enabled = enabled;
}
/// Test-only switch for `debounce_writes`; when disabled, writes sync to disk inline.
#[cfg(test)]
#[allow(dead_code)]
fn set_debounce_writes(&mut self, debounce: bool) {
self.debounce_writes = debounce;
}
/// Stores the already-serialized `new_value` under `key`.
///
/// The new bytes are compared with the cached bytes, or with the bytes on disk when nothing is
/// cached (a missing file counts as "no previous value"). If they are equal nothing happens
/// and `UpdateState::Unchanged` is returned. Otherwise the cache is replaced and the data is
/// either synced inline or handed to the background sync task, depending on `debounce_writes`,
/// and `UpdateState::Updated` is returned.
///
/// # Errors
///
/// Fails if `key` is not registered, if the existing file cannot be opened or read for a
/// reason other than `NOT_FOUND`, if the inline sync fails, or if the flush signal cannot be
/// sent.
async fn inner_write(
    &self,
    key: &'static str,
    new_value: Vec<u8>,
) -> Result<UpdateState, Error> {
    let typed_storage = self
        .typed_storage_map
        .get(key)
        .ok_or_else(|| format_err!("Invalid data keyed by {}", key))?;
    let mut cached_storage = typed_storage.cached_storage.lock().await;

    let is_changed = if let Some(cached) = cached_storage.current_data.as_ref() {
        *cached != new_value
    } else {
        // Nothing cached yet: compare against whatever is currently on disk.
        let on_disk = match fuchsia_fs::directory::open_file_deprecated(
            &self.storage_dir,
            &cached_storage.file_path,
            OpenFlags::RIGHT_READABLE,
        )
        .await
        {
            Ok(file_proxy) => match fuchsia_fs::file::read(&file_proxy).await {
                Ok(contents) => Some(contents),
                Err(ReadError::Open(OpenError::OpenError(e))) if e == Status::NOT_FOUND => None,
                Err(e) => {
                    bail!("failed to get value from fidl storage for {:?}: {:?}", key, e)
                }
            },
            Err(OpenError::OpenError(Status::NOT_FOUND)) => None,
            Err(e) => bail!("unable to read data on disk for {:?}: {:?}", key, e),
        };
        on_disk.map_or(true, |contents| contents != new_value)
    };

    if !is_changed {
        return Ok(UpdateState::Unchanged);
    }

    cached_storage.current_data = Some(new_value);
    if self.debounce_writes {
        typed_storage.flush_sender.unbounded_send(()).with_context(|| {
            format!("flush_sender failed to send flush message, associated key is {key}")
        })?;
    } else {
        cached_storage
            .sync(&self.storage_dir)
            .await
            .with_context(|| format!("Failed to sync data for key {key:?}"))?;
    }
    Ok(UpdateState::Updated)
}
/// Serializes `new_value` through its storable form and stores it under `T::KEY`.
///
/// Returns whether the stored value actually changed.
///
/// # Errors
///
/// Fails if the value cannot be persisted or if the underlying write fails.
pub async fn write<T>(&self, new_value: T) -> Result<UpdateState, Error>
where
    T: FidlStorageConvertible,
{
    let serialized = persist(&T::to_storable(new_value))?;
    self.inner_write(T::KEY, serialized).await
}
/// Locks the cache for `key`, loading it from disk first when the cache is empty or caching is
/// disabled, and returns the held lock guard.
///
/// A missing file leaves the cache untouched.
///
/// # Panics
///
/// Panics if `key` is not registered, or if the file cannot be opened or read for a reason
/// other than `NOT_FOUND`.
async fn get_inner(&self, key: &'static str) -> MutexGuard<'_, CachedStorage> {
    let typed_storage = match self.typed_storage_map.get(key) {
        Some(typed_storage) => typed_storage,
        None => panic!("Invalid data keyed by {key}"),
    };
    let mut guard = typed_storage.cached_storage.lock().await;

    let cache_is_usable = self.caching_enabled && guard.current_data.is_some();
    if !cache_is_usable {
        let open_result = fuchsia_fs::directory::open_file_deprecated(
            &self.storage_dir,
            &guard.file_path,
            OpenFlags::RIGHT_READABLE,
        )
        .await;
        match open_result {
            Ok(file_proxy) => {
                guard.current_data = match fuchsia_fs::file::read(&file_proxy).await {
                    Ok(data) => Some(data),
                    Err(ReadError::ReadError(Status::NOT_FOUND)) => None,
                    Err(e) => panic!("failed to get fidl data from disk for {key:?}: {e:?}"),
                };
            }
            // No file on disk yet: keep whatever the cache currently holds.
            Err(OpenError::OpenError(Status::NOT_FOUND)) => {}
            Err(e) => panic!("failed to open file for {key:?}: {e:?}"),
        }
    }
    guard
}
/// Returns the value stored for `T`, or the default from `T::Loader` when nothing is stored.
///
/// # Panics
///
/// Panics if the stored bytes cannot be decoded as `T::Storable`, and in every case in which
/// `get_inner` panics.
pub async fn get<T>(&self) -> T
where
    T: FidlStorageConvertible,
{
    let guard = self.get_inner(T::KEY).await;
    match guard.current_data.as_ref() {
        Some(data) => {
            let storable: T::Storable =
                unpersist(data).expect("Should not be able to save mismatching types in file");
            T::from_storable(storable)
        }
        None => <T::Loader as DefaultDispatcher<T>>::get_default(self),
    }
}
}
/// Supplies the value that `FidlStorage::get` returns for `T` when no data is stored.
///
/// Implementors must also implement `Sealed`.
pub trait DefaultDispatcher<T>: Sealed
where
T: FidlStorageConvertible,
{
/// Produces the default value for `T`, optionally consulting the given storage.
fn get_default(_: &FidlStorage) -> T;
}
/// Types whose loader is `NoneT` fall back to their `Default` implementation.
impl<T> DefaultDispatcher<T> for NoneT
where
T: FidlStorageConvertible<Loader = Self>,
T: Default,
{
// The storage argument is unused; the default comes straight from `T::default()`.
fn get_default(_: &FidlStorage) -> T {
T::default()
}
}
/// Types with a `DefaultLoader` obtain their default from the loader registered under `T::KEY`.
impl<T, L> DefaultDispatcher<T> for L
where
    T: FidlStorageConvertible<Loader = L>,
    L: DefaultLoader<Result = T> + 'static,
{
    /// Looks up the type-erased loader for `T::KEY`, downcasts it to `T::Loader`, and asks it
    /// for the default value.
    ///
    /// # Panics
    ///
    /// Panics if no loader is registered for `T::KEY`, or if the registered loader is not a
    /// `T::Loader`.
    fn get_default(storage: &FidlStorage) -> T {
        let erased_loader = storage
            .typed_loader_map
            .get(T::KEY)
            .unwrap_or_else(|| panic!("Missing loader for {}", T::KEY));
        let typed_loader = erased_loader
            .downcast_ref::<T::Loader>()
            .unwrap_or_else(|| panic!("Mismatch key and loader for key {}", T::KEY));
        typed_loader.default_value()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use fasync::TestExecutor;
use fidl::endpoints::ControlHandle;
use fidl::epitaph::ChannelEpitaphExt;
use fidl_test_storage::{TestStruct, WrongStruct};
use futures::TryStreamExt;
use std::task::Poll;
use test_case::test_case;
use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
// Distinct sample values used by the tests; `VALUE0` is also `LibTestStruct`'s default.
const VALUE0: i32 = 3;
const VALUE1: i32 = 33;
const VALUE2: i32 = 128;
/// Library-side test type that is stored as the FIDL `TestStruct`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct LibTestStruct {
value: i32,
}
// Maps `LibTestStruct` to and from `TestStruct` by copying `value`; defaults come from
// `Default` because the loader is `NoneT`.
impl FidlStorageConvertible for LibTestStruct {
type Storable = TestStruct;
type Loader = NoneT;
const KEY: &'static str = "testkey";
fn to_storable(self) -> Self::Storable {
TestStruct { value: self.value }
}
fn from_storable(storable: Self::Storable) -> Self {
Self { value: storable.value }
}
}
// The default carries `VALUE0`, so tests can tell a default apart from a stored value.
impl Default for LibTestStruct {
fn default() -> Self {
Self { value: VALUE0 }
}
}
/// Opens a readable and writable directory connection to `tempdir`.
///
/// Panics if the path is not valid UTF-8 or the connection cannot be opened.
fn open_tempdir(tempdir: &tempfile::TempDir) -> fio::DirectoryProxy {
    let path = tempdir.path().to_str().expect("tempdir path is not valid UTF-8");
    let rights = fio::OpenFlags::RIGHT_READABLE | fio::OpenFlags::RIGHT_WRITABLE;
    fuchsia_fs::directory::open_in_namespace_deprecated(path, rights)
        .expect("failed to open connection to tempdir")
}
// A value already present on disk is returned by `get`.
#[fuchsia::test]
async fn test_get() {
let value_to_get = LibTestStruct { value: VALUE1 };
let tempdir = tempfile::tempdir().expect("failed to create tempdir");
// Pre-populate the permanent file with the serialized value.
let content = persist(&value_to_get.to_storable()).unwrap();
std::fs::write(tempdir.path().join("xyz.pfidl"), content).expect("failed to write file");
let storage_dir = open_tempdir(&tempdir);
let (storage, sync_tasks) = FidlStorage::with_file_proxy(
vec![(LibTestStruct::KEY, None)],
storage_dir,
move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
)
.await
.expect("should be able to generate file");
// Let the sync tasks keep running in the background for the rest of the test.
for task in sync_tasks {
task.detach();
}
let result = storage.get::<LibTestStruct>().await;
assert_eq!(result.value, VALUE1);
}
// With no file on disk, `get` falls back to the type's default value.
#[fuchsia::test]
async fn test_get_default() {
let tempdir = tempfile::tempdir().expect("failed to create tempdir");
let storage_dir = open_tempdir(&tempdir);
let (storage, sync_tasks) = FidlStorage::with_file_proxy(
vec![(LibTestStruct::KEY, None)],
storage_dir,
move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
)
.await
.expect("file proxy should be created");
// Let the sync tasks keep running in the background for the rest of the test.
for task in sync_tasks {
task.detach();
}
let result = storage.get::<LibTestStruct>().await;
// `VALUE0` is what `LibTestStruct::default()` produces.
assert_eq!(result.value, VALUE0);
}
/// Test double that sits in front of a real directory, forwarding requests to it while letting
/// tests observe syncs and inject open failures.
struct DirectoryInterceptor {
// The real directory that requests are forwarded to.
real_dir: fio::DirectoryProxy,
// Hooks that tests can install after construction.
inner: std::sync::Mutex<DirectoryInterceptorInner>,
}
/// Mutable hooks of a `DirectoryInterceptor`.
struct DirectoryInterceptorInner {
// If set, receives `()` after each `Sync` request has been answered.
sync_notifier: Option<futures::channel::mpsc::UnboundedSender<()>>,
// Called with the path and whether the open creates a file; returning `Some(status)` fails
// the open with that status instead of forwarding it.
#[allow(clippy::type_complexity)]
open_interceptor: Box<dyn Fn(&str, bool) -> Option<Status>>,
}
impl DirectoryInterceptor {
#[allow(clippy::arc_with_non_send_sync)]
fn new(real_dir: fio::DirectoryProxy) -> (Arc<Self>, fio::DirectoryProxy) {
let (proxy, requests) =
fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>().unwrap();
let this = Arc::new(Self {
real_dir,
inner: std::sync::Mutex::new(DirectoryInterceptorInner {
sync_notifier: None,
open_interceptor: Box::new(|_, _| None),
}),
});
fasync::Task::local(this.clone().run(requests)).detach();
(this.clone(), proxy)
}
/// Installs a fresh sync notifier, replacing any previous one, and returns the receiving end.
/// The receiver gets one `()` per answered `Sync` request.
fn install_sync_notifier(&self) -> futures::channel::mpsc::UnboundedReceiver<()> {
    let (notifier, notifications) = futures::channel::mpsc::unbounded();
    let mut hooks = self.inner.lock().unwrap();
    hooks.sync_notifier = Some(notifier);
    notifications
}
/// Replaces the open hook. The hook receives the path and whether the open creates a file;
/// returning `Some(status)` makes the open fail with that status.
#[allow(clippy::type_complexity)]
fn set_open_interceptor(&self, interceptor: Box<dyn Fn(&str, bool) -> Option<Status>>) {
self.inner.lock().unwrap().open_interceptor = interceptor;
}
/// Serves directory requests until the stream ends or yields an error.
///
/// `Open` and `Open3` are passed through the open hook and either failed with the returned
/// status or forwarded; `Sync`, `Rename` and `GetToken` are forwarded to the real directory.
/// Any other request hits `unimplemented!`.
async fn run(self: Arc<Self>, mut requests: fio::DirectoryRequestStream) {
while let Ok(Some(request)) = requests.try_next().await {
match request {
fio::DirectoryRequest::Open {
flags,
mode,
path,
object,
control_handle: _,
} => {
// The hook is told whether this open carries the CREATE flag.
match (self.inner.lock().unwrap().open_interceptor)(
&path,
flags.intersects(fio::OpenFlags::CREATE),
) {
Some(status) => {
// Report the injected failure through OnOpen and then close with an epitaph.
let (_, control_handle) =
object.into_stream_and_control_handle().unwrap();
control_handle
.send_on_open_(status.into_raw(), None)
.expect("failed to send OnOpen event");
control_handle.shutdown_with_epitaph(status);
}
None => {
self.real_dir
.open(flags, mode, &path, object)
.expect("failed to forward Open request");
}
}
}
fio::DirectoryRequest::Open3 {
path,
flags,
options,
object,
control_handle: _,
} => {
let create = flags.intersects(fio::Flags::FLAG_MUST_CREATE);
match (self.inner.lock().unwrap().open_interceptor)(&path, create) {
Some(status) => {
// Report the injected failure as an epitaph on the object channel.
object.close_with_epitaph(status).expect("failed to send epitaph");
}
None => {
self.real_dir
.open3(&path, flags, &options, object)
.expect("failed to forward Open3 request");
}
}
}
fio::DirectoryRequest::Sync { responder } => {
let response =
self.real_dir.sync().await.expect("failed to forward Sync request");
responder.send(response).expect("failed to respond to Sync request");
// Notify the test only after the response has been sent.
if let Some(sender) = &self.inner.lock().unwrap().sync_notifier {
sender.unbounded_send(()).unwrap();
}
}
fio::DirectoryRequest::Rename { src, dst_parent_token, dst, responder } => {
let response = self
.real_dir
.rename(&src, dst_parent_token, &dst)
.await
.expect("failed to forward Rename request");
responder.send(response).expect("failed to respond to Rename request");
}
fio::DirectoryRequest::GetToken { responder } => {
let response = self
.real_dir
.get_token()
.await
.expect("failed to forward GetToken request");
responder
.send(response.0, response.1)
.expect("failed to respond to GetToken request");
}
request => unimplemented!("request: {:?}", request),
}
}
}
}
/// Drives `fut` on `executor` until it completes and returns its output.
///
/// Whenever the executor stalls, the thread yields and the future is polled again.
fn run_until_ready<F>(executor: &mut TestExecutor, fut: F) -> F::Output
where
    F: std::future::Future,
{
    let mut pinned = std::pin::pin!(fut);
    loop {
        if let Poll::Ready(output) = executor.run_until_stalled(&mut pinned) {
            break output;
        }
        std::thread::yield_now();
    }
}
/// Asserts that opening `file_name` in `directory` fails with a not-found error.
fn assert_file_not_found(
    executor: &mut TestExecutor,
    directory: &fio::DirectoryProxy,
    file_name: &str,
) {
    let open_result = run_until_ready(
        executor,
        fuchsia_fs::directory::open_file_deprecated(
            directory,
            file_name,
            OpenFlags::RIGHT_READABLE,
        ),
    );
    assert_matches!(open_result, Result::Err(e) if e.is_not_found_error());
}
/// Asserts that `file_name` in `directory` decodes to `expected_contents`.
///
/// Panics if the file cannot be read or is not a persisted `TestStruct`.
fn assert_file_contents(
    executor: &mut TestExecutor,
    directory: &fio::DirectoryProxy,
    file_name: &str,
    expected_contents: TestStruct,
) {
    let raw_bytes =
        run_until_ready(executor, fuchsia_fs::directory::read_file(directory, file_name))
            .expect("reading file");
    let decoded: TestStruct =
        fidl::unpersist(&raw_bytes).expect("failed to read file as TestStruct");
    assert_eq!(decoded, expected_contents);
}
// The first write reaches disk without any fake-time advance.
#[fuchsia::test]
fn test_first_write_syncs_immediately() {
let written_value = VALUE1;
let mut executor = TestExecutor::new_with_fake_time();
executor.set_fake_time(Time::from_nanos(0));
let tempdir = tempfile::tempdir().expect("failed to create tempdir");
let storage_dir = open_tempdir(&tempdir);
// Route directory traffic through the interceptor so syncs can be observed.
let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
let mut sync_receiver = interceptor.install_sync_notifier();
let storage_fut = FidlStorage::with_file_proxy(
vec![(LibTestStruct::KEY, None)],
Clone::clone(&storage_dir),
move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
);
futures::pin_mut!(storage_fut);
let (storage, _sync_tasks) =
if let Poll::Ready(storage) = executor.run_until_stalled(&mut storage_fut) {
storage.expect("file proxy should be created")
} else {
panic!("storage creation stalled");
};
let value_to_write = LibTestStruct { value: written_value };
let write_future = storage.write(value_to_write);
futures::pin_mut!(write_future);
assert_matches!(
run_until_ready(&mut executor, &mut write_future),
Result::Ok(UpdateState::Updated)
);
// The write has returned, but the permanent file does not exist yet.
assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");
// Wait for the directory sync, after which the file holds the written value.
run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
assert_file_contents(
&mut executor,
&storage_dir,
"xyz.pfidl",
value_to_write.to_storable(),
);
}
// A second write is held back until `MIN_FLUSH_INTERVAL_MS` of fake time has elapsed.
#[fuchsia::test]
fn test_second_write_syncs_after_interval() {
let written_value = VALUE1;
let second_value = VALUE2;
let mut executor = TestExecutor::new_with_fake_time();
executor.set_fake_time(Time::from_nanos(0));
let tempdir = tempfile::tempdir().expect("failed to create tempdir");
let storage_dir = open_tempdir(&tempdir);
// Route directory traffic through the interceptor so syncs can be observed.
let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
let mut sync_receiver = interceptor.install_sync_notifier();
let storage_fut = FidlStorage::with_file_proxy(
vec![(LibTestStruct::KEY, None)],
Clone::clone(&storage_dir),
move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
);
futures::pin_mut!(storage_fut);
let (storage, _sync_tasks) =
if let Poll::Ready(storage) = executor.run_until_stalled(&mut storage_fut) {
storage.expect("file proxy should be created")
} else {
panic!("storage creation stalled");
};
// First write: synced without advancing the clock.
let value_to_write = LibTestStruct { value: written_value };
let write_future = storage.write(value_to_write);
futures::pin_mut!(write_future);
assert_matches!(
run_until_ready(&mut executor, &mut write_future),
Result::Ok(UpdateState::Updated)
);
assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");
run_until_ready(&mut executor, &mut sync_receiver.next()).expect("directory never synced");
assert_file_contents(
&mut executor,
&storage_dir,
"xyz.pfidl",
value_to_write.to_storable(),
);
// Second write: accepted, but the file still holds the first value.
let value_to_write2 = LibTestStruct { value: second_value };
let write_future = storage.write(value_to_write2);
futures::pin_mut!(write_future);
assert_matches!(
run_until_ready(&mut executor, &mut write_future),
Result::Ok(UpdateState::Updated)
);
assert_file_contents(
&mut executor,
&storage_dir,
"xyz.pfidl",
value_to_write.to_storable(),
);
// One nanosecond before the interval ends, no timer has expired.
executor.set_fake_time(Time::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000 - 1));
assert!(!executor.wake_expired_timers());
// At the interval boundary the flush runs and the second value lands on disk.
executor.set_fake_time(Time::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000));
run_until_ready(&mut executor, &mut sync_receiver.next()).expect("directory never synced");
assert_file_contents(
&mut executor,
&storage_dir,
"xyz.pfidl",
value_to_write2.to_storable(),
);
}
/// Test type whose key is never registered with the storage under test.
#[derive(Copy, Clone, Default, Debug)]
struct LibWrongStruct;
// Stored as the FIDL `WrongStruct` under the key "WRONG_STRUCT"; carries no data.
impl FidlStorageConvertible for LibWrongStruct {
type Storable = WrongStruct;
type Loader = NoneT;
const KEY: &'static str = "WRONG_STRUCT";
fn to_storable(self) -> Self::Storable {
WrongStruct
}
fn from_storable(_: Self::Storable) -> Self {
LibWrongStruct
}
}
// Writing a type whose key was not registered returns an error instead of panicking.
#[fuchsia::test]
async fn test_write_with_mismatch_type_returns_error() {
let tempdir = tempfile::tempdir().expect("failed to create tempdir");
let storage_dir = open_tempdir(&tempdir);
// Only `LibTestStruct::KEY` is registered.
let (storage, sync_tasks) = FidlStorage::with_file_proxy(
vec![(LibTestStruct::KEY, None)],
storage_dir,
move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
)
.await
.expect("file proxy should be created");
for task in sync_tasks {
task.detach();
}
let result = storage.write(LibTestStruct { value: VALUE2 }).await;
assert!(result.is_ok());
// `LibWrongStruct::KEY` is unknown to the storage, so the write is rejected.
let result = storage.write(LibWrongStruct).await;
assert_matches!(result, Err(e) if e.to_string() == "Invalid data keyed by WRONG_STRUCT");
}
// Several writes inside one flush interval are coalesced: reads see the newest value at once,
// while the file keeps the first value until the interval elapses and then gets the last one.
#[fuchsia::test]
fn test_multiple_write_debounce() {
let mut executor = TestExecutor::new_with_fake_time();
executor.set_fake_time(Time::from_nanos(0));
let tempdir = tempfile::tempdir().expect("failed to create tempdir");
let storage_dir = open_tempdir(&tempdir);
let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
let mut sync_receiver = interceptor.install_sync_notifier();
let storage_fut = FidlStorage::with_file_proxy(
vec![(LibTestStruct::KEY, None)],
Clone::clone(&storage_dir),
move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
);
let (storage, _sync_tasks) =
run_until_ready(&mut executor, storage_fut).expect("file proxy should be created");
let first_value = VALUE1;
let second_value = VALUE2;
let third_value = VALUE0;
// First write: flushed without advancing the clock.
let value_to_write = LibTestStruct { value: first_value };
let result = run_until_ready(&mut executor, storage.write(value_to_write));
assert_matches!(result, Result::Ok(UpdateState::Updated));
assert_file_not_found(&mut executor, &storage_dir, "xyz.pfidl");
run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
assert_file_contents(
&mut executor,
&storage_dir,
"xyz.pfidl",
value_to_write.to_storable(),
);
// Second write: visible through `get`, but the file still holds the first value.
let value_to_write2 = LibTestStruct { value: second_value };
let result = run_until_ready(&mut executor, storage.write(value_to_write2));
assert_matches!(result, Result::Ok(UpdateState::Updated));
let data = run_until_ready(&mut executor, storage.get::<LibTestStruct>());
assert_eq!(data, value_to_write2);
assert_file_contents(
&mut executor,
&storage_dir,
"xyz.pfidl",
value_to_write.to_storable(),
);
// Third write: same picture, with the newest value in memory and the first on disk.
let value_to_write3 = LibTestStruct { value: third_value };
let result = run_until_ready(&mut executor, storage.write(value_to_write3));
assert_matches!(result, Result::Ok(UpdateState::Updated));
let data = run_until_ready(&mut executor, storage.get::<LibTestStruct>());
assert_eq!(data, value_to_write3);
assert_file_contents(
&mut executor,
&storage_dir,
"xyz.pfidl",
value_to_write.to_storable(),
);
// One nanosecond before the interval ends nothing has been flushed.
executor.set_fake_time(Time::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000 - 1));
assert!(!executor.wake_expired_timers());
assert_file_contents(
&mut executor,
&storage_dir,
"xyz.pfidl",
value_to_write.to_storable(),
);
// At the interval boundary a single flush writes the last value.
executor.set_fake_time(Time::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000));
run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
assert_file_contents(
&mut executor,
&storage_dir,
"xyz.pfidl",
value_to_write3.to_storable(),
);
}
// Verifies the retry backoff of `synchronize_task`. Creating the temp file fails `retry_count`
// times; `max_wait_time` is the delay, in milliseconds, expected before the final retry.
#[allow(clippy::unused_unit)]
#[test_case(1, 500)]
#[test_case(2, 1_000)]
#[test_case(3, 2_000)]
#[test_case(4, 4_000)]
#[test_case(5, 8_000)]
#[test_case(6, 16_000)]
#[test_case(7, 32_000)]
#[test_case(8, 64_000)]
#[test_case(9, 128_000)]
#[test_case(10, 256_000)]
#[test_case(11, 512_000)]
#[test_case(12, 1_024_000)]
#[test_case(13, 1_800_000)]
#[test_case(14, 1_800_000)]
fn test_exponential_backoff(retry_count: usize, max_wait_time: usize) {
let mut executor = TestExecutor::new_with_fake_time();
executor.set_fake_time(Time::from_nanos(0));
let tempdir = tempfile::tempdir().expect("failed to create tempdir");
let storage_dir = open_tempdir(&tempdir);
let (interceptor, storage_dir) = DirectoryInterceptor::new(storage_dir);
// Fail the first `retry_count` attempts to create the temp file with NO_SPACE.
let attempts = std::sync::Mutex::new(0);
interceptor.set_open_interceptor(Box::new(move |path, create| {
let mut attempts_guard = attempts.lock().unwrap();
if path == "abc_tmp.pfidl" && create && *attempts_guard < retry_count {
*attempts_guard += 1;
Some(Status::NO_SPACE)
} else {
None
}
}));
let mut sync_receiver = interceptor.install_sync_notifier();
// Drive the sync task directly with a pre-filled cache instead of going through `write`.
let expected_data = vec![1];
let cached_storage = Arc::new(Mutex::new(CachedStorage {
current_data: Some(expected_data.clone()),
temp_file_path: "abc_tmp.pfidl".to_owned(),
file_path: "abc.pfidl".to_owned(),
}));
let (sender, receiver) = futures::channel::mpsc::unbounded();
let task = fasync::Task::spawn(FidlStorage::synchronize_task(
Clone::clone(&storage_dir),
Arc::clone(&cached_storage),
receiver,
));
futures::pin_mut!(task);
executor.set_fake_time(Time::from_nanos(0));
sender.unbounded_send(()).expect("can send flush signal");
assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
let mut clock_nanos = 0;
// Step the clock by each expected backoff delay; the last step stops one nanosecond short.
for new_duration in (0..retry_count).map(|i| {
(2_i64.pow(i as u32) * MIN_FLUSH_INTERVAL_MS).min(max_wait_time as i64) * 1_000_000
- (i == retry_count - 1) as i64
}) {
executor.set_fake_time(Time::from_nanos(clock_nanos));
assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
// While attempts keep failing, neither file exists.
assert_file_not_found(&mut executor, &storage_dir, "abc_tmp.pfidl");
assert_file_not_found(&mut executor, &storage_dir, "abc.pfidl");
clock_nanos += new_duration;
}
// One nanosecond before the final retry is due: still nothing on disk.
executor.set_fake_time(Time::from_nanos(clock_nanos));
assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
assert_file_not_found(&mut executor, &storage_dir, "abc_tmp.pfidl");
assert_file_not_found(&mut executor, &storage_dir, "abc.pfidl");
// Reaching the due time lets the final retry succeed.
clock_nanos += 1;
executor.set_fake_time(Time::from_nanos(clock_nanos));
assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
run_until_ready(&mut executor, sync_receiver.next()).expect("directory never synced");
let read_fut = fuchsia_fs::directory::read_file(&storage_dir, "abc.pfidl");
let data = run_until_ready(&mut executor, read_fut).expect("reading file");
assert_eq!(data, expected_data);
// Dropping the sender ends the flush stream, which lets the task finish.
drop(sender);
run_until_ready(&mut executor, task);
}
}