use crate::UpdateState;
use anyhow::{bail, format_err, Context, Error};
use fidl::Status;
use fidl::{persist, unpersist, Persistable};
use fidl_fuchsia_io::DirectoryProxy;
use fuchsia_async::{Task, Time, Timer};
use fuchsia_fs::file::ReadError;
use fuchsia_fs::node::OpenError;
use fuchsia_fs::OpenFlags;
use fuchsia_zircon as zx;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::future::OptionFuture;
use futures::lock::{Mutex, MutexGuard};
use futures::{FutureExt, StreamExt};
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use zx::Duration;
/// Minimum spacing, in milliseconds, between two flushes to disk. It is also the base delay
/// that the retry backoff in `FidlStorage::synchronize_task` multiplies.
const MIN_FLUSH_INTERVAL_MS: i64 = 500;
// `MAX_FLUSH_INTERVAL_MS` is the upper bound, in milliseconds, applied to the retry backoff delay.
// `MIN_FLUSH_DURATION` is `MIN_FLUSH_INTERVAL_MS` expressed as a `Duration`.
const MAX_FLUSH_INTERVAL_MS: i64 = 1_800_000; const MIN_FLUSH_DURATION: Duration = Duration::from_millis(MIN_FLUSH_INTERVAL_MS);
/// Describes a value that [`FidlStorage`] can write and read back.
///
/// An implementor names the persistable representation of the value and the key under which
/// the value is looked up in the storage.
pub trait FidlStorageConvertible {
/// Representation handed to `persist` on write and produced by `unpersist` on read.
type Storable: Persistable + Any;
/// Key used to find this type's entry in [`FidlStorage`].
const KEY: &'static str;
/// Value returned by [`FidlStorage::get`] when no stored data is available for the key.
fn default_value() -> Self;
/// Converts this value into its storable representation.
fn to_storable(self) -> Self::Storable;
/// Rebuilds a value from its storable representation.
fn from_storable(storable: Self::Storable) -> Self;
}
/// Key-indexed storage that keeps each value as serialized bytes in a file inside
/// `storage_dir`, with an in-memory copy of the bytes per key.
pub struct FidlStorage {
/// Per-key state, indexed by the key given at construction time.
typed_storage_map: HashMap<&'static str, TypedStorage>,
/// When `false`, `get_inner` reads the file on every call even if cached bytes exist.
caching_enabled: bool,
/// When `true`, a write only signals the key's sync task; when `false`, the write syncs to
/// disk before returning.
debounce_writes: bool,
/// Directory in which the data files and their temporary files are opened.
storage_dir: DirectoryProxy,
}
/// State kept for a single storage key.
struct TypedStorage {
/// Sending `()` on this channel asks the key's `synchronize_task` to schedule a flush.
flush_sender: UnboundedSender<()>,
/// Cached bytes and file paths for the key, shared with the key's `synchronize_task`.
cached_storage: Arc<Mutex<CachedStorage>>,
}
/// Cached contents and on-disk locations for one stored value.
struct CachedStorage {
/// Serialized bytes most recently written or read; `None` until a value is written or loaded.
current_data: Option<Vec<u8>>,
/// Path, relative to the storage directory, of the scratch file that `sync` writes first.
temp_file_path: String,
/// Path, relative to the storage directory, that the scratch file is renamed to.
file_path: String,
}
impl CachedStorage {
    /// Writes `current_data` to disk inside `storage_dir`.
    ///
    /// The bytes are written to `temp_file_path`, that file is closed, and it is then renamed
    /// to `file_path`. Finally `sync` is called on the directory; a `NOT_SUPPORTED` status from
    /// that call is treated as success.
    ///
    /// # Errors
    /// Returns an error, with context describing the failing step, when opening, writing,
    /// closing, renaming or syncing fails.
    ///
    /// # Panics
    /// Panics if `current_data` is `None`.
    async fn sync(&mut self, storage_dir: &DirectoryProxy) -> Result<(), Error> {
        let temp_flags = OpenFlags::CREATE
            | OpenFlags::TRUNCATE
            | OpenFlags::RIGHT_READABLE
            | OpenFlags::RIGHT_WRITABLE;

        // The proxy for the temp file is confined to this scope so that it has been dropped
        // by the time the rename below is issued.
        {
            let temp_file =
                fuchsia_fs::directory::open_file(storage_dir, &self.temp_file_path, temp_flags)
                    .await
                    .with_context(|| {
                        format!("unable to open {:?} for writing", self.temp_file_path)
                    })?;

            let payload = self.current_data.as_ref().unwrap();
            fuchsia_fs::file::write(&temp_file, payload)
                .await
                .context("failed to write data to file")?;

            let close_result =
                temp_file.close().await.context("failed to call close on temp file")?;
            close_result.map_err(zx::Status::from_raw)?;
        }

        fuchsia_fs::directory::rename(storage_dir, &self.temp_file_path, &self.file_path)
            .await
            .context("failed to rename temp file to permanent file")?;

        let raw_sync_result = storage_dir
            .sync()
            .await
            .context("failed to call sync on directory after rename")?;

        // A directory that reports NOT_SUPPORTED for sync is accepted as-is.
        let sync_outcome = match raw_sync_result.map_err(zx::Status::from_raw) {
            Err(zx::Status::NOT_SUPPORTED) => Ok(()),
            other => other,
        };
        sync_outcome.context("failed to sync rename to directory")
    }
}
impl FidlStorage {
/// Builds a `FidlStorage` for the given keys, together with one background sync task per key.
///
/// For every key produced by `iter`, `files_generator` supplies the pair
/// `(temp_file_path, file_path)` used for that key. A `synchronize_task` is spawned for each
/// key and returned to the caller in the same order as the keys.
///
/// # Errors
/// Returns an error if `files_generator` fails for any key.
pub(crate) async fn with_file_proxy<I, G>(
    iter: I,
    storage_dir: DirectoryProxy,
    files_generator: G,
) -> Result<(Self, Vec<Task<()>>), Error>
where
    I: IntoIterator<Item = &'static str>,
    G: Fn(&'static str) -> Result<(String, String), Error>,
{
    let keys = iter.into_iter();
    // Size both collections from the iterator's lower bound.
    let (expected_len, _) = keys.size_hint();
    let mut typed_storage_map = HashMap::with_capacity(expected_len);
    let mut sync_tasks = Vec::with_capacity(expected_len);

    for key in keys {
        let (flush_sender, flush_receiver) = futures::channel::mpsc::unbounded::<()>();
        let (temp_file_path, file_path) =
            files_generator(key).context("failed to generate file")?;

        // The cache is shared between the map entry and the key's sync task.
        let shared_cache =
            Arc::new(Mutex::new(CachedStorage { current_data: None, temp_file_path, file_path }));

        sync_tasks.push(Task::spawn(Self::synchronize_task(
            Clone::clone(&storage_dir),
            Arc::clone(&shared_cache),
            flush_receiver,
        )));

        let entry = TypedStorage { flush_sender, cached_storage: shared_cache };
        let _ = typed_storage_map.insert(key, entry);
    }

    let storage = FidlStorage {
        typed_storage_map,
        caching_enabled: true,
        debounce_writes: true,
        storage_dir,
    };
    Ok((storage, sync_tasks))
}
/// Background loop that writes `cached_storage` to disk in response to flush requests.
///
/// Each message on `flush_receiver` schedules a flush no earlier than `MIN_FLUSH_DURATION`
/// after the previous successful one. When a sync fails, the loop retries with a delay that
/// doubles on each failure, capped at `MAX_FLUSH_INTERVAL_MS`, and ignores further flush
/// requests until a retry succeeds. The loop ends when `select!` reports `complete`.
async fn synchronize_task(
storage_dir: DirectoryProxy,
cached_storage: Arc<Mutex<CachedStorage>>,
flush_receiver: UnboundedReceiver<()>,
) {
// True between receiving a flush request and completing the matching sync.
let mut has_pending_flush = false;
// Starts one full interval in the past so the first request is not delayed.
let mut last_flush: Time = Time::now() - MIN_FLUSH_DURATION;
// `OptionFuture` lets the select below poll a timer slot that may hold no timer.
let mut next_flush_timer: OptionFuture<Timer> = None.into();
let mut next_flush_timer_fuse = next_flush_timer.fuse();
// Count of consecutive failed syncs; used as the exponent for the backoff delay.
let mut retries = 0;
// Set while a failed sync is waiting for its retry timer.
let mut retrying = false;
let flush_fuse = flush_receiver.fuse();
futures::pin_mut!(flush_fuse);
loop {
futures::select! {
_ = flush_fuse.select_next_some() => {
// A retry is already scheduled; leave its backoff timer untouched.
if retrying {
continue;
}
let now = Time::now();
// Flush right away if more than the minimum interval has passed since the last
// flush, otherwise wait until the interval has elapsed.
let next_flush_time = if now - last_flush > MIN_FLUSH_DURATION {
now
} else {
last_flush + MIN_FLUSH_DURATION
};
has_pending_flush = true;
next_flush_timer = Some(Timer::new(next_flush_time)).into();
next_flush_timer_fuse = next_flush_timer.fuse();
}
_ = next_flush_timer_fuse => {
if has_pending_flush {
let mut cached_storage = cached_storage.lock().await;
if let Err(e) = cached_storage.sync(&storage_dir).await {
retrying = true;
// Delay is 2^retries * MIN_FLUSH_INTERVAL_MS, saturating, capped at
// MAX_FLUSH_INTERVAL_MS.
let flush_duration = Duration::from_millis(
2_i64.saturating_pow(retries)
.saturating_mul(MIN_FLUSH_INTERVAL_MS)
.min(MAX_FLUSH_INTERVAL_MS)
);
let next_flush_time = Time::now() + flush_duration;
tracing::error!(
"Failed to sync write to disk for {:?}, delaying by {:?}, \
caused by: {:?}",
cached_storage.file_path,
flush_duration,
e
);
// Arm a new timer for the retry; `has_pending_flush` stays set.
next_flush_timer = Some(Timer::new(next_flush_time)).into();
next_flush_timer_fuse = next_flush_timer.fuse();
retries += 1;
continue;
}
// Sync succeeded: record the time and clear the pending and retry state.
last_flush = Time::now();
has_pending_flush = false;
retrying = false;
retries = 0;
}
}
// Neither branch can make further progress, so leave the loop.
complete => break,
}
}
}
/// Test-only setter for `caching_enabled`.
#[cfg(test)]
#[allow(dead_code)]
fn set_caching_enabled(&mut self, enabled: bool) {
self.caching_enabled = enabled;
}
/// Test-only setter for `debounce_writes`.
#[cfg(test)]
#[allow(dead_code)]
fn set_debounce_writes(&mut self, debounce: bool) {
self.debounce_writes = debounce;
}
/// Stores `new_value` as the serialized bytes for `key` if they differ from what is already
/// stored.
///
/// The comparison uses the cached bytes when present; otherwise the file at the key's
/// `file_path` is read, and a missing file counts as "different". When the bytes differ, the
/// cache is updated and the data is either synced immediately (`debounce_writes == false`)
/// or a flush request is sent to the key's sync task.
///
/// Returns `UpdateState::Updated` when the value changed and `UpdateState::Unchanged`
/// otherwise.
///
/// # Errors
/// Returns an error when `key` is unknown, when the existing file cannot be opened or read
/// for a reason other than `NOT_FOUND`, when the immediate sync fails, or when the flush
/// request cannot be sent.
async fn inner_write(
    &self,
    key: &'static str,
    new_value: Vec<u8>,
) -> Result<UpdateState, Error> {
    let typed_storage = match self.typed_storage_map.get(key) {
        Some(entry) => entry,
        None => return Err(format_err!("Invalid data keyed by {}", key)),
    };
    let mut cached_storage = typed_storage.cached_storage.lock().await;

    let differs = match cached_storage.current_data.as_ref() {
        Some(cached_bytes) => *cached_bytes != new_value,
        None => {
            // Nothing cached yet: compare against whatever is on disk, if anything.
            let open_result = fuchsia_fs::directory::open_file(
                &self.storage_dir,
                &cached_storage.file_path,
                OpenFlags::RIGHT_READABLE,
            )
            .await;
            let on_disk = match open_result {
                Err(OpenError::OpenError(Status::NOT_FOUND)) => None,
                Err(e) => bail!("unable to read data on disk for {:?}: {:?}", key, e),
                Ok(file_proxy) => match fuchsia_fs::file::read(&file_proxy).await {
                    Ok(bytes) => Some(bytes),
                    Err(ReadError::Open(OpenError::OpenError(Status::NOT_FOUND))) => None,
                    Err(e) => {
                        bail!("failed to get value from fidl storage for {:?}: {:?}", key, e)
                    }
                },
            };
            match on_disk {
                Some(bytes) => bytes != new_value,
                None => true,
            }
        }
    };

    if !differs {
        return Ok(UpdateState::Unchanged);
    }

    cached_storage.current_data = Some(new_value);
    if self.debounce_writes {
        // Let the background task decide when the bytes reach disk.
        typed_storage.flush_sender.unbounded_send(()).with_context(|| {
            format!("flush_sender failed to send flush message, associated key is {key}")
        })?;
    } else {
        cached_storage
            .sync(&self.storage_dir)
            .await
            .with_context(|| format!("Failed to sync data for key {key:?}"))?;
    }
    Ok(UpdateState::Updated)
}
/// Serializes `new_value` through its storable form and stores it under `T::KEY`.
///
/// Returns whether the stored value changed.
///
/// # Errors
/// Returns an error if serialization fails or if `inner_write` fails.
pub async fn write<T>(&self, new_value: T) -> Result<UpdateState, Error>
where
    T: FidlStorageConvertible,
{
    let storable = new_value.to_storable();
    let serialized = persist(&storable)?;
    self.inner_write(T::KEY, serialized).await
}
/// Locks and returns the cached storage for `key`, loading it from disk first when needed.
///
/// The file is read when nothing is cached or when caching is disabled. If the file does not
/// exist the cache is left as it was.
///
/// # Panics
/// Panics if `key` is unknown, or if opening or reading the file fails for a reason other
/// than the `NOT_FOUND` cases handled below.
async fn get_inner(&self, key: &'static str) -> MutexGuard<'_, CachedStorage> {
    let typed_storage = match self.typed_storage_map.get(key) {
        Some(entry) => entry,
        None => panic!("Invalid data keyed by {key}"),
    };
    let mut guard = typed_storage.cached_storage.lock().await;

    let must_read_disk = !self.caching_enabled || guard.current_data.is_none();
    if !must_read_disk {
        return guard;
    }

    let open_result = fuchsia_fs::directory::open_file(
        &self.storage_dir,
        &guard.file_path,
        OpenFlags::RIGHT_READABLE,
    )
    .await;
    let file_proxy = match open_result {
        Ok(proxy) => proxy,
        // No file on disk: keep whatever the cache currently holds.
        Err(OpenError::OpenError(Status::NOT_FOUND)) => return guard,
        Err(e) => panic!("failed to open file for {key:?}: {e:?}"),
    };

    let loaded = match fuchsia_fs::file::read(&file_proxy).await {
        Ok(data) => Some(data),
        Err(ReadError::ReadError(Status::NOT_FOUND)) => None,
        Err(e) => panic!("failed to get fidl data from disk for {key:?}: {e:?}"),
    };
    guard.current_data = loaded;
    guard
}
/// Returns the value stored under `T::KEY`, or `T::default_value()` when nothing is stored.
///
/// # Panics
/// Panics if the stored bytes cannot be deserialized as `T::Storable`, and in the cases
/// where `get_inner` panics.
pub async fn get<T>(&self) -> T
where
    T: FidlStorageConvertible,
{
    let guard = self.get_inner(T::KEY).await;
    match guard.current_data.as_ref() {
        Some(data) => {
            let storable: T::Storable =
                unpersist(data).expect("Should not be able to save mismatching types in file");
            <T as FidlStorageConvertible>::from_storable(storable)
        }
        None => T::default_value(),
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use fasync::TestExecutor;
use fidl::endpoints::{create_proxy, ServerEnd};
use fidl_fuchsia_io::DirectoryMarker;
use fidl_test_storage::{TestStruct, WrongStruct};
use fuchsia_async as fasync;
use std::task::Poll;
use test_case::test_case;
use vfs::directory::entry_container::Directory;
use vfs::directory::mutable::simple::tree_constructor;
use vfs::execution_scope::ExecutionScope;
use vfs::file::vmo::read_write;
use vfs::mut_pseudo_directory;
// Value used by `TestStruct::default_value`.
const VALUE0: i32 = 3;
// Distinct values that the tests write and read back.
const VALUE1: i32 = 33;
const VALUE2: i32 = 128;
// `TestStruct` is stored as itself under the key "testkey"; its default carries `VALUE0`.
impl FidlStorageConvertible for TestStruct {
type Storable = Self;
const KEY: &'static str = "testkey";
fn default_value() -> Self {
TestStruct { value: VALUE0 }
}
// Identity conversion: the storable form is the struct itself.
fn to_storable(self) -> Self::Storable {
self
}
// Identity conversion back from the storable form.
fn from_storable(storable: Self::Storable) -> Self {
storable
}
}
/// Opens `root` with read and write rights and returns the client end as a `DirectoryProxy`.
///
/// The execution scope's entry constructor produces an empty read-write file for any entry
/// it is asked to construct.
fn serve_vfs_dir(root: Arc<impl Directory>) -> DirectoryProxy {
    let (proxy, server_end) = create_proxy::<DirectoryMarker>().unwrap();
    let rights = OpenFlags::RIGHT_READABLE | OpenFlags::RIGHT_WRITABLE;
    let scope = ExecutionScope::build()
        .entry_constructor(tree_constructor(move |_, _| Ok(read_write(b""))))
        .new();
    root.open(scope, rights, vfs::path::Path::dot(), ServerEnd::new(server_end.into_channel()));
    proxy
}
// A value already present in "xyz.pfidl" is returned by `get`.
#[fasync::run_until_stalled(test)]
async fn test_get() {
    let stored = TestStruct { value: VALUE1 };
    let serialized = persist(&stored.to_storable()).unwrap();
    let root = mut_pseudo_directory! {
        "xyz.pfidl" => read_write(serialized),
    };
    let dir_proxy = serve_vfs_dir(root);

    let paths_for_key =
        move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl")));
    let (storage, background_tasks) =
        FidlStorage::with_file_proxy(vec![TestStruct::KEY], dir_proxy, paths_for_key)
            .await
            .expect("should be able to generate file");
    // Let the sync tasks keep running without being awaited.
    for background_task in background_tasks {
        background_task.detach();
    }

    let loaded = storage.get::<TestStruct>().await;
    assert_eq!(loaded.value, VALUE1);
}
// With an empty directory, `get` falls back to `TestStruct::default_value`.
#[fasync::run_until_stalled(test)]
async fn test_get_default() {
    let empty_root = mut_pseudo_directory! {};
    let dir_proxy = serve_vfs_dir(empty_root);

    let paths_for_key =
        move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl")));
    let (storage, background_tasks) =
        FidlStorage::with_file_proxy(vec![TestStruct::KEY], dir_proxy, paths_for_key)
            .await
            .expect("file proxy should be created");
    // Let the sync tasks keep running without being awaited.
    for background_task in background_tasks {
        background_task.detach();
    }

    let loaded = storage.get::<TestStruct>().await;
    assert_eq!(loaded.value, VALUE0);
}
// The first write reaches disk as soon as the sync task is polled, without advancing fake time.
#[fuchsia::test]
fn test_first_write_syncs_immediately() {
let written_value = VALUE1;
let mut executor = TestExecutor::new_with_fake_time();
executor.set_fake_time(Time::from_nanos(0));
let fs = mut_pseudo_directory! {};
let storage_dir = serve_vfs_dir(fs);
let storage_fut = FidlStorage::with_file_proxy(
vec![TestStruct::KEY],
Clone::clone(&storage_dir),
move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
);
futures::pin_mut!(storage_fut);
let (storage, sync_tasks) =
if let Poll::Ready(storage) = executor.run_until_stalled(&mut storage_fut) {
storage.expect("file proxy should be created")
} else {
panic!("storage creation stalled");
};
// One key was registered, so exactly one sync task is expected.
assert_eq!(sync_tasks.len(), 1);
let sync_task = sync_tasks.into_iter().next().unwrap();
futures::pin_mut!(sync_task);
let value_to_write = TestStruct { value: written_value };
let write_future = storage.write(value_to_write);
futures::pin_mut!(write_future);
assert_matches!(
executor.run_until_stalled(&mut write_future),
Poll::Ready(Result::Ok(UpdateState::Updated))
);
// The write has returned, but the file is not on disk until the sync task is polled.
let open_fut =
fuchsia_fs::directory::open_file(&storage_dir, "xyz.pfidl", OpenFlags::RIGHT_READABLE);
futures::pin_mut!(open_fut);
let result = executor.run_until_stalled(&mut open_fut);
assert_matches!(
result,
Poll::Ready(Result::Err(OpenError::OpenError(zx::Status::NOT_FOUND)))
);
// Poll the sync task so it can perform the flush.
let _ = executor.run_until_stalled(&mut sync_task);
// The file now exists and holds the written value.
let open_fut =
fuchsia_fs::directory::open_file(&storage_dir, "xyz.pfidl", OpenFlags::RIGHT_READABLE);
futures::pin_mut!(open_fut);
let result = executor.run_until_stalled(&mut open_fut);
let file = if let Poll::Ready(Result::Ok(file)) = result {
file
} else {
panic!("result is not ready: {result:?}");
};
let read_fut = fuchsia_fs::file::read_fidl::<TestStruct>(&file);
futures::pin_mut!(read_fut);
let result = executor.run_until_stalled(&mut read_fut);
let data = if let Poll::Ready(Result::Ok(data)) = result {
data
} else {
panic!("result is not ready: {result:?}");
};
assert_eq!(data, value_to_write);
}
// A second write made right after the first flush is held back until the minimum flush
// interval has elapsed on the fake clock.
#[fuchsia::test]
fn test_second_write_syncs_after_interval() {
let written_value = VALUE1;
let second_value = VALUE2;
let mut executor = TestExecutor::new_with_fake_time();
executor.set_fake_time(Time::from_nanos(0));
let fs = mut_pseudo_directory! {};
let storage_dir = serve_vfs_dir(fs);
let storage_fut = FidlStorage::with_file_proxy(
vec![TestStruct::KEY],
Clone::clone(&storage_dir),
move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
);
futures::pin_mut!(storage_fut);
let (storage, sync_tasks) =
if let Poll::Ready(storage) = executor.run_until_stalled(&mut storage_fut) {
storage.expect("file proxy should be created")
} else {
panic!("storage creation stalled");
};
let sync_task = sync_tasks.into_iter().next().unwrap();
futures::pin_mut!(sync_task);
// First write: accepted, but not on disk until the sync task is polled.
let value_to_write = TestStruct { value: written_value };
let write_future = storage.write(value_to_write);
futures::pin_mut!(write_future);
assert_matches!(
executor.run_until_stalled(&mut write_future),
Poll::Ready(Result::Ok(UpdateState::Updated))
);
let open_fut =
fuchsia_fs::directory::open_file(&storage_dir, "xyz.pfidl", OpenFlags::RIGHT_READABLE);
futures::pin_mut!(open_fut);
let result = executor.run_until_stalled(&mut open_fut);
assert_matches!(
result,
Poll::Ready(Result::Err(OpenError::OpenError(zx::Status::NOT_FOUND)))
);
// Polling the sync task flushes the first value.
let _ = executor.run_until_stalled(&mut sync_task);
let open_fut =
fuchsia_fs::directory::open_file(&storage_dir, "xyz.pfidl", OpenFlags::RIGHT_READABLE);
futures::pin_mut!(open_fut);
let result = executor.run_until_stalled(&mut open_fut);
let file = if let Poll::Ready(Result::Ok(file)) = result {
file
} else {
panic!("result is not ready: {result:?}");
};
let read_fut = fuchsia_fs::file::read_fidl::<TestStruct>(&file);
futures::pin_mut!(read_fut);
let result = executor.run_until_stalled(&mut read_fut);
let data = if let Poll::Ready(Result::Ok(data)) = result {
data
} else {
panic!("result is not ready: {result:?}");
};
assert_eq!(data, value_to_write);
// Second write at the same fake time: accepted, but the file must still hold the first value.
let value_to_write2 = TestStruct { value: second_value };
let write_future = storage.write(value_to_write2);
futures::pin_mut!(write_future);
assert_matches!(
executor.run_until_stalled(&mut write_future),
Poll::Ready(Result::Ok(UpdateState::Updated))
);
let open_fut =
fuchsia_fs::directory::open_file(&storage_dir, "xyz.pfidl", OpenFlags::RIGHT_READABLE);
futures::pin_mut!(open_fut);
let result = executor.run_until_stalled(&mut open_fut);
let file = if let Poll::Ready(Result::Ok(file)) = result {
file
} else {
panic!("result is not ready: {result:?}");
};
let read_fut = fuchsia_fs::file::read_fidl::<TestStruct>(&file);
futures::pin_mut!(read_fut);
let result = executor.run_until_stalled(&mut read_fut);
let data = if let Poll::Ready(Result::Ok(data)) = result {
data
} else {
panic!("result is not ready: {result:?}");
};
assert_eq!(data, value_to_write);
// One nanosecond before the minimum interval elapses, no timer is expected to fire.
executor.set_fake_time(Time::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000 - 1));
assert!(!executor.wake_expired_timers());
// At exactly the minimum interval, polling the sync task flushes the second value.
executor.set_fake_time(Time::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000));
let _ = executor.run_until_stalled(&mut sync_task);
let open_fut =
fuchsia_fs::directory::open_file(&storage_dir, "xyz.pfidl", OpenFlags::RIGHT_READABLE);
futures::pin_mut!(open_fut);
let result = executor.run_until_stalled(&mut open_fut);
let file = if let Poll::Ready(Result::Ok(file)) = result {
file
} else {
panic!("result is not ready: {result:?}");
};
let read_fut = fuchsia_fs::file::read_fidl::<TestStruct>(&file);
futures::pin_mut!(read_fut);
let result = executor.run_until_stalled(&mut read_fut);
let data = if let Poll::Ready(Result::Ok(data)) = result {
data
} else {
panic!("result is not ready: {result:?}");
};
assert_eq!(data, value_to_write2);
}
// `WrongStruct` uses the key "WRONG_STRUCT", which the tests never register with the storage.
impl FidlStorageConvertible for WrongStruct {
type Storable = Self;
const KEY: &'static str = "WRONG_STRUCT";
fn default_value() -> Self {
Self
}
// Identity conversion: the storable form is the struct itself.
fn to_storable(self) -> Self::Storable {
self
}
// Identity conversion back from the storable form.
fn from_storable(storable: Self::Storable) -> Self {
storable
}
}
// Writing a type whose key was not registered with the storage yields an error.
#[fasync::run_until_stalled(test)]
async fn test_write_with_mismatch_type_returns_error() {
    let empty_root = mut_pseudo_directory! {};
    let dir_proxy = serve_vfs_dir(empty_root);

    let paths_for_key =
        move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl")));
    let (storage, background_tasks) =
        FidlStorage::with_file_proxy(vec![TestStruct::KEY], dir_proxy, paths_for_key)
            .await
            .expect("file proxy should be created");
    // Let the sync tasks keep running without being awaited.
    for background_task in background_tasks {
        background_task.detach();
    }

    // A registered key is accepted.
    let registered_outcome = storage.write(TestStruct { value: VALUE2 }).await;
    assert!(registered_outcome.is_ok());

    // An unregistered key is rejected with the lookup error.
    let unregistered_outcome = storage.write(WrongStruct).await;
    assert_matches!(
        unregistered_outcome,
        Err(e) if e.to_string() == "Invalid data keyed by WRONG_STRUCT"
    );
}
// Pins `$fut`, runs it on `$executor` until stalled, and evaluates to its output.
// Panics with `$msg` (or "expected ready" when no message is given) if the future is still
// pending.
macro_rules! run_to_ready {
($executor:expr, $fut:expr $(, $msg:expr $(,)?)? $(,)?) => {
{
let fut = $fut;
futures::pin_mut!(fut);
match $executor.run_until_stalled(&mut fut) {
Poll::Ready(result) => result,
Poll::Pending => run_to_ready!(@msg $($msg)?),
}
}
};
// Internal rule: panic with the caller-supplied message.
(@msg $msg:expr) => {
panic!($msg)
};
// Internal rule: panic with the default message.
(@msg) => {
panic!("expected ready")
}
}
// Opens `$file_name` in `$storage_dir`, reads it as a `TestStruct`, and asserts that it equals
// `$expected_contents`. Both the open and the read must complete without stalling.
macro_rules! assert_file {
($executor:expr, $storage_dir:expr, $file_name:literal, $expected_contents:expr) => {
let open_fut = fuchsia_fs::directory::open_file(
&$storage_dir,
$file_name,
OpenFlags::RIGHT_READABLE,
);
let file = run_to_ready!($executor, open_fut).expect("opening file");
let read_fut = fuchsia_fs::file::read_fidl::<TestStruct>(&file);
let data = run_to_ready!($executor, read_fut).expect("reading file");
assert_eq!(data, $expected_contents);
};
}
// Several writes inside one flush interval: `get` sees each new value immediately, while the
// file keeps the first value until the interval elapses and then receives only the last one.
#[fuchsia::test]
fn test_multiple_write_debounce() {
let mut executor = TestExecutor::new_with_fake_time();
executor.set_fake_time(Time::from_nanos(0));
let fs = mut_pseudo_directory! {};
let storage_dir = serve_vfs_dir(fs);
let storage_fut = FidlStorage::with_file_proxy(
vec![TestStruct::KEY],
Clone::clone(&storage_dir),
move |_| Ok((String::from("xyz_temp.pfidl"), String::from("xyz.pfidl"))),
);
let (storage, sync_tasks) =
run_to_ready!(executor, storage_fut, "storage creation stalled")
.expect("file proxy should be created");
let mut sync_task = sync_tasks.into_iter().next().unwrap();
let first_value = VALUE1;
let second_value = VALUE2;
let third_value = VALUE0;
// First write: accepted, but the file does not exist until the sync task is polled.
let value_to_write = TestStruct { value: first_value };
let result = run_to_ready!(executor, storage.write(value_to_write));
assert_matches!(result, Result::Ok(UpdateState::Updated));
let result = run_to_ready!(
executor,
fuchsia_fs::directory::open_file(&storage_dir, "xyz.pfidl", OpenFlags::RIGHT_READABLE)
);
assert_matches!(result, Result::Err(OpenError::OpenError(zx::Status::NOT_FOUND)));
let _ = executor.run_until_stalled(&mut sync_task);
assert_file!(executor, storage_dir, "xyz.pfidl", value_to_write);
// Second write: visible through `get`, while the file still holds the first value.
let value_to_write2 = TestStruct { value: second_value };
let result = run_to_ready!(executor, storage.write(value_to_write2));
assert_matches!(result, Result::Ok(UpdateState::Updated));
let data = run_to_ready!(executor, storage.get::<TestStruct>());
assert_eq!(data, value_to_write2);
assert_file!(executor, storage_dir, "xyz.pfidl", value_to_write);
// Third write: same expectations as the second.
let value_to_write3 = TestStruct { value: third_value };
let result = run_to_ready!(executor, storage.write(value_to_write3));
assert_matches!(result, Result::Ok(UpdateState::Updated));
let data = run_to_ready!(executor, storage.get::<TestStruct>());
assert_eq!(data, value_to_write3);
assert_file!(executor, storage_dir, "xyz.pfidl", value_to_write);
// One nanosecond before the minimum interval: no timer fires and the file is unchanged.
executor.set_fake_time(Time::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000 - 1));
assert!(!executor.wake_expired_timers());
assert_file!(executor, storage_dir, "xyz.pfidl", value_to_write);
// At the minimum interval the sync task flushes, and the file holds the third value.
executor.set_fake_time(Time::from_nanos(MIN_FLUSH_INTERVAL_MS * 1_000_000));
let _ = executor.run_until_stalled(&mut sync_task);
assert_file!(executor, storage_dir, "xyz.pfidl", value_to_write3);
}
/// Like `serve_vfs_dir`, but the entry constructor fails with `NO_SPACE` for the first
/// `recovers_after` attempts to construct "abc_tmp.pfidl". Every other construction yields an
/// empty read-write file.
fn serve_full_vfs_dir(root: Arc<impl Directory>, recovers_after: usize) -> DirectoryProxy {
    let (proxy, server_end) = create_proxy::<DirectoryMarker>().unwrap();
    let rights = OpenFlags::RIGHT_READABLE | OpenFlags::RIGHT_WRITABLE;

    // Number of forced failures handed out so far.
    let forced_failures = std::sync::Mutex::new(0);
    let scope = ExecutionScope::build()
        .entry_constructor(tree_constructor(move |_, file_name| {
            let mut failure_count = forced_failures.lock().unwrap();
            let should_fail = file_name == "abc_tmp.pfidl" && *failure_count < recovers_after;
            if !should_fail {
                return Ok(read_write(""));
            }
            *failure_count += 1;
            println!("Force failing attempt {}", *failure_count);
            Err(fidl::Status::NO_SPACE)
        }))
        .new();

    root.open(scope, rights, vfs::path::Path::dot(), ServerEnd::new(server_end.into_channel()));
    proxy
}
// Checks the retry schedule of `synchronize_task` when creating the temp file keeps failing.
// `retry_count` is how many attempts the directory forces to fail; `max_wait_time` is the
// expected delay, in milliseconds, before the final retry.
#[allow(clippy::unused_unit)]
#[test_case(1, 500)]
#[test_case(2, 1_000)]
#[test_case(3, 2_000)]
#[test_case(4, 4_000)]
#[test_case(5, 8_000)]
#[test_case(6, 16_000)]
#[test_case(7, 32_000)]
#[test_case(8, 64_000)]
#[test_case(9, 128_000)]
#[test_case(10, 256_000)]
#[test_case(11, 512_000)]
#[test_case(12, 1_024_000)]
#[test_case(13, 1_800_000)]
#[test_case(14, 1_800_000)]
fn test_exponential_backoff(retry_count: usize, max_wait_time: usize) {
let mut executor = TestExecutor::new_with_fake_time();
executor.set_fake_time(Time::from_nanos(0));
let fs = mut_pseudo_directory! {};
let storage_dir = serve_full_vfs_dir(fs, retry_count);
let expected_data = vec![1];
// Build the cache directly with data already present, so the task has bytes to sync.
let cached_storage = Arc::new(Mutex::new(CachedStorage {
current_data: Some(expected_data.clone()),
temp_file_path: "abc_tmp.pfidl".to_owned(),
file_path: "abc.pfidl".to_owned(),
}));
let (sender, receiver) = futures::channel::mpsc::unbounded();
let task = fasync::Task::spawn(FidlStorage::synchronize_task(
Clone::clone(&storage_dir),
Arc::clone(&cached_storage),
receiver,
));
futures::pin_mut!(task);
executor.set_fake_time(Time::from_nanos(0));
// Request a flush; the task stays pending whether or not the sync succeeds.
sender.unbounded_send(()).expect("can send flush signal");
assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
let mut clock_nanos = 0;
// Each item is the expected backoff delay in nanoseconds for retry `i`, capped at
// `max_wait_time`. The last delay is shortened by 1ns so the clock stops just before the
// final retry timer.
for new_duration in (0..retry_count).map(|i| {
(2_i64.pow(i as u32) * MIN_FLUSH_INTERVAL_MS).min(max_wait_time as i64) * 1_000_000
- (i == retry_count - 1) as i64
}) {
executor.set_fake_time(Time::from_nanos(clock_nanos));
assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
// While attempts are still being failed, neither the temp file nor the final file exists.
let result = run_to_ready!(
executor,
fuchsia_fs::directory::open_file(
&storage_dir,
"abc_tmp.pfidl",
OpenFlags::RIGHT_READABLE
)
);
assert_matches!(result, Result::Err(OpenError::OpenError(zx::Status::NOT_FOUND)));
let result = run_to_ready!(
executor,
fuchsia_fs::directory::open_file(
&storage_dir,
"abc.pfidl",
OpenFlags::RIGHT_READABLE
)
);
assert_matches!(result, Result::Err(OpenError::OpenError(zx::Status::NOT_FOUND)));
clock_nanos += new_duration;
}
// One nanosecond short of the final retry: still nothing on disk.
executor.set_fake_time(Time::from_nanos(clock_nanos));
assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
let result = run_to_ready!(
executor,
fuchsia_fs::directory::open_file(
&storage_dir,
"abc_tmp.pfidl",
OpenFlags::RIGHT_READABLE
)
);
assert_matches!(result, Result::Err(OpenError::OpenError(zx::Status::NOT_FOUND)));
let result = run_to_ready!(
executor,
fuchsia_fs::directory::open_file(&storage_dir, "abc.pfidl", OpenFlags::RIGHT_READABLE)
);
assert_matches!(result, Result::Err(OpenError::OpenError(zx::Status::NOT_FOUND)));
// Advance the last nanosecond: the final retry runs and the data reaches "abc.pfidl".
clock_nanos += 1;
executor.set_fake_time(Time::from_nanos(clock_nanos));
assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
let open_fut =
fuchsia_fs::directory::open_file(&storage_dir, "abc.pfidl", OpenFlags::RIGHT_READABLE);
let file = run_to_ready!(executor, open_fut).expect("opening file");
let read_fut = fuchsia_fs::file::read(&file);
let data = run_to_ready!(executor, read_fut).expect("reading file");
assert_eq!(data, expected_data);
// Dropping the sender closes the flush channel, which lets the task finish.
drop(sender);
assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
}
}